diff options
Diffstat (limited to 'third_party/rust/hyper/src/proto')
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/conn.rs | 1419 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/decode.rs | 731 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/dispatch.rs | 750 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/encode.rs | 439 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/io.rs | 1002 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/mod.rs | 122 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h1/role.rs | 2847 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h2/client.rs | 450 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h2/mod.rs | 471 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h2/ping.rs | 555 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/h2/server.rs | 548 | ||||
-rw-r--r-- | third_party/rust/hyper/src/proto/mod.rs | 71 |
12 files changed, 9405 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/proto/h1/conn.rs b/third_party/rust/hyper/src/proto/h1/conn.rs new file mode 100644 index 0000000000..37ab380f8b --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/conn.rs @@ -0,0 +1,1419 @@ +use std::fmt; +use std::io; +use std::marker::PhantomData; +#[cfg(all(feature = "server", feature = "runtime"))] +use std::time::Duration; + +use bytes::{Buf, Bytes}; +use http::header::{HeaderValue, CONNECTION}; +use http::{HeaderMap, Method, Version}; +use httparse::ParserConfig; +use tokio::io::{AsyncRead, AsyncWrite}; +#[cfg(all(feature = "server", feature = "runtime"))] +use tokio::time::Sleep; +use tracing::{debug, error, trace}; + +use super::io::Buffered; +use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants}; +use crate::body::DecodedLength; +use crate::common::{task, Pin, Poll, Unpin}; +use crate::headers::connection_keep_alive; +use crate::proto::{BodyLength, MessageHead}; + +const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// This handles a connection, which will have been established over an +/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple +/// `Transaction`s over HTTP. +/// +/// The connection will determine when a message begins and ends as well as +/// determine if this connection can be kept alive after the message, +/// or if it is complete. +pub(crate) struct Conn<I, B, T> { + io: Buffered<I, EncodedBuf<B>>, + state: State, + _marker: PhantomData<fn(T)>, +} + +impl<I, B, T> Conn<I, B, T> +where + I: AsyncRead + AsyncWrite + Unpin, + B: Buf, + T: Http1Transaction, +{ + pub(crate) fn new(io: I) -> Conn<I, B, T> { + Conn { + io: Buffered::new(io), + state: State { + allow_half_close: false, + cached_headers: None, + error: None, + keep_alive: KA::Busy, + method: None, + h1_parser_config: ParserConfig::default(), + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout: None, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_fut: None, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_running: false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + title_case_headers: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: None, + #[cfg(feature = "ffi")] + raw_headers: false, + notify_read: false, + reading: Reading::Init, + writing: Writing::Init, + upgrade: None, + // We assume a modern world where the remote speaks HTTP/1.1. + // If they tell us otherwise, we'll downgrade in `read_head`. + version: Version::HTTP_11, + }, + _marker: PhantomData, + } + } + + #[cfg(feature = "server")] + pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) { + self.io.set_flush_pipeline(enabled); + } + + pub(crate) fn set_write_strategy_queue(&mut self) { + self.io.set_write_strategy_queue(); + } + + pub(crate) fn set_max_buf_size(&mut self, max: usize) { + self.io.set_max_buf_size(max); + } + + #[cfg(feature = "client")] + pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) { + self.io.set_read_buf_exact_size(sz); + } + + pub(crate) fn set_write_strategy_flatten(&mut self) { + self.io.set_write_strategy_flatten(); + } + + #[cfg(feature = "client")] + pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) { + self.state.h1_parser_config = parser_config; + } + + pub(crate) fn set_title_case_headers(&mut self) { + self.state.title_case_headers = true; + } + + pub(crate) fn set_preserve_header_case(&mut self) { + self.state.preserve_header_case = true; + } + + #[cfg(feature = "ffi")] + pub(crate) fn set_preserve_header_order(&mut self) { + self.state.preserve_header_order = true; + } + + #[cfg(feature = "client")] + pub(crate) fn set_h09_responses(&mut self) { + self.state.h09_responses = true; + } + + #[cfg(all(feature = "server", feature = "runtime"))] + pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) { + self.state.h1_header_read_timeout = Some(val); + } + + #[cfg(feature = "server")] + pub(crate) fn set_allow_half_close(&mut self) { + self.state.allow_half_close = true; + } + + #[cfg(feature = "ffi")] + pub(crate) fn set_raw_headers(&mut self, enabled: bool) { + self.state.raw_headers = enabled; + } + + pub(crate) fn into_inner(self) -> (I, Bytes) { + self.io.into_inner() + } + + pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> { + self.state.upgrade.take() + } + + pub(crate) fn is_read_closed(&self) -> bool { + self.state.is_read_closed() + } + + pub(crate) fn is_write_closed(&self) -> bool { + self.state.is_write_closed() + } + + pub(crate) fn can_read_head(&self) -> bool { + if !matches!(self.state.reading, Reading::Init) { + return false; + } + + if T::should_read_first() { + return true; + } + + !matches!(self.state.writing, Writing::Init) + } + + pub(crate) fn can_read_body(&self) -> bool { + match self.state.reading { + Reading::Body(..) | Reading::Continue(..) => true, + _ => false, + } + } + + fn should_error_on_eof(&self) -> bool { + // If we're idle, it's probably just the connection closing gracefully. + T::should_error_on_parse_eof() && !self.state.is_idle() + } + + fn has_h2_prefix(&self) -> bool { + let read_buf = self.io.read_buf(); + read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE + } + + pub(super) fn poll_read_head( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> { + debug_assert!(self.can_read_head()); + trace!("Conn::read_head"); + + let msg = match ready!(self.io.parse::<T>( + cx, + ParseContext { + cached_headers: &mut self.state.cached_headers, + req_method: &mut self.state.method, + h1_parser_config: self.state.h1_parser_config.clone(), + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout: self.state.h1_header_read_timeout, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running, + preserve_header_case: self.state.preserve_header_case, + #[cfg(feature = "ffi")] + preserve_header_order: self.state.preserve_header_order, + h09_responses: self.state.h09_responses, + #[cfg(feature = "ffi")] + on_informational: &mut self.state.on_informational, + #[cfg(feature = "ffi")] + raw_headers: self.state.raw_headers, + } + )) { + Ok(msg) => msg, + Err(e) => return self.on_read_head_error(e), + }; + + // Note: don't deconstruct `msg` into local variables, it appears + // the optimizer doesn't remove the extra copies. + + debug!("incoming body is {}", msg.decode); + + // Prevent accepting HTTP/0.9 responses after the initial one, if any. + self.state.h09_responses = false; + + // Drop any OnInformational callbacks, we're done there! + #[cfg(feature = "ffi")] + { + self.state.on_informational = None; + } + + self.state.busy(); + self.state.keep_alive &= msg.keep_alive; + self.state.version = msg.head.version; + + let mut wants = if msg.wants_upgrade { + Wants::UPGRADE + } else { + Wants::EMPTY + }; + + if msg.decode == DecodedLength::ZERO { + if msg.expect_continue { + debug!("ignoring expect-continue since body is empty"); + } + self.state.reading = Reading::KeepAlive; + if !T::should_read_first() { + self.try_keep_alive(cx); + } + } else if msg.expect_continue { + self.state.reading = Reading::Continue(Decoder::new(msg.decode)); + wants = wants.add(Wants::EXPECT); + } else { + self.state.reading = Reading::Body(Decoder::new(msg.decode)); + } + + Poll::Ready(Some(Ok((msg.head, msg.decode, wants)))) + } + + fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> { + // If we are currently waiting on a message, then an empty + // message should be reported as an error. If not, it is just + // the connection closing gracefully. + let must_error = self.should_error_on_eof(); + self.close_read(); + self.io.consume_leading_lines(); + let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty(); + if was_mid_parse || must_error { + // We check if the buf contains the h2 Preface + debug!( + "parse error ({}) with {} bytes", + e, + self.io.read_buf().len() + ); + match self.on_parse_error(e) { + Ok(()) => Poll::Pending, // XXX: wat? + Err(e) => Poll::Ready(Some(Err(e))), + } + } else { + debug!("read eof"); + self.close_write(); + Poll::Ready(None) + } + } + + pub(crate) fn poll_read_body( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<Option<io::Result<Bytes>>> { + debug_assert!(self.can_read_body()); + + let (reading, ret) = match self.state.reading { + Reading::Body(ref mut decoder) => { + match ready!(decoder.decode(cx, &mut self.io)) { + Ok(slice) => { + let (reading, chunk) = if decoder.is_eof() { + debug!("incoming body completed"); + ( + Reading::KeepAlive, + if !slice.is_empty() { + Some(Ok(slice)) + } else { + None + }, + ) + } else if slice.is_empty() { + error!("incoming body unexpectedly ended"); + // This should be unreachable, since all 3 decoders + // either set eof=true or return an Err when reading + // an empty slice... + (Reading::Closed, None) + } else { + return Poll::Ready(Some(Ok(slice))); + }; + (reading, Poll::Ready(chunk)) + } + Err(e) => { + debug!("incoming body decode error: {}", e); + (Reading::Closed, Poll::Ready(Some(Err(e)))) + } + } + } + Reading::Continue(ref decoder) => { + // Write the 100 Continue if not already responded... + if let Writing::Init = self.state.writing { + trace!("automatically sending 100 Continue"); + let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; + self.io.headers_buf().extend_from_slice(cont); + } + + // And now recurse once in the Reading::Body state... + self.state.reading = Reading::Body(decoder.clone()); + return self.poll_read_body(cx); + } + _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading), + }; + + self.state.reading = reading; + self.try_keep_alive(cx); + ret + } + + pub(crate) fn wants_read_again(&mut self) -> bool { + let ret = self.state.notify_read; + self.state.notify_read = false; + ret + } + + pub(crate) fn poll_read_keep_alive( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<crate::Result<()>> { + debug_assert!(!self.can_read_head() && !self.can_read_body()); + + if self.is_read_closed() { + Poll::Pending + } else if self.is_mid_message() { + self.mid_message_detect_eof(cx) + } else { + self.require_empty_read(cx) + } + } + + fn is_mid_message(&self) -> bool { + !matches!( + (&self.state.reading, &self.state.writing), + (&Reading::Init, &Writing::Init) + ) + } + + // This will check to make sure the io object read is empty. + // + // This should only be called for Clients wanting to enter the idle + // state. + fn require_empty_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); + debug_assert!(!self.is_mid_message()); + debug_assert!(T::is_client()); + + if !self.io.read_buf().is_empty() { + debug!("received an unexpected {} bytes", self.io.read_buf().len()); + return Poll::Ready(Err(crate::Error::new_unexpected_message())); + } + + let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; + + if num_read == 0 { + let ret = if self.should_error_on_eof() { + trace!("found unexpected EOF on busy connection: {:?}", self.state); + Poll::Ready(Err(crate::Error::new_incomplete())) + } else { + trace!("found EOF on idle connection, closing"); + Poll::Ready(Ok(())) + }; + + // order is important: should_error needs state BEFORE close_read + self.state.close_read(); + return ret; + } + + debug!( + "received unexpected {} bytes on an idle connection", + num_read + ); + Poll::Ready(Err(crate::Error::new_unexpected_message())) + } + + fn mid_message_detect_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed()); + debug_assert!(self.is_mid_message()); + + if self.state.allow_half_close || !self.io.read_buf().is_empty() { + return Poll::Pending; + } + + let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?; + + if num_read == 0 { + trace!("found unexpected EOF on busy connection: {:?}", self.state); + self.state.close_read(); + Poll::Ready(Err(crate::Error::new_incomplete())) + } else { + Poll::Ready(Ok(())) + } + } + + fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> { + debug_assert!(!self.state.is_read_closed()); + + let result = ready!(self.io.poll_read_from_io(cx)); + Poll::Ready(result.map_err(|e| { + trace!("force_io_read; io error = {:?}", e); + self.state.close(); + e + })) + } + + fn maybe_notify(&mut self, cx: &mut task::Context<'_>) { + // its possible that we returned NotReady from poll() without having + // exhausted the underlying Io. We would have done this when we + // determined we couldn't keep reading until we knew how writing + // would finish. + + match self.state.reading { + Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => { + return + } + Reading::Init => (), + }; + + match self.state.writing { + Writing::Body(..) => return, + Writing::Init | Writing::KeepAlive | Writing::Closed => (), + } + + if !self.io.is_read_blocked() { + if self.io.read_buf().is_empty() { + match self.io.poll_read_from_io(cx) { + Poll::Ready(Ok(n)) => { + if n == 0 { + trace!("maybe_notify; read eof"); + if self.state.is_idle() { + self.state.close(); + } else { + self.close_read() + } + return; + } + } + Poll::Pending => { + trace!("maybe_notify; read_from_io blocked"); + return; + } + Poll::Ready(Err(e)) => { + trace!("maybe_notify; read_from_io error: {}", e); + self.state.close(); + self.state.error = Some(crate::Error::new_io(e)); + } + } + } + self.state.notify_read = true; + } + } + + fn try_keep_alive(&mut self, cx: &mut task::Context<'_>) { + self.state.try_keep_alive::<T>(); + self.maybe_notify(cx); + } + + pub(crate) fn can_write_head(&self) -> bool { + if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) { + return false; + } + + match self.state.writing { + Writing::Init => self.io.can_headers_buf(), + _ => false, + } + } + + pub(crate) fn can_write_body(&self) -> bool { + match self.state.writing { + Writing::Body(..) => true, + Writing::Init | Writing::KeepAlive | Writing::Closed => false, + } + } + + pub(crate) fn can_buffer_body(&self) -> bool { + self.io.can_buffer() + } + + pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) { + if let Some(encoder) = self.encode_head(head, body) { + self.state.writing = if !encoder.is_eof() { + Writing::Body(encoder) + } else if encoder.is_last() { + Writing::Closed + } else { + Writing::KeepAlive + }; + } + } + + pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) { + if let Some(encoder) = + self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64))) + { + let is_last = encoder.is_last(); + // Make sure we don't write a body if we weren't actually allowed + // to do so, like because its a HEAD request. + if !encoder.is_eof() { + encoder.danger_full_buf(body, self.io.write_buf()); + } + self.state.writing = if is_last { + Writing::Closed + } else { + Writing::KeepAlive + } + } + } + + fn encode_head( + &mut self, + mut head: MessageHead<T::Outgoing>, + body: Option<BodyLength>, + ) -> Option<Encoder> { + debug_assert!(self.can_write_head()); + + if !T::should_read_first() { + self.state.busy(); + } + + self.enforce_version(&mut head); + + let buf = self.io.headers_buf(); + match super::role::encode_headers::<T>( + Encode { + head: &mut head, + body, + #[cfg(feature = "server")] + keep_alive: self.state.wants_keep_alive(), + req_method: &mut self.state.method, + title_case_headers: self.state.title_case_headers, + }, + buf, + ) { + Ok(encoder) => { + debug_assert!(self.state.cached_headers.is_none()); + debug_assert!(head.headers.is_empty()); + self.state.cached_headers = Some(head.headers); + + #[cfg(feature = "ffi")] + { + self.state.on_informational = + head.extensions.remove::<crate::ffi::OnInformational>(); + } + + Some(encoder) + } + Err(err) => { + self.state.error = Some(err); + self.state.writing = Writing::Closed; + None + } + } + } + + // Fix keep-alive when Connection: keep-alive header is not present + fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) { + let outgoing_is_keep_alive = head + .headers + .get(CONNECTION) + .map(connection_keep_alive) + .unwrap_or(false); + + if !outgoing_is_keep_alive { + match head.version { + // If response is version 1.0 and keep-alive is not present in the response, + // disable keep-alive so the server closes the connection + Version::HTTP_10 => self.state.disable_keep_alive(), + // If response is version 1.1 and keep-alive is wanted, add + // Connection: keep-alive header when not present + Version::HTTP_11 => { + if self.state.wants_keep_alive() { + head.headers + .insert(CONNECTION, HeaderValue::from_static("keep-alive")); + } + } + _ => (), + } + } + } + + // If we know the remote speaks an older version, we try to fix up any messages + // to work with our older peer. + fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) { + if let Version::HTTP_10 = self.state.version { + // Fixes response or connection when keep-alive header is not present + self.fix_keep_alive(head); + // If the remote only knows HTTP/1.0, we should force ourselves + // to do only speak HTTP/1.0 as well. + head.version = Version::HTTP_10; + } + // If the remote speaks HTTP/1.1, then it *should* be fine with + // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let + // the user's headers be. + } + + pub(crate) fn write_body(&mut self, chunk: B) { + debug_assert!(self.can_write_body() && self.can_buffer_body()); + // empty chunks should be discarded at Dispatcher level + debug_assert!(chunk.remaining() != 0); + + let state = match self.state.writing { + Writing::Body(ref mut encoder) => { + self.io.buffer(encoder.encode(chunk)); + + if !encoder.is_eof() { + return; + } + + if encoder.is_last() { + Writing::Closed + } else { + Writing::KeepAlive + } + } + _ => unreachable!("write_body invalid state: {:?}", self.state.writing), + }; + + self.state.writing = state; + } + + pub(crate) fn write_body_and_end(&mut self, chunk: B) { + debug_assert!(self.can_write_body() && self.can_buffer_body()); + // empty chunks should be discarded at Dispatcher level + debug_assert!(chunk.remaining() != 0); + + let state = match self.state.writing { + Writing::Body(ref encoder) => { + let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf()); + if can_keep_alive { + Writing::KeepAlive + } else { + Writing::Closed + } + } + _ => unreachable!("write_body invalid state: {:?}", self.state.writing), + }; + + self.state.writing = state; + } + + pub(crate) fn end_body(&mut self) -> crate::Result<()> { + debug_assert!(self.can_write_body()); + + let encoder = match self.state.writing { + Writing::Body(ref mut enc) => enc, + _ => return Ok(()), + }; + + // end of stream, that means we should try to eof + match encoder.end() { + Ok(end) => { + if let Some(end) = end { + self.io.buffer(end); + } + + self.state.writing = if encoder.is_last() || encoder.is_close_delimited() { + Writing::Closed + } else { + Writing::KeepAlive + }; + + Ok(()) + } + Err(not_eof) => { + self.state.writing = Writing::Closed; + Err(crate::Error::new_body_write_aborted().with(not_eof)) + } + } + } + + // When we get a parse error, depending on what side we are, we might be able + // to write a response before closing the connection. + // + // - Client: there is nothing we can do + // - Server: if Response hasn't been written yet, we can send a 4xx response + fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> { + if let Writing::Init = self.state.writing { + if self.has_h2_prefix() { + return Err(crate::Error::new_version_h2()); + } + if let Some(msg) = T::on_error(&err) { + // Drop the cached headers so as to not trigger a debug + // assert in `write_head`... + self.state.cached_headers.take(); + self.write_head(msg, None); + self.state.error = Some(err); + return Ok(()); + } + } + + // fallback is pass the error back up + Err(err) + } + + pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + ready!(Pin::new(&mut self.io).poll_flush(cx))?; + self.try_keep_alive(cx); + trace!("flushed({}): {:?}", T::LOG, self.state); + Poll::Ready(Ok(())) + } + + pub(crate) fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) { + Ok(()) => { + trace!("shut down IO complete"); + Poll::Ready(Ok(())) + } + Err(e) => { + debug!("error shutting down IO: {}", e); + Poll::Ready(Err(e)) + } + } + } + + /// If the read side can be cheaply drained, do so. Otherwise, close. + pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { + let _ = self.poll_read_body(cx); + + // If still in Reading::Body, just give up + match self.state.reading { + Reading::Init | Reading::KeepAlive => trace!("body drained"), + _ => self.close_read(), + } + } + + pub(crate) fn close_read(&mut self) { + self.state.close_read(); + } + + pub(crate) fn close_write(&mut self) { + self.state.close_write(); + } + + #[cfg(feature = "server")] + pub(crate) fn disable_keep_alive(&mut self) { + if self.state.is_idle() { + trace!("disable_keep_alive; closing idle connection"); + self.state.close(); + } else { + trace!("disable_keep_alive; in-progress connection"); + self.state.disable_keep_alive(); + } + } + + pub(crate) fn take_error(&mut self) -> crate::Result<()> { + if let Some(err) = self.state.error.take() { + Err(err) + } else { + Ok(()) + } + } + + pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade { + trace!("{}: prepare possible HTTP upgrade", T::LOG); + self.state.prepare_upgrade() + } +} + +impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Conn") + .field("state", &self.state) + .field("io", &self.io) + .finish() + } +} + +// B and T are never pinned +impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {} + +struct State { + allow_half_close: bool, + /// Re-usable HeaderMap to reduce allocating new ones. + cached_headers: Option<HeaderMap>, + /// If an error occurs when there wasn't a direct way to return it + /// back to the user, this is set. + error: Option<crate::Error>, + /// Current keep-alive status. + keep_alive: KA, + /// If mid-message, the HTTP Method that started it. + /// + /// This is used to know things such as if the message can include + /// a body or not. + method: Option<Method>, + h1_parser_config: ParserConfig, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout: Option<Duration>, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_running: bool, + preserve_header_case: bool, + #[cfg(feature = "ffi")] + preserve_header_order: bool, + title_case_headers: bool, + h09_responses: bool, + /// If set, called with each 1xx informational response received for + /// the current request. MUST be unset after a non-1xx response is + /// received. + #[cfg(feature = "ffi")] + on_informational: Option<crate::ffi::OnInformational>, + #[cfg(feature = "ffi")] + raw_headers: bool, + /// Set to true when the Dispatcher should poll read operations + /// again. See the `maybe_notify` method for more. + notify_read: bool, + /// State of allowed reads + reading: Reading, + /// State of allowed writes + writing: Writing, + /// An expected pending HTTP upgrade. + upgrade: Option<crate::upgrade::Pending>, + /// Either HTTP/1.0 or 1.1 connection + version: Version, +} + +#[derive(Debug)] +enum Reading { + Init, + Continue(Decoder), + Body(Decoder), + KeepAlive, + Closed, +} + +enum Writing { + Init, + Body(Encoder), + KeepAlive, + Closed, +} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut builder = f.debug_struct("State"); + builder + .field("reading", &self.reading) + .field("writing", &self.writing) + .field("keep_alive", &self.keep_alive); + + // Only show error field if it's interesting... + if let Some(ref error) = self.error { + builder.field("error", error); + } + + if self.allow_half_close { + builder.field("allow_half_close", &true); + } + + // Purposefully leaving off other fields.. + + builder.finish() + } +} + +impl fmt::Debug for Writing { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + Writing::Init => f.write_str("Init"), + Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(), + Writing::KeepAlive => f.write_str("KeepAlive"), + Writing::Closed => f.write_str("Closed"), + } + } +} + +impl std::ops::BitAndAssign<bool> for KA { + fn bitand_assign(&mut self, enabled: bool) { + if !enabled { + trace!("remote disabling keep-alive"); + *self = KA::Disabled; + } + } +} + +#[derive(Clone, Copy, Debug)] +enum KA { + Idle, + Busy, + Disabled, +} + +impl Default for KA { + fn default() -> KA { + KA::Busy + } +} + +impl KA { + fn idle(&mut self) { + *self = KA::Idle; + } + + fn busy(&mut self) { + *self = KA::Busy; + } + + fn disable(&mut self) { + *self = KA::Disabled; + } + + fn status(&self) -> KA { + *self + } +} + +impl State { + fn close(&mut self) { + trace!("State::close()"); + self.reading = Reading::Closed; + self.writing = Writing::Closed; + self.keep_alive.disable(); + } + + fn close_read(&mut self) { + trace!("State::close_read()"); + self.reading = Reading::Closed; + self.keep_alive.disable(); + } + + fn close_write(&mut self) { + trace!("State::close_write()"); + self.writing = Writing::Closed; + self.keep_alive.disable(); + } + + fn wants_keep_alive(&self) -> bool { + if let KA::Disabled = self.keep_alive.status() { + false + } else { + true + } + } + + fn try_keep_alive<T: Http1Transaction>(&mut self) { + match (&self.reading, &self.writing) { + (&Reading::KeepAlive, &Writing::KeepAlive) => { + if let KA::Busy = self.keep_alive.status() { + self.idle::<T>(); + } else { + trace!( + "try_keep_alive({}): could keep-alive, but status = {:?}", + T::LOG, + self.keep_alive + ); + self.close(); + } + } + (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => { + self.close() + } + _ => (), + } + } + + fn disable_keep_alive(&mut self) { + self.keep_alive.disable() + } + + fn busy(&mut self) { + if let KA::Disabled = self.keep_alive.status() { + return; + } + self.keep_alive.busy(); + } + + fn idle<T: Http1Transaction>(&mut self) { + debug_assert!(!self.is_idle(), "State::idle() called while idle"); + + self.method = None; + self.keep_alive.idle(); + + if !self.is_idle() { + self.close(); + return; + } + + self.reading = Reading::Init; + self.writing = Writing::Init; + + // !T::should_read_first() means Client. + // + // If Client connection has just gone idle, the Dispatcher + // should try the poll loop one more time, so as to poll the + // pending requests stream. + if !T::should_read_first() { + self.notify_read = true; + } + } + + fn is_idle(&self) -> bool { + matches!(self.keep_alive.status(), KA::Idle) + } + + fn is_read_closed(&self) -> bool { + matches!(self.reading, Reading::Closed) + } + + fn is_write_closed(&self) -> bool { + matches!(self.writing, Writing::Closed) + } + + fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade { + let (tx, rx) = crate::upgrade::pending(); + self.upgrade = Some(tx); + rx + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "nightly")] + #[bench] + fn bench_read_head_short(b: &mut ::test::Bencher) { + use super::*; + let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"; + let len = s.len(); + b.bytes = len as u64; + + // an empty IO, we'll be skipping and using the read buffer anyways + let io = tokio_test::io::Builder::new().build(); + let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io); + *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]); + conn.state.cached_headers = Some(HeaderMap::with_capacity(2)); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + b.iter(|| { + rt.block_on(futures_util::future::poll_fn(|cx| { + match conn.poll_read_head(cx) { + Poll::Ready(Some(Ok(x))) => { + ::test::black_box(&x); + let mut headers = x.0.headers; + headers.clear(); + conn.state.cached_headers = Some(headers); + } + f => panic!("expected Ready(Some(Ok(..))): {:?}", f), + } + + conn.io.read_buf_mut().reserve(1); + unsafe { + conn.io.read_buf_mut().set_len(len); + } + conn.state.reading = Reading::Init; + Poll::Ready(()) + })); + }); + } + + /* + //TODO: rewrite these using dispatch... someday... + use futures::{Async, Future, Stream, Sink}; + use futures::future; + + use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; + use super::super::Encoder; + use mock::AsyncIo; + + use super::{Conn, Decoder, Reading, Writing}; + use ::uri::Uri; + + use std::str::FromStr; + + #[test] + fn test_conn_init_read() { + let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); + let len = good_message.len(); + let io = AsyncIo::new_buf(good_message, len); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + + match conn.poll().unwrap() { + Async::Ready(Some(Frame::Message { message, body: false })) => { + assert_eq!(message, MessageHead { + subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()), + .. MessageHead::default() + }) + }, + f => panic!("frame is not Frame::Message: {:?}", f) + } + } + + #[test] + fn test_conn_parse_partial() { + let _: Result<(), ()> = future::lazy(|| { + let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); + let io = AsyncIo::new_buf(good_message, 10); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + assert!(conn.poll().unwrap().is_not_ready()); + conn.io.io_mut().block_in(50); + let async = conn.poll().unwrap(); + assert!(async.is_ready()); + match async { + Async::Ready(Some(Frame::Message { .. })) => (), + f => panic!("frame is not Message: {:?}", f), + } + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_init_read_eof_idle() { + let io = AsyncIo::new_buf(vec![], 1); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.idle(); + + match conn.poll().unwrap() { + Async::Ready(None) => {}, + other => panic!("frame is not None: {:?}", other) + } + } + + #[test] + fn test_conn_init_read_eof_idle_partial_parse() { + let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.idle(); + + match conn.poll() { + Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, + other => panic!("unexpected frame: {:?}", other) + } + } + + #[test] + fn test_conn_init_read_eof_busy() { + let _: Result<(), ()> = future::lazy(|| { + // server ignores + let io = AsyncIo::new_eof(); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.busy(); + + match conn.poll().unwrap() { + Async::Ready(None) => {}, + other => panic!("unexpected frame: {:?}", other) + } + + // client + let io = AsyncIo::new_eof(); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); + conn.state.busy(); + + match conn.poll() { + Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {}, + other => panic!("unexpected frame: {:?}", other) + } + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_body_finish_read_eof() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_eof(); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); + conn.state.busy(); + conn.state.writing = Writing::KeepAlive; + conn.state.reading = Reading::Body(Decoder::length(0)); + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + // conn eofs, but tokio-proto will call poll() again, before calling flush() + // the conn eof in this case is perfectly fine + + match conn.poll() { + Ok(Async::Ready(None)) => (), + other => panic!("unexpected frame: {:?}", other) + } + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_message_empty_body_read_eof() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024); + let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io); + conn.state.busy(); + conn.state.writing = Writing::KeepAlive; + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + // conn eofs, but tokio-proto will call poll() again, before calling flush() + // the conn eof in this case is perfectly fine + + match conn.poll() { + Ok(Async::Ready(None)) => (), + other => panic!("unexpected frame: {:?}", other) + } + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_read_body_end() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.busy(); + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` + match conn.poll() { + Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + match conn.poll() { + Ok(Async::NotReady) => (), + other => panic!("unexpected frame: {:?}", other) + } + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_closed_read() { + let io = AsyncIo::new_buf(vec![], 0); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.close(); + + match conn.poll().unwrap() { + Async::Ready(None) => {}, + other => panic!("frame is not None: {:?}", other) + } + } + + #[test] + fn test_conn_body_write_length() { + let _ = pretty_env_logger::try_init(); + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 0); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096; + conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); + + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); + assert!(!conn.can_buffer_body()); + + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); + + conn.io.io_mut().block_in(1024 * 3); + assert!(conn.poll_complete().unwrap().is_not_ready()); + conn.io.io_mut().block_in(1024 * 3); + assert!(conn.poll_complete().unwrap().is_not_ready()); + conn.io.io_mut().block_in(max * 2); + assert!(conn.poll_complete().unwrap().is_ready()); + + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready()); + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_body_write_chunked() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.writing = Writing::Body(Encoder::chunked()); + + assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_body_flush() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); + assert!(!conn.can_buffer_body()); + conn.io.io_mut().block_in(1024 * 1024 * 5); + assert!(conn.poll_complete().unwrap().is_ready()); + assert!(conn.can_buffer_body()); + assert!(conn.io.io_mut().flushed()); + + Ok(()) + }).wait(); + } + + #[test] + fn test_conn_parking() { + use std::sync::Arc; + use futures::executor::Notify; + use futures::executor::NotifyHandle; + + struct Car { + permit: bool, + } + impl Notify for Car { + fn notify(&self, _id: usize) { + assert!(self.permit, "unparked without permit"); + } + } + + fn car(permit: bool) -> NotifyHandle { + Arc::new(Car { + permit: permit, + }).into() + } + + // test that once writing is done, unparks + let f = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.reading = Reading::KeepAlive; + assert!(conn.poll().unwrap().is_not_ready()); + + conn.state.writing = Writing::KeepAlive; + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap(); + + + // test that flushing when not waiting on read doesn't unpark + let f = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.writing = Writing::KeepAlive; + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); + + + // test that flushing and writing isn't done doesn't unpark + let f = future::lazy(|| { + let io = AsyncIo::new_buf(vec![], 4096); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.reading = Reading::KeepAlive; + assert!(conn.poll().unwrap().is_not_ready()); + conn.state.writing = Writing::Body(Encoder::length(5_000)); + assert!(conn.poll_complete().unwrap().is_ready()); + Ok::<(), ()>(()) + }); + ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap(); + } + + #[test] + fn test_conn_closed_write() { + let io = AsyncIo::new_buf(vec![], 0); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.close(); + + match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) { + Err(_e) => {}, + other => panic!("did not return Err: {:?}", other) + } + + assert!(conn.state.is_write_closed()); + } + + #[test] + fn test_conn_write_empty_chunk() { + let io = AsyncIo::new_buf(vec![], 0); + let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io); + conn.state.writing = Writing::KeepAlive; + + assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready()); + assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); + conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); + } + */ +} diff --git a/third_party/rust/hyper/src/proto/h1/decode.rs b/third_party/rust/hyper/src/proto/h1/decode.rs new file mode 100644 index 0000000000..1e3a38effc --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/decode.rs @@ -0,0 +1,731 @@ +use std::error::Error as StdError; +use std::fmt; +use std::io; +use std::usize; + +use bytes::Bytes; +use tracing::{debug, trace}; + +use crate::common::{task, Poll}; + +use super::io::MemRead; +use super::DecodedLength; + +use self::Kind::{Chunked, Eof, Length}; + +/// Decoders to handle different Transfer-Encodings. +/// +/// If a message body does not include a Transfer-Encoding, it *should* +/// include a Content-Length header. +#[derive(Clone, PartialEq)] +pub(crate) struct Decoder { + kind: Kind, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum Kind { + /// A Reader used when a Content-Length header is passed with a positive integer. + Length(u64), + /// A Reader used when Transfer-Encoding is `chunked`. + Chunked(ChunkedState, u64), + /// A Reader used for responses that don't indicate a length or chunked. + /// + /// The bool tracks when EOF is seen on the transport. + /// + /// Note: This should only used for `Response`s. It is illegal for a + /// `Request` to be made with both `Content-Length` and + /// `Transfer-Encoding: chunked` missing, as explained from the spec: + /// + /// > If a Transfer-Encoding header field is present in a response and + /// > the chunked transfer coding is not the final encoding, the + /// > message body length is determined by reading the connection until + /// > it is closed by the server. If a Transfer-Encoding header field + /// > is present in a request and the chunked transfer coding is not + /// > the final encoding, the message body length cannot be determined + /// > reliably; the server MUST respond with the 400 (Bad Request) + /// > status code and then close the connection. + Eof(bool), +} + +#[derive(Debug, PartialEq, Clone, Copy)] +enum ChunkedState { + Size, + SizeLws, + Extension, + SizeLf, + Body, + BodyCr, + BodyLf, + Trailer, + TrailerLf, + EndCr, + EndLf, + End, +} + +impl Decoder { + // constructors + + pub(crate) fn length(x: u64) -> Decoder { + Decoder { + kind: Kind::Length(x), + } + } + + pub(crate) fn chunked() -> Decoder { + Decoder { + kind: Kind::Chunked(ChunkedState::Size, 0), + } + } + + pub(crate) fn eof() -> Decoder { + Decoder { + kind: Kind::Eof(false), + } + } + + pub(super) fn new(len: DecodedLength) -> Self { + match len { + DecodedLength::CHUNKED => Decoder::chunked(), + DecodedLength::CLOSE_DELIMITED => Decoder::eof(), + length => Decoder::length(length.danger_len()), + } + } + + // methods + + pub(crate) fn is_eof(&self) -> bool { + matches!(self.kind, Length(0) | Chunked(ChunkedState::End, _) | Eof(true)) + } + + pub(crate) fn decode<R: MemRead>( + &mut self, + cx: &mut task::Context<'_>, + body: &mut R, + ) -> Poll<Result<Bytes, io::Error>> { + trace!("decode; state={:?}", self.kind); + match self.kind { + Length(ref mut remaining) => { + if *remaining == 0 { + Poll::Ready(Ok(Bytes::new())) + } else { + let to_read = *remaining as usize; + let buf = ready!(body.read_mem(cx, to_read))?; + let num = buf.as_ref().len() as u64; + if num > *remaining { + *remaining = 0; + } else if num == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + IncompleteBody, + ))); + } else { + *remaining -= num; + } + Poll::Ready(Ok(buf)) + } + } + Chunked(ref mut state, ref mut size) => { + loop { + let mut buf = None; + // advances the chunked state + *state = ready!(state.step(cx, body, size, &mut buf))?; + if *state == ChunkedState::End { + trace!("end of chunked"); + return Poll::Ready(Ok(Bytes::new())); + } + if let Some(buf) = buf { + return Poll::Ready(Ok(buf)); + } + } + } + Eof(ref mut is_eof) => { + if *is_eof { + Poll::Ready(Ok(Bytes::new())) + } else { + // 8192 chosen because its about 2 packets, there probably + // won't be that much available, so don't have MemReaders + // allocate buffers to big + body.read_mem(cx, 8192).map_ok(|slice| { + *is_eof = slice.is_empty(); + slice + }) + } + } + } + } + + #[cfg(test)] + async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> { + futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await + } +} + +impl fmt::Debug for Decoder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.kind, f) + } +} + +macro_rules! byte ( + ($rdr:ident, $cx:expr) => ({ + let buf = ready!($rdr.read_mem($cx, 1))?; + if !buf.is_empty() { + buf[0] + } else { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof, + "unexpected EOF during chunk size line"))); + } + }) +); + +impl ChunkedState { + fn step<R: MemRead>( + &self, + cx: &mut task::Context<'_>, + body: &mut R, + size: &mut u64, + buf: &mut Option<Bytes>, + ) -> Poll<Result<ChunkedState, io::Error>> { + use self::ChunkedState::*; + match *self { + Size => ChunkedState::read_size(cx, body, size), + SizeLws => ChunkedState::read_size_lws(cx, body), + Extension => ChunkedState::read_extension(cx, body), + SizeLf => ChunkedState::read_size_lf(cx, body, *size), + Body => ChunkedState::read_body(cx, body, size, buf), + BodyCr => ChunkedState::read_body_cr(cx, body), + BodyLf => ChunkedState::read_body_lf(cx, body), + Trailer => ChunkedState::read_trailer(cx, body), + TrailerLf => ChunkedState::read_trailer_lf(cx, body), + EndCr => ChunkedState::read_end_cr(cx, body), + EndLf => ChunkedState::read_end_lf(cx, body), + End => Poll::Ready(Ok(ChunkedState::End)), + } + } + fn read_size<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + size: &mut u64, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("Read chunk hex size"); + + macro_rules! or_overflow { + ($e:expr) => ( + match $e { + Some(val) => val, + None => return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid chunk size: overflow", + ))), + } + ) + } + + let radix = 16; + match byte!(rdr, cx) { + b @ b'0'..=b'9' => { + *size = or_overflow!(size.checked_mul(radix)); + *size = or_overflow!(size.checked_add((b - b'0') as u64)); + } + b @ b'a'..=b'f' => { + *size = or_overflow!(size.checked_mul(radix)); + *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64)); + } + b @ b'A'..=b'F' => { + *size = or_overflow!(size.checked_mul(radix)); + *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64)); + } + b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)), + b';' => return Poll::Ready(Ok(ChunkedState::Extension)), + b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)), + _ => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size line: Invalid Size", + ))); + } + } + Poll::Ready(Ok(ChunkedState::Size)) + } + fn read_size_lws<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("read_size_lws"); + match byte!(rdr, cx) { + // LWS can follow the chunk size, but no more digits can come + b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)), + b';' => Poll::Ready(Ok(ChunkedState::Extension)), + b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size linear white space", + ))), + } + } + fn read_extension<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("read_extension"); + // We don't care about extensions really at all. Just ignore them. + // They "end" at the next CRLF. + // + // However, some implementations may not check for the CR, so to save + // them from themselves, we reject extensions containing plain LF as + // well. + match byte!(rdr, cx) { + b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)), + b'\n' => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid chunk extension contains newline", + ))), + _ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions + } + } + fn read_size_lf<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + size: u64, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("Chunk size is {:?}", size); + match byte!(rdr, cx) { + b'\n' => { + if size == 0 { + Poll::Ready(Ok(ChunkedState::EndCr)) + } else { + debug!("incoming chunked header: {0:#X} ({0} bytes)", size); + Poll::Ready(Ok(ChunkedState::Body)) + } + } + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size LF", + ))), + } + } + + fn read_body<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + rem: &mut u64, + buf: &mut Option<Bytes>, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("Chunked read, remaining={:?}", rem); + + // cap remaining bytes at the max capacity of usize + let rem_cap = match *rem { + r if r > usize::MAX as u64 => usize::MAX, + r => r as usize, + }; + + let to_read = rem_cap; + let slice = ready!(rdr.read_mem(cx, to_read))?; + let count = slice.len(); + + if count == 0 { + *rem = 0; + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + IncompleteBody, + ))); + } + *buf = Some(slice); + *rem -= count as u64; + + if *rem > 0 { + Poll::Ready(Ok(ChunkedState::Body)) + } else { + Poll::Ready(Ok(ChunkedState::BodyCr)) + } + } + fn read_body_cr<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + match byte!(rdr, cx) { + b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)), + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk body CR", + ))), + } + } + fn read_body_lf<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + match byte!(rdr, cx) { + b'\n' => Poll::Ready(Ok(ChunkedState::Size)), + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk body LF", + ))), + } + } + + fn read_trailer<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + trace!("read_trailer"); + match byte!(rdr, cx) { + b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)), + _ => Poll::Ready(Ok(ChunkedState::Trailer)), + } + } + fn read_trailer_lf<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + match byte!(rdr, cx) { + b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)), + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid trailer end LF", + ))), + } + } + + fn read_end_cr<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + match byte!(rdr, cx) { + b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)), + _ => Poll::Ready(Ok(ChunkedState::Trailer)), + } + } + fn read_end_lf<R: MemRead>( + cx: &mut task::Context<'_>, + rdr: &mut R, + ) -> Poll<Result<ChunkedState, io::Error>> { + match byte!(rdr, cx) { + b'\n' => Poll::Ready(Ok(ChunkedState::End)), + _ => Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk end LF", + ))), + } + } +} + +#[derive(Debug)] +struct IncompleteBody; + +impl fmt::Display for IncompleteBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "end of file before message length reached") + } +} + +impl StdError for IncompleteBody {} + +#[cfg(test)] +mod tests { + use super::*; + use std::pin::Pin; + use std::time::Duration; + use tokio::io::{AsyncRead, ReadBuf}; + + impl<'a> MemRead for &'a [u8] { + fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { + let n = std::cmp::min(len, self.len()); + if n > 0 { + let (a, b) = self.split_at(n); + let buf = Bytes::copy_from_slice(a); + *self = b; + Poll::Ready(Ok(buf)) + } else { + Poll::Ready(Ok(Bytes::new())) + } + } + } + + impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) { + fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { + let mut v = vec![0; len]; + let mut buf = ReadBuf::new(&mut v); + ready!(Pin::new(self).poll_read(cx, &mut buf)?); + Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled()))) + } + } + + #[cfg(feature = "nightly")] + impl MemRead for Bytes { + fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { + let n = std::cmp::min(len, self.len()); + let ret = self.split_to(n); + Poll::Ready(Ok(ret)) + } + } + + /* + use std::io; + use std::io::Write; + use super::Decoder; + use super::ChunkedState; + use futures::{Async, Poll}; + use bytes::{BytesMut, Bytes}; + use crate::mock::AsyncIo; + */ + + #[tokio::test] + async fn test_read_chunk_size() { + use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof}; + + async fn read(s: &str) -> u64 { + let mut state = ChunkedState::Size; + let rdr = &mut s.as_bytes(); + let mut size = 0; + loop { + let result = + futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None)) + .await; + let desc = format!("read_size failed for {:?}", s); + state = result.expect(desc.as_str()); + if state == ChunkedState::Body || state == ChunkedState::EndCr { + break; + } + } + size + } + + async fn read_err(s: &str, expected_err: io::ErrorKind) { + let mut state = ChunkedState::Size; + let rdr = &mut s.as_bytes(); + let mut size = 0; + loop { + let result = + futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None)) + .await; + state = match result { + Ok(s) => s, + Err(e) => { + assert!( + expected_err == e.kind(), + "Reading {:?}, expected {:?}, but got {:?}", + s, + expected_err, + e.kind() + ); + return; + } + }; + if state == ChunkedState::Body || state == ChunkedState::End { + panic!("Was Ok. Expected Err for {:?}", s); + } + } + } + + assert_eq!(1, read("1\r\n").await); + assert_eq!(1, read("01\r\n").await); + assert_eq!(0, read("0\r\n").await); + assert_eq!(0, read("00\r\n").await); + assert_eq!(10, read("A\r\n").await); + assert_eq!(10, read("a\r\n").await); + assert_eq!(255, read("Ff\r\n").await); + assert_eq!(255, read("Ff \r\n").await); + // Missing LF or CRLF + read_err("F\rF", InvalidInput).await; + read_err("F", UnexpectedEof).await; + // Invalid hex digit + read_err("X\r\n", InvalidInput).await; + read_err("1X\r\n", InvalidInput).await; + read_err("-\r\n", InvalidInput).await; + read_err("-1\r\n", InvalidInput).await; + // Acceptable (if not fully valid) extensions do not influence the size + assert_eq!(1, read("1;extension\r\n").await); + assert_eq!(10, read("a;ext name=value\r\n").await); + assert_eq!(1, read("1;extension;extension2\r\n").await); + assert_eq!(1, read("1;;; ;\r\n").await); + assert_eq!(2, read("2; extension...\r\n").await); + assert_eq!(3, read("3 ; extension=123\r\n").await); + assert_eq!(3, read("3 ;\r\n").await); + assert_eq!(3, read("3 ; \r\n").await); + // Invalid extensions cause an error + read_err("1 invalid extension\r\n", InvalidInput).await; + read_err("1 A\r\n", InvalidInput).await; + read_err("1;no CRLF", UnexpectedEof).await; + read_err("1;reject\nnewlines\r\n", InvalidData).await; + // Overflow + read_err("f0000000000000003\r\n", InvalidData).await; + } + + #[tokio::test] + async fn test_read_sized_early_eof() { + let mut bytes = &b"foo bar"[..]; + let mut decoder = Decoder::length(10); + assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); + let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); + } + + #[tokio::test] + async fn test_read_chunked_early_eof() { + let mut bytes = &b"\ + 9\r\n\ + foo bar\ + "[..]; + let mut decoder = Decoder::chunked(); + assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7); + let e = decoder.decode_fut(&mut bytes).await.unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof); + } + + #[tokio::test] + async fn test_read_chunked_single_read() { + let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..]; + let buf = Decoder::chunked() + .decode_fut(&mut mock_buf) + .await + .expect("decode"); + assert_eq!(16, buf.len()); + let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String"); + assert_eq!("1234567890abcdef", &result); + } + + #[tokio::test] + async fn test_read_chunked_trailer_with_missing_lf() { + let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..]; + let mut decoder = Decoder::chunked(); + decoder.decode_fut(&mut mock_buf).await.expect("decode"); + let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err(); + assert_eq!(e.kind(), io::ErrorKind::InvalidInput); + } + + #[tokio::test] + async fn test_read_chunked_after_eof() { + let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..]; + let mut decoder = Decoder::chunked(); + + // normal read + let buf = decoder.decode_fut(&mut mock_buf).await.unwrap(); + assert_eq!(16, buf.len()); + let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String"); + assert_eq!("1234567890abcdef", &result); + + // eof read + let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); + assert_eq!(0, buf.len()); + + // ensure read after eof also returns eof + let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode"); + assert_eq!(0, buf.len()); + } + + // perform an async read using a custom buffer size and causing a blocking + // read at the specified byte + async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String { + let mut outs = Vec::new(); + + let mut ins = if block_at == 0 { + tokio_test::io::Builder::new() + .wait(Duration::from_millis(10)) + .read(content) + .build() + } else { + tokio_test::io::Builder::new() + .read(&content[..block_at]) + .wait(Duration::from_millis(10)) + .read(&content[block_at..]) + .build() + }; + + let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin); + + loop { + let buf = decoder + .decode_fut(&mut ins) + .await + .expect("unexpected decode error"); + if buf.is_empty() { + break; // eof + } + outs.extend(buf.as_ref()); + } + + String::from_utf8(outs).expect("decode String") + } + + // iterate over the different ways that this async read could go. + // tests blocking a read at each byte along the content - The shotgun approach + async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) { + let content_len = content.len(); + for block_at in 0..content_len { + let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await; + assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at); + } + } + + #[tokio::test] + async fn test_read_length_async() { + let content = "foobar"; + all_async_cases(content, content, Decoder::length(content.len() as u64)).await; + } + + #[tokio::test] + async fn test_read_chunked_async() { + let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n"; + let expected = "foobar"; + all_async_cases(content, expected, Decoder::chunked()).await; + } + + #[tokio::test] + async fn test_read_eof_async() { + let content = "foobar"; + all_async_cases(content, content, Decoder::eof()).await; + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_decode_chunked_1kb(b: &mut test::Bencher) { + let rt = new_runtime(); + + const LEN: usize = 1024; + let mut vec = Vec::new(); + vec.extend(format!("{:x}\r\n", LEN).as_bytes()); + vec.extend(&[0; LEN][..]); + vec.extend(b"\r\n"); + let content = Bytes::from(vec); + + b.bytes = LEN as u64; + + b.iter(|| { + let mut decoder = Decoder::chunked(); + rt.block_on(async { + let mut raw = content.clone(); + let chunk = decoder.decode_fut(&mut raw).await.unwrap(); + assert_eq!(chunk.len(), LEN); + }); + }); + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_decode_length_1kb(b: &mut test::Bencher) { + let rt = new_runtime(); + + const LEN: usize = 1024; + let content = Bytes::from(&[0; LEN][..]); + b.bytes = LEN as u64; + + b.iter(|| { + let mut decoder = Decoder::length(LEN as u64); + rt.block_on(async { + let mut raw = content.clone(); + let chunk = decoder.decode_fut(&mut raw).await.unwrap(); + assert_eq!(chunk.len(), LEN); + }); + }); + } + + #[cfg(feature = "nightly")] + fn new_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("rt build") + } +} diff --git a/third_party/rust/hyper/src/proto/h1/dispatch.rs b/third_party/rust/hyper/src/proto/h1/dispatch.rs new file mode 100644 index 0000000000..677131bfdd --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/dispatch.rs @@ -0,0 +1,750 @@ +use std::error::Error as StdError; + +use bytes::{Buf, Bytes}; +use http::Request; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, trace}; + +use super::{Http1Transaction, Wants}; +use crate::body::{Body, DecodedLength, HttpBody}; +use crate::common::{task, Future, Pin, Poll, Unpin}; +use crate::proto::{ + BodyLength, Conn, Dispatched, MessageHead, RequestHead, +}; +use crate::upgrade::OnUpgrade; + +pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> { + conn: Conn<I, Bs::Data, T>, + dispatch: D, + body_tx: Option<crate::body::Sender>, + body_rx: Pin<Box<Option<Bs>>>, + is_closing: bool, +} + +pub(crate) trait Dispatch { + type PollItem; + type PollBody; + type PollError; + type RecvItem; + fn poll_msg( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>; + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>; + fn should_poll(&self) -> bool; +} + +cfg_server! { + use crate::service::HttpService; + + pub(crate) struct Server<S: HttpService<B>, B> { + in_flight: Pin<Box<Option<S::Future>>>, + pub(crate) service: S, + } +} + +cfg_client! { + pin_project_lite::pin_project! { + pub(crate) struct Client<B> { + callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>, + #[pin] + rx: ClientRx<B>, + rx_closed: bool, + } + } + + type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>; +} + +impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> +where + D: Dispatch< + PollItem = MessageHead<T::Outgoing>, + PollBody = Bs, + RecvItem = MessageHead<T::Incoming>, + > + Unpin, + D::PollError: Into<Box<dyn StdError + Send + Sync>>, + I: AsyncRead + AsyncWrite + Unpin, + T: Http1Transaction + Unpin, + Bs: HttpBody + 'static, + Bs::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { + Dispatcher { + conn, + dispatch, + body_tx: None, + body_rx: Box::pin(None), + is_closing: false, + } + } + + #[cfg(feature = "server")] + pub(crate) fn disable_keep_alive(&mut self) { + self.conn.disable_keep_alive(); + if self.conn.is_write_closed() { + self.close(); + } + } + + pub(crate) fn into_inner(self) -> (I, Bytes, D) { + let (io, buf) = self.conn.into_inner(); + (io, buf, self.dispatch) + } + + /// Run this dispatcher until HTTP says this connection is done, + /// but don't call `AsyncWrite::shutdown` on the underlying IO. + /// + /// This is useful for old-style HTTP upgrades, but ignores + /// newer-style upgrade API. + pub(crate) fn poll_without_shutdown( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<crate::Result<()>> + where + Self: Unpin, + { + Pin::new(self).poll_catch(cx, false).map_ok(|ds| { + if let Dispatched::Upgrade(pending) = ds { + pending.manual(); + } + }) + } + + fn poll_catch( + &mut self, + cx: &mut task::Context<'_>, + should_shutdown: bool, + ) -> Poll<crate::Result<Dispatched>> { + Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { + // An error means we're shutting down either way. + // We just try to give the error to the user, + // and close the connection with an Ok. If we + // cannot give it to the user, then return the Err. + self.dispatch.recv_msg(Err(e))?; + Ok(Dispatched::Shutdown) + })) + } + + fn poll_inner( + &mut self, + cx: &mut task::Context<'_>, + should_shutdown: bool, + ) -> Poll<crate::Result<Dispatched>> { + T::update_date(); + + ready!(self.poll_loop(cx))?; + + if self.is_done() { + if let Some(pending) = self.conn.pending_upgrade() { + self.conn.take_error()?; + return Poll::Ready(Ok(Dispatched::Upgrade(pending))); + } else if should_shutdown { + ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?; + } + self.conn.take_error()?; + Poll::Ready(Ok(Dispatched::Shutdown)) + } else { + Poll::Pending + } + } + + fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + // Limit the looping on this connection, in case it is ready far too + // often, so that other futures don't starve. + // + // 16 was chosen arbitrarily, as that is number of pipelined requests + // benchmarks often use. Perhaps it should be a config option instead. + for _ in 0..16 { + let _ = self.poll_read(cx)?; + let _ = self.poll_write(cx)?; + let _ = self.poll_flush(cx)?; + + // This could happen if reading paused before blocking on IO, + // such as getting to the end of a framed message, but then + // writing/flushing set the state back to Init. In that case, + // if the read buffer still had bytes, we'd want to try poll_read + // again, or else we wouldn't ever be woken up again. + // + // Using this instead of task::current() and notify() inside + // the Conn is noticeably faster in pipelined benchmarks. + if !self.conn.wants_read_again() { + //break; + return Poll::Ready(Ok(())); + } + } + + trace!("poll_loop yielding (self = {:p})", self); + + task::yield_now(cx).map(|never| match never {}) + } + + fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + loop { + if self.is_closing { + return Poll::Ready(Ok(())); + } else if self.conn.can_read_head() { + ready!(self.poll_read_head(cx))?; + } else if let Some(mut body) = self.body_tx.take() { + if self.conn.can_read_body() { + match body.poll_ready(cx) { + Poll::Ready(Ok(())) => (), + Poll::Pending => { + self.body_tx = Some(body); + return Poll::Pending; + } + Poll::Ready(Err(_canceled)) => { + // user doesn't care about the body + // so we should stop reading + trace!("body receiver dropped before eof, draining or closing"); + self.conn.poll_drain_or_close_read(cx); + continue; + } + } + match self.conn.poll_read_body(cx) { + Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) { + Ok(()) => { + self.body_tx = Some(body); + } + Err(_canceled) => { + if self.conn.can_read_body() { + trace!("body receiver dropped before eof, closing"); + self.conn.close_read(); + } + } + }, + Poll::Ready(None) => { + // just drop, the body will close automatically + } + Poll::Pending => { + self.body_tx = Some(body); + return Poll::Pending; + } + Poll::Ready(Some(Err(e))) => { + body.send_error(crate::Error::new_body(e)); + } + } + } else { + // just drop, the body will close automatically + } + } else { + return self.conn.poll_read_keep_alive(cx); + } + } + } + + fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + // can dispatch receive, or does it still care about, an incoming message? + match ready!(self.dispatch.poll_ready(cx)) { + Ok(()) => (), + Err(()) => { + trace!("dispatch no longer receiving messages"); + self.close(); + return Poll::Ready(Ok(())); + } + } + // dispatch is ready for a message, try to read one + match ready!(self.conn.poll_read_head(cx)) { + Some(Ok((mut head, body_len, wants))) => { + let body = match body_len { + DecodedLength::ZERO => Body::empty(), + other => { + let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); + self.body_tx = Some(tx); + rx + } + }; + if wants.contains(Wants::UPGRADE) { + let upgrade = self.conn.on_upgrade(); + debug_assert!(!upgrade.is_none(), "empty upgrade"); + debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set"); + head.extensions.insert(upgrade); + } + self.dispatch.recv_msg(Ok((head, body)))?; + Poll::Ready(Ok(())) + } + Some(Err(err)) => { + debug!("read_head error: {}", err); + self.dispatch.recv_msg(Err(err))?; + // if here, the dispatcher gave the user the error + // somewhere else. we still need to shutdown, but + // not as a second error. + self.close(); + Poll::Ready(Ok(())) + } + None => { + // read eof, the write side will have been closed too unless + // allow_read_close was set to true, in which case just do + // nothing... + debug_assert!(self.conn.is_read_closed()); + if self.conn.is_write_closed() { + self.close(); + } + Poll::Ready(Ok(())) + } + } + } + + fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + loop { + if self.is_closing { + return Poll::Ready(Ok(())); + } else if self.body_rx.is_none() + && self.conn.can_write_head() + && self.dispatch.should_poll() + { + if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { + let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; + + // Check if the body knows its full data immediately. + // + // If so, we can skip a bit of bookkeeping that streaming + // bodies need to do. + if let Some(full) = crate::body::take_full_data(&mut body) { + self.conn.write_full_msg(head, full); + return Poll::Ready(Ok(())); + } + + let body_type = if body.is_end_stream() { + self.body_rx.set(None); + None + } else { + let btype = body + .size_hint() + .exact() + .map(BodyLength::Known) + .or_else(|| Some(BodyLength::Unknown)); + self.body_rx.set(Some(body)); + btype + }; + self.conn.write_head(head, body_type); + } else { + self.close(); + return Poll::Ready(Ok(())); + } + } else if !self.conn.can_buffer_body() { + ready!(self.poll_flush(cx))?; + } else { + // A new scope is needed :( + if let (Some(mut body), clear_body) = + OptGuard::new(self.body_rx.as_mut()).guard_mut() + { + debug_assert!(!*clear_body, "opt guard defaults to keeping body"); + if !self.conn.can_write_body() { + trace!( + "no more write body allowed, user body is_end_stream = {}", + body.is_end_stream(), + ); + *clear_body = true; + continue; + } + + let item = ready!(body.as_mut().poll_data(cx)); + if let Some(item) = item { + let chunk = item.map_err(|e| { + *clear_body = true; + crate::Error::new_user_body(e) + })?; + let eos = body.is_end_stream(); + if eos { + *clear_body = true; + if chunk.remaining() == 0 { + trace!("discarding empty chunk"); + self.conn.end_body()?; + } else { + self.conn.write_body_and_end(chunk); + } + } else { + if chunk.remaining() == 0 { + trace!("discarding empty chunk"); + continue; + } + self.conn.write_body(chunk); + } + } else { + *clear_body = true; + self.conn.end_body()?; + } + } else { + return Poll::Pending; + } + } + } + } + + fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + self.conn.poll_flush(cx).map_err(|err| { + debug!("error writing: {}", err); + crate::Error::new_body_write(err) + }) + } + + fn close(&mut self) { + self.is_closing = true; + self.conn.close_read(); + self.conn.close_write(); + } + + fn is_done(&self) -> bool { + if self.is_closing { + return true; + } + + let read_done = self.conn.is_read_closed(); + + if !T::should_read_first() && read_done { + // a client that cannot read may was well be done. + true + } else { + let write_done = self.conn.is_write_closed() + || (!self.dispatch.should_poll() && self.body_rx.is_none()); + read_done && write_done + } + } +} + +impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T> +where + D: Dispatch< + PollItem = MessageHead<T::Outgoing>, + PollBody = Bs, + RecvItem = MessageHead<T::Incoming>, + > + Unpin, + D::PollError: Into<Box<dyn StdError + Send + Sync>>, + I: AsyncRead + AsyncWrite + Unpin, + T: Http1Transaction + Unpin, + Bs: HttpBody + 'static, + Bs::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<Dispatched>; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.poll_catch(cx, true) + } +} + +// ===== impl OptGuard ===== + +/// A drop guard to allow a mutable borrow of an Option while being able to +/// set whether the `Option` should be cleared on drop. +struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool); + +impl<'a, T> OptGuard<'a, T> { + fn new(pin: Pin<&'a mut Option<T>>) -> Self { + OptGuard(pin, false) + } + + fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) { + (self.0.as_mut().as_pin_mut(), &mut self.1) + } +} + +impl<'a, T> Drop for OptGuard<'a, T> { + fn drop(&mut self) { + if self.1 { + self.0.set(None); + } + } +} + +// ===== impl Server ===== + +cfg_server! { + impl<S, B> Server<S, B> + where + S: HttpService<B>, + { + pub(crate) fn new(service: S) -> Server<S, B> { + Server { + in_flight: Box::pin(None), + service, + } + } + + pub(crate) fn into_service(self) -> S { + self.service + } + } + + // Service is never pinned + impl<S: HttpService<B>, B> Unpin for Server<S, B> {} + + impl<S, Bs> Dispatch for Server<S, Body> + where + S: HttpService<Body, ResBody = Bs>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + Bs: HttpBody, + { + type PollItem = MessageHead<http::StatusCode>; + type PollBody = Bs; + type PollError = S::Error; + type RecvItem = RequestHead; + + fn poll_msg( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> { + let mut this = self.as_mut(); + let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { + let resp = ready!(fut.as_mut().poll(cx)?); + let (parts, body) = resp.into_parts(); + let head = MessageHead { + version: parts.version, + subject: parts.status, + headers: parts.headers, + extensions: parts.extensions, + }; + Poll::Ready(Some(Ok((head, body)))) + } else { + unreachable!("poll_msg shouldn't be called if no inflight"); + }; + + // Since in_flight finished, remove it + this.in_flight.set(None); + ret + } + + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { + let (msg, body) = msg?; + let mut req = Request::new(body); + *req.method_mut() = msg.subject.0; + *req.uri_mut() = msg.subject.1; + *req.headers_mut() = msg.headers; + *req.version_mut() = msg.version; + *req.extensions_mut() = msg.extensions; + let fut = self.service.call(req); + self.in_flight.set(Some(fut)); + Ok(()) + } + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { + if self.in_flight.is_some() { + Poll::Pending + } else { + self.service.poll_ready(cx).map_err(|_e| { + // FIXME: return error value. + trace!("service closed"); + }) + } + } + + fn should_poll(&self) -> bool { + self.in_flight.is_some() + } + } +} + +// ===== impl Client ===== + +cfg_client! { + impl<B> Client<B> { + pub(crate) fn new(rx: ClientRx<B>) -> Client<B> { + Client { + callback: None, + rx, + rx_closed: false, + } + } + } + + impl<B> Dispatch for Client<B> + where + B: HttpBody, + { + type PollItem = RequestHead; + type PollBody = B; + type PollError = crate::common::Never; + type RecvItem = crate::proto::ResponseHead; + + fn poll_msg( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> { + let mut this = self.as_mut(); + debug_assert!(!this.rx_closed); + match this.rx.poll_recv(cx) { + Poll::Ready(Some((req, mut cb))) => { + // check that future hasn't been canceled already + match cb.poll_canceled(cx) { + Poll::Ready(()) => { + trace!("request canceled"); + Poll::Ready(None) + } + Poll::Pending => { + let (parts, body) = req.into_parts(); + let head = RequestHead { + version: parts.version, + subject: crate::proto::RequestLine(parts.method, parts.uri), + headers: parts.headers, + extensions: parts.extensions, + }; + this.callback = Some(cb); + Poll::Ready(Some(Ok((head, body)))) + } + } + } + Poll::Ready(None) => { + // user has dropped sender handle + trace!("client tx closed"); + this.rx_closed = true; + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } + + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { + match msg { + Ok((msg, body)) => { + if let Some(cb) = self.callback.take() { + let res = msg.into_response(body); + cb.send(Ok(res)); + Ok(()) + } else { + // Getting here is likely a bug! An error should have happened + // in Conn::require_empty_read() before ever parsing a + // full message! + Err(crate::Error::new_unexpected_message()) + } + } + Err(err) => { + if let Some(cb) = self.callback.take() { + cb.send(Err((err, None))); + Ok(()) + } else if !self.rx_closed { + self.rx.close(); + if let Some((req, cb)) = self.rx.try_recv() { + trace!("canceling queued request with connection error: {}", err); + // in this case, the message was never even started, so it's safe to tell + // the user that the request was completely canceled + cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); + Ok(()) + } else { + Err(err) + } + } else { + Err(err) + } + } + } + } + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { + match self.callback { + Some(ref mut cb) => match cb.poll_canceled(cx) { + Poll::Ready(()) => { + trace!("callback receiver has dropped"); + Poll::Ready(Err(())) + } + Poll::Pending => Poll::Ready(Ok(())), + }, + None => Poll::Ready(Err(())), + } + } + + fn should_poll(&self) -> bool { + self.callback.is_none() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::h1::ClientTransaction; + use std::time::Duration; + + #[test] + fn client_read_bytes_before_writing_request() { + let _ = pretty_env_logger::try_init(); + + tokio_test::task::spawn(()).enter(|cx, _| { + let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle(); + + // Block at 0 for now, but we will release this response before + // the request is ready to write later... + let (mut tx, rx) = crate::client::dispatch::channel(); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); + let mut dispatcher = Dispatcher::new(Client::new(rx), conn); + + // First poll is needed to allow tx to send... + assert!(Pin::new(&mut dispatcher).poll(cx).is_pending()); + + // Unblock our IO, which has a response before we've sent request! + // + handle.read(b"HTTP/1.1 200 OK\r\n\r\n"); + + let mut res_rx = tx + .try_send(crate::Request::new(crate::Body::empty())) + .unwrap(); + + tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx)); + let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx)) + .expect_err("callback should send error"); + + match (err.0.kind(), err.1) { + (&crate::error::Kind::Canceled, Some(_)) => (), + other => panic!("expected Canceled, got {:?}", other), + } + }); + } + + #[tokio::test] + async fn client_flushing_is_not_ready_for_next_request() { + let _ = pretty_env_logger::try_init(); + + let (io, _handle) = tokio_test::io::Builder::new() + .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n") + .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n") + .wait(std::time::Duration::from_secs(2)) + .build_with_handle(); + + let (mut tx, rx) = crate::client::dispatch::channel(); + let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); + conn.set_write_strategy_queue(); + + let dispatcher = Dispatcher::new(Client::new(rx), conn); + let _dispatcher = tokio::spawn(async move { dispatcher.await }); + + let req = crate::Request::builder() + .method("POST") + .body(crate::Body::from("reee")) + .unwrap(); + + let res = tx.try_send(req).unwrap().await.expect("response"); + drop(res); + + assert!(!tx.is_ready()); + } + + #[tokio::test] + async fn body_empty_chunks_ignored() { + let _ = pretty_env_logger::try_init(); + + let io = tokio_test::io::Builder::new() + // no reading or writing, just be blocked for the test... + .wait(Duration::from_secs(5)) + .build(); + + let (mut tx, rx) = crate::client::dispatch::channel(); + let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); + let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); + + // First poll is needed to allow tx to send... + assert!(dispatcher.poll().is_pending()); + + let body = { + let (mut tx, body) = crate::Body::channel(); + tx.try_send_data("".into()).unwrap(); + body + }; + + let _res_rx = tx.try_send(crate::Request::new(body)).unwrap(); + + // Ensure conn.write_body wasn't called with the empty chunk. + // If it is, it will trigger an assertion. + assert!(dispatcher.poll().is_pending()); + } +} diff --git a/third_party/rust/hyper/src/proto/h1/encode.rs b/third_party/rust/hyper/src/proto/h1/encode.rs new file mode 100644 index 0000000000..f0aa261a4f --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/encode.rs @@ -0,0 +1,439 @@ +use std::fmt; +use std::io::IoSlice; + +use bytes::buf::{Chain, Take}; +use bytes::Buf; +use tracing::trace; + +use super::io::WriteBuf; + +type StaticBuf = &'static [u8]; + +/// Encoders to handle different Transfer-Encodings. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct Encoder { + kind: Kind, + is_last: bool, +} + +#[derive(Debug)] +pub(crate) struct EncodedBuf<B> { + kind: BufKind<B>, +} + +#[derive(Debug)] +pub(crate) struct NotEof(u64); + +#[derive(Debug, PartialEq, Clone)] +enum Kind { + /// An Encoder for when Transfer-Encoding includes `chunked`. + Chunked, + /// An Encoder for when Content-Length is set. + /// + /// Enforces that the body is not longer than the Content-Length header. + Length(u64), + /// An Encoder for when neither Content-Length nor Chunked encoding is set. + /// + /// This is mostly only used with HTTP/1.0 with a length. This kind requires + /// the connection to be closed when the body is finished. + #[cfg(feature = "server")] + CloseDelimited, +} + +#[derive(Debug)] +enum BufKind<B> { + Exact(B), + Limited(Take<B>), + Chunked(Chain<Chain<ChunkSize, B>, StaticBuf>), + ChunkedEnd(StaticBuf), +} + +impl Encoder { + fn new(kind: Kind) -> Encoder { + Encoder { + kind, + is_last: false, + } + } + pub(crate) fn chunked() -> Encoder { + Encoder::new(Kind::Chunked) + } + + pub(crate) fn length(len: u64) -> Encoder { + Encoder::new(Kind::Length(len)) + } + + #[cfg(feature = "server")] + pub(crate) fn close_delimited() -> Encoder { + Encoder::new(Kind::CloseDelimited) + } + + pub(crate) fn is_eof(&self) -> bool { + matches!(self.kind, Kind::Length(0)) + } + + #[cfg(feature = "server")] + pub(crate) fn set_last(mut self, is_last: bool) -> Self { + self.is_last = is_last; + self + } + + pub(crate) fn is_last(&self) -> bool { + self.is_last + } + + pub(crate) fn is_close_delimited(&self) -> bool { + match self.kind { + #[cfg(feature = "server")] + Kind::CloseDelimited => true, + _ => false, + } + } + + pub(crate) fn end<B>(&self) -> Result<Option<EncodedBuf<B>>, NotEof> { + match self.kind { + Kind::Length(0) => Ok(None), + Kind::Chunked => Ok(Some(EncodedBuf { + kind: BufKind::ChunkedEnd(b"0\r\n\r\n"), + })), + #[cfg(feature = "server")] + Kind::CloseDelimited => Ok(None), + Kind::Length(n) => Err(NotEof(n)), + } + } + + pub(crate) fn encode<B>(&mut self, msg: B) -> EncodedBuf<B> + where + B: Buf, + { + let len = msg.remaining(); + debug_assert!(len > 0, "encode() called with empty buf"); + + let kind = match self.kind { + Kind::Chunked => { + trace!("encoding chunked {}B", len); + let buf = ChunkSize::new(len) + .chain(msg) + .chain(b"\r\n" as &'static [u8]); + BufKind::Chunked(buf) + } + Kind::Length(ref mut remaining) => { + trace!("sized write, len = {}", len); + if len as u64 > *remaining { + let limit = *remaining as usize; + *remaining = 0; + BufKind::Limited(msg.take(limit)) + } else { + *remaining -= len as u64; + BufKind::Exact(msg) + } + } + #[cfg(feature = "server")] + Kind::CloseDelimited => { + trace!("close delimited write {}B", len); + BufKind::Exact(msg) + } + }; + EncodedBuf { kind } + } + + pub(super) fn encode_and_end<B>(&self, msg: B, dst: &mut WriteBuf<EncodedBuf<B>>) -> bool + where + B: Buf, + { + let len = msg.remaining(); + debug_assert!(len > 0, "encode() called with empty buf"); + + match self.kind { + Kind::Chunked => { + trace!("encoding chunked {}B", len); + let buf = ChunkSize::new(len) + .chain(msg) + .chain(b"\r\n0\r\n\r\n" as &'static [u8]); + dst.buffer(buf); + !self.is_last + } + Kind::Length(remaining) => { + use std::cmp::Ordering; + + trace!("sized write, len = {}", len); + match (len as u64).cmp(&remaining) { + Ordering::Equal => { + dst.buffer(msg); + !self.is_last + } + Ordering::Greater => { + dst.buffer(msg.take(remaining as usize)); + !self.is_last + } + Ordering::Less => { + dst.buffer(msg); + false + } + } + } + #[cfg(feature = "server")] + Kind::CloseDelimited => { + trace!("close delimited write {}B", len); + dst.buffer(msg); + false + } + } + } + + /// Encodes the full body, without verifying the remaining length matches. + /// + /// This is used in conjunction with HttpBody::__hyper_full_data(), which + /// means we can trust that the buf has the correct size (the buf itself + /// was checked to make the headers). + pub(super) fn danger_full_buf<B>(self, msg: B, dst: &mut WriteBuf<EncodedBuf<B>>) + where + B: Buf, + { + debug_assert!(msg.remaining() > 0, "encode() called with empty buf"); + debug_assert!( + match self.kind { + Kind::Length(len) => len == msg.remaining() as u64, + _ => true, + }, + "danger_full_buf length mismatches" + ); + + match self.kind { + Kind::Chunked => { + let len = msg.remaining(); + trace!("encoding chunked {}B", len); + let buf = ChunkSize::new(len) + .chain(msg) + .chain(b"\r\n0\r\n\r\n" as &'static [u8]); + dst.buffer(buf); + } + _ => { + dst.buffer(msg); + } + } + } +} + +impl<B> Buf for EncodedBuf<B> +where + B: Buf, +{ + #[inline] + fn remaining(&self) -> usize { + match self.kind { + BufKind::Exact(ref b) => b.remaining(), + BufKind::Limited(ref b) => b.remaining(), + BufKind::Chunked(ref b) => b.remaining(), + BufKind::ChunkedEnd(ref b) => b.remaining(), + } + } + + #[inline] + fn chunk(&self) -> &[u8] { + match self.kind { + BufKind::Exact(ref b) => b.chunk(), + BufKind::Limited(ref b) => b.chunk(), + BufKind::Chunked(ref b) => b.chunk(), + BufKind::ChunkedEnd(ref b) => b.chunk(), + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + match self.kind { + BufKind::Exact(ref mut b) => b.advance(cnt), + BufKind::Limited(ref mut b) => b.advance(cnt), + BufKind::Chunked(ref mut b) => b.advance(cnt), + BufKind::ChunkedEnd(ref mut b) => b.advance(cnt), + } + } + + #[inline] + fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { + match self.kind { + BufKind::Exact(ref b) => b.chunks_vectored(dst), + BufKind::Limited(ref b) => b.chunks_vectored(dst), + BufKind::Chunked(ref b) => b.chunks_vectored(dst), + BufKind::ChunkedEnd(ref b) => b.chunks_vectored(dst), + } + } +} + +#[cfg(target_pointer_width = "32")] +const USIZE_BYTES: usize = 4; + +#[cfg(target_pointer_width = "64")] +const USIZE_BYTES: usize = 8; + +// each byte will become 2 hex +const CHUNK_SIZE_MAX_BYTES: usize = USIZE_BYTES * 2; + +#[derive(Clone, Copy)] +struct ChunkSize { + bytes: [u8; CHUNK_SIZE_MAX_BYTES + 2], + pos: u8, + len: u8, +} + +impl ChunkSize { + fn new(len: usize) -> ChunkSize { + use std::fmt::Write; + let mut size = ChunkSize { + bytes: [0; CHUNK_SIZE_MAX_BYTES + 2], + pos: 0, + len: 0, + }; + write!(&mut size, "{:X}\r\n", len).expect("CHUNK_SIZE_MAX_BYTES should fit any usize"); + size + } +} + +impl Buf for ChunkSize { + #[inline] + fn remaining(&self) -> usize { + (self.len - self.pos).into() + } + + #[inline] + fn chunk(&self) -> &[u8] { + &self.bytes[self.pos.into()..self.len.into()] + } + + #[inline] + fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.remaining()); + self.pos += cnt as u8; // just asserted cnt fits in u8 + } +} + +impl fmt::Debug for ChunkSize { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ChunkSize") + .field("bytes", &&self.bytes[..self.len.into()]) + .field("pos", &self.pos) + .finish() + } +} + +impl fmt::Write for ChunkSize { + fn write_str(&mut self, num: &str) -> fmt::Result { + use std::io::Write; + (&mut self.bytes[self.len.into()..]) + .write_all(num.as_bytes()) + .expect("&mut [u8].write() cannot error"); + self.len += num.len() as u8; // safe because bytes is never bigger than 256 + Ok(()) + } +} + +impl<B: Buf> From<B> for EncodedBuf<B> { + fn from(buf: B) -> Self { + EncodedBuf { + kind: BufKind::Exact(buf), + } + } +} + +impl<B: Buf> From<Take<B>> for EncodedBuf<B> { + fn from(buf: Take<B>) -> Self { + EncodedBuf { + kind: BufKind::Limited(buf), + } + } +} + +impl<B: Buf> From<Chain<Chain<ChunkSize, B>, StaticBuf>> for EncodedBuf<B> { + fn from(buf: Chain<Chain<ChunkSize, B>, StaticBuf>) -> Self { + EncodedBuf { + kind: BufKind::Chunked(buf), + } + } +} + +impl fmt::Display for NotEof { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "early end, expected {} more bytes", self.0) + } +} + +impl std::error::Error for NotEof {} + +#[cfg(test)] +mod tests { + use bytes::BufMut; + + use super::super::io::Cursor; + use super::Encoder; + + #[test] + fn chunked() { + let mut encoder = Encoder::chunked(); + let mut dst = Vec::new(); + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + assert_eq!(dst, b"7\r\nfoo bar\r\n"); + + let msg2 = b"baz quux herp".as_ref(); + let buf2 = encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst, b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n"); + + let end = encoder.end::<Cursor<Vec<u8>>>().unwrap().unwrap(); + dst.put(end); + + assert_eq!( + dst, + b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n".as_ref() + ); + } + + #[test] + fn length() { + let max_len = 8; + let mut encoder = Encoder::length(max_len as u64); + let mut dst = Vec::new(); + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + + assert_eq!(dst, b"foo bar"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap_err(); + + let msg2 = b"baz".as_ref(); + let buf2 = encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst.len(), max_len); + assert_eq!(dst, b"foo barb"); + assert!(encoder.is_eof()); + assert!(encoder.end::<()>().unwrap().is_none()); + } + + #[test] + fn eof() { + let mut encoder = Encoder::close_delimited(); + let mut dst = Vec::new(); + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + + assert_eq!(dst, b"foo bar"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap(); + + let msg2 = b"baz".as_ref(); + let buf2 = encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst, b"foo barbaz"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap(); + } +} diff --git a/third_party/rust/hyper/src/proto/h1/io.rs b/third_party/rust/hyper/src/proto/h1/io.rs new file mode 100644 index 0000000000..1d251e2c84 --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/io.rs @@ -0,0 +1,1002 @@ +use std::cmp; +use std::fmt; +#[cfg(all(feature = "server", feature = "runtime"))] +use std::future::Future; +use std::io::{self, IoSlice}; +use std::marker::Unpin; +use std::mem::MaybeUninit; +#[cfg(all(feature = "server", feature = "runtime"))] +use std::time::Duration; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +#[cfg(all(feature = "server", feature = "runtime"))] +use tokio::time::Instant; +use tracing::{debug, trace}; + +use super::{Http1Transaction, ParseContext, ParsedMessage}; +use crate::common::buf::BufList; +use crate::common::{task, Pin, Poll}; + +/// The initial buffer size allocated before trying to read from IO. +pub(crate) const INIT_BUFFER_SIZE: usize = 8192; + +/// The minimum value that can be set to max buffer size. +pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE; + +/// The default maximum read buffer size. If the buffer gets this big and +/// a message is still not complete, a `TooLarge` error is triggered. +// Note: if this changes, update server::conn::Http::max_buf_size docs. +pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; + +/// The maximum number of distinct `Buf`s to hold in a list before requiring +/// a flush. Only affects when the buffer strategy is to queue buffers. +/// +/// Note that a flush can happen before reaching the maximum. This simply +/// forces a flush if the queue gets this big. +const MAX_BUF_LIST_BUFFERS: usize = 16; + +pub(crate) struct Buffered<T, B> { + flush_pipeline: bool, + io: T, + read_blocked: bool, + read_buf: BytesMut, + read_buf_strategy: ReadStrategy, + write_buf: WriteBuf<B>, +} + +impl<T, B> fmt::Debug for Buffered<T, B> +where + B: Buf, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Buffered") + .field("read_buf", &self.read_buf) + .field("write_buf", &self.write_buf) + .finish() + } +} + +impl<T, B> Buffered<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf, +{ + pub(crate) fn new(io: T) -> Buffered<T, B> { + let strategy = if io.is_write_vectored() { + WriteStrategy::Queue + } else { + WriteStrategy::Flatten + }; + let write_buf = WriteBuf::new(strategy); + Buffered { + flush_pipeline: false, + io, + read_blocked: false, + read_buf: BytesMut::with_capacity(0), + read_buf_strategy: ReadStrategy::default(), + write_buf, + } + } + + #[cfg(feature = "server")] + pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) { + debug_assert!(!self.write_buf.has_remaining()); + self.flush_pipeline = enabled; + if enabled { + self.set_write_strategy_flatten(); + } + } + + pub(crate) fn set_max_buf_size(&mut self, max: usize) { + assert!( + max >= MINIMUM_MAX_BUFFER_SIZE, + "The max_buf_size cannot be smaller than {}.", + MINIMUM_MAX_BUFFER_SIZE, + ); + self.read_buf_strategy = ReadStrategy::with_max(max); + self.write_buf.max_buf_size = max; + } + + #[cfg(feature = "client")] + pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) { + self.read_buf_strategy = ReadStrategy::Exact(sz); + } + + pub(crate) fn set_write_strategy_flatten(&mut self) { + // this should always be called only at construction time, + // so this assert is here to catch myself + debug_assert!(self.write_buf.queue.bufs_cnt() == 0); + self.write_buf.set_strategy(WriteStrategy::Flatten); + } + + pub(crate) fn set_write_strategy_queue(&mut self) { + // this should always be called only at construction time, + // so this assert is here to catch myself + debug_assert!(self.write_buf.queue.bufs_cnt() == 0); + self.write_buf.set_strategy(WriteStrategy::Queue); + } + + pub(crate) fn read_buf(&self) -> &[u8] { + self.read_buf.as_ref() + } + + #[cfg(test)] + #[cfg(feature = "nightly")] + pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut { + &mut self.read_buf + } + + /// Return the "allocated" available space, not the potential space + /// that could be allocated in the future. + fn read_buf_remaining_mut(&self) -> usize { + self.read_buf.capacity() - self.read_buf.len() + } + + /// Return whether we can append to the headers buffer. + /// + /// Reasons we can't: + /// - The write buf is in queue mode, and some of the past body is still + /// needing to be flushed. + pub(crate) fn can_headers_buf(&self) -> bool { + !self.write_buf.queue.has_remaining() + } + + pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> { + let buf = self.write_buf.headers_mut(); + &mut buf.bytes + } + + pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> { + &mut self.write_buf + } + + pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) { + self.write_buf.buffer(buf) + } + + pub(crate) fn can_buffer(&self) -> bool { + self.flush_pipeline || self.write_buf.can_buffer() + } + + pub(crate) fn consume_leading_lines(&mut self) { + if !self.read_buf.is_empty() { + let mut i = 0; + while i < self.read_buf.len() { + match self.read_buf[i] { + b'\r' | b'\n' => i += 1, + _ => break, + } + } + self.read_buf.advance(i); + } + } + + pub(super) fn parse<S>( + &mut self, + cx: &mut task::Context<'_>, + parse_ctx: ParseContext<'_>, + ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>> + where + S: Http1Transaction, + { + loop { + match super::role::parse_headers::<S>( + &mut self.read_buf, + ParseContext { + cached_headers: parse_ctx.cached_headers, + req_method: parse_ctx.req_method, + h1_parser_config: parse_ctx.h1_parser_config.clone(), + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout: parse_ctx.h1_header_read_timeout, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_fut: parse_ctx.h1_header_read_timeout_fut, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_running: parse_ctx.h1_header_read_timeout_running, + preserve_header_case: parse_ctx.preserve_header_case, + #[cfg(feature = "ffi")] + preserve_header_order: parse_ctx.preserve_header_order, + h09_responses: parse_ctx.h09_responses, + #[cfg(feature = "ffi")] + on_informational: parse_ctx.on_informational, + #[cfg(feature = "ffi")] + raw_headers: parse_ctx.raw_headers, + }, + )? { + Some(msg) => { + debug!("parsed {} headers", msg.head.headers.len()); + + #[cfg(all(feature = "server", feature = "runtime"))] + { + *parse_ctx.h1_header_read_timeout_running = false; + + if let Some(h1_header_read_timeout_fut) = + parse_ctx.h1_header_read_timeout_fut + { + // Reset the timer in order to avoid woken up when the timeout finishes + h1_header_read_timeout_fut + .as_mut() + .reset(Instant::now() + Duration::from_secs(30 * 24 * 60 * 60)); + } + } + return Poll::Ready(Ok(msg)); + } + None => { + let max = self.read_buf_strategy.max(); + if self.read_buf.len() >= max { + debug!("max_buf_size ({}) reached, closing", max); + return Poll::Ready(Err(crate::Error::new_too_large())); + } + + #[cfg(all(feature = "server", feature = "runtime"))] + if *parse_ctx.h1_header_read_timeout_running { + if let Some(h1_header_read_timeout_fut) = + parse_ctx.h1_header_read_timeout_fut + { + if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() { + *parse_ctx.h1_header_read_timeout_running = false; + + tracing::warn!("read header from client timeout"); + return Poll::Ready(Err(crate::Error::new_header_timeout())); + } + } + } + } + } + if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 { + trace!("parse eof"); + return Poll::Ready(Err(crate::Error::new_incomplete())); + } + } + } + + pub(crate) fn poll_read_from_io( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<io::Result<usize>> { + self.read_blocked = false; + let next = self.read_buf_strategy.next(); + if self.read_buf_remaining_mut() < next { + self.read_buf.reserve(next); + } + + let dst = self.read_buf.chunk_mut(); + let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) }; + let mut buf = ReadBuf::uninit(dst); + match Pin::new(&mut self.io).poll_read(cx, &mut buf) { + Poll::Ready(Ok(_)) => { + let n = buf.filled().len(); + trace!("received {} bytes", n); + unsafe { + // Safety: we just read that many bytes into the + // uninitialized part of the buffer, so this is okay. + // @tokio pls give me back `poll_read_buf` thanks + self.read_buf.advance_mut(n); + } + self.read_buf_strategy.record(n); + Poll::Ready(Ok(n)) + } + Poll::Pending => { + self.read_blocked = true; + Poll::Pending + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } + } + + pub(crate) fn into_inner(self) -> (T, Bytes) { + (self.io, self.read_buf.freeze()) + } + + pub(crate) fn io_mut(&mut self) -> &mut T { + &mut self.io + } + + pub(crate) fn is_read_blocked(&self) -> bool { + self.read_blocked + } + + pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + if self.flush_pipeline && !self.read_buf.is_empty() { + Poll::Ready(Ok(())) + } else if self.write_buf.remaining() == 0 { + Pin::new(&mut self.io).poll_flush(cx) + } else { + if let WriteStrategy::Flatten = self.write_buf.strategy { + return self.poll_flush_flattened(cx); + } + + const MAX_WRITEV_BUFS: usize = 64; + loop { + let n = { + let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS]; + let len = self.write_buf.chunks_vectored(&mut iovs); + ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))? + }; + // TODO(eliza): we have to do this manually because + // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when + // `poll_write_buf` comes back, the manual advance will need to leave! + self.write_buf.advance(n); + debug!("flushed {} bytes", n); + if self.write_buf.remaining() == 0 { + break; + } else if n == 0 { + trace!( + "write returned zero, but {} bytes remaining", + self.write_buf.remaining() + ); + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + Pin::new(&mut self.io).poll_flush(cx) + } + } + + /// Specialized version of `flush` when strategy is Flatten. + /// + /// Since all buffered bytes are flattened into the single headers buffer, + /// that skips some bookkeeping around using multiple buffers. + fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + loop { + let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?; + debug!("flushed {} bytes", n); + self.write_buf.headers.advance(n); + if self.write_buf.headers.remaining() == 0 { + self.write_buf.headers.reset(); + break; + } else if n == 0 { + trace!( + "write returned zero, but {} bytes remaining", + self.write_buf.remaining() + ); + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + Pin::new(&mut self.io).poll_flush(cx) + } + + #[cfg(test)] + fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a { + futures_util::future::poll_fn(move |cx| self.poll_flush(cx)) + } +} + +// The `B` is a `Buf`, we never project a pin to it +impl<T: Unpin, B> Unpin for Buffered<T, B> {} + +// TODO: This trait is old... at least rename to PollBytes or something... +pub(crate) trait MemRead { + fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>; +} + +impl<T, B> MemRead for Buffered<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf, +{ + fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> { + if !self.read_buf.is_empty() { + let n = std::cmp::min(len, self.read_buf.len()); + Poll::Ready(Ok(self.read_buf.split_to(n).freeze())) + } else { + let n = ready!(self.poll_read_from_io(cx))?; + Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze())) + } + } +} + +#[derive(Clone, Copy, Debug)] +enum ReadStrategy { + Adaptive { + decrease_now: bool, + next: usize, + max: usize, + }, + #[cfg(feature = "client")] + Exact(usize), +} + +impl ReadStrategy { + fn with_max(max: usize) -> ReadStrategy { + ReadStrategy::Adaptive { + decrease_now: false, + next: INIT_BUFFER_SIZE, + max, + } + } + + fn next(&self) -> usize { + match *self { + ReadStrategy::Adaptive { next, .. } => next, + #[cfg(feature = "client")] + ReadStrategy::Exact(exact) => exact, + } + } + + fn max(&self) -> usize { + match *self { + ReadStrategy::Adaptive { max, .. } => max, + #[cfg(feature = "client")] + ReadStrategy::Exact(exact) => exact, + } + } + + fn record(&mut self, bytes_read: usize) { + match *self { + ReadStrategy::Adaptive { + ref mut decrease_now, + ref mut next, + max, + .. + } => { + if bytes_read >= *next { + *next = cmp::min(incr_power_of_two(*next), max); + *decrease_now = false; + } else { + let decr_to = prev_power_of_two(*next); + if bytes_read < decr_to { + if *decrease_now { + *next = cmp::max(decr_to, INIT_BUFFER_SIZE); + *decrease_now = false; + } else { + // Decreasing is a two "record" process. + *decrease_now = true; + } + } else { + // A read within the current range should cancel + // a potential decrease, since we just saw proof + // that we still need this size. + *decrease_now = false; + } + } + } + #[cfg(feature = "client")] + ReadStrategy::Exact(_) => (), + } + } +} + +fn incr_power_of_two(n: usize) -> usize { + n.saturating_mul(2) +} + +fn prev_power_of_two(n: usize) -> usize { + // Only way this shift can underflow is if n is less than 4. + // (Which would means `usize::MAX >> 64` and underflowed!) + debug_assert!(n >= 4); + (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1 +} + +impl Default for ReadStrategy { + fn default() -> ReadStrategy { + ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE) + } +} + +#[derive(Clone)] +pub(crate) struct Cursor<T> { + bytes: T, + pos: usize, +} + +impl<T: AsRef<[u8]>> Cursor<T> { + #[inline] + pub(crate) fn new(bytes: T) -> Cursor<T> { + Cursor { bytes, pos: 0 } + } +} + +impl Cursor<Vec<u8>> { + /// If we've advanced the position a bit in this cursor, and wish to + /// extend the underlying vector, we may wish to unshift the "read" bytes + /// off, and move everything else over. + fn maybe_unshift(&mut self, additional: usize) { + if self.pos == 0 { + // nothing to do + return; + } + + if self.bytes.capacity() - self.bytes.len() >= additional { + // there's room! + return; + } + + self.bytes.drain(0..self.pos); + self.pos = 0; + } + + fn reset(&mut self) { + self.pos = 0; + self.bytes.clear(); + } +} + +impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Cursor") + .field("pos", &self.pos) + .field("len", &self.bytes.as_ref().len()) + .finish() + } +} + +impl<T: AsRef<[u8]>> Buf for Cursor<T> { + #[inline] + fn remaining(&self) -> usize { + self.bytes.as_ref().len() - self.pos + } + + #[inline] + fn chunk(&self) -> &[u8] { + &self.bytes.as_ref()[self.pos..] + } + + #[inline] + fn advance(&mut self, cnt: usize) { + debug_assert!(self.pos + cnt <= self.bytes.as_ref().len()); + self.pos += cnt; + } +} + +// an internal buffer to collect writes before flushes +pub(super) struct WriteBuf<B> { + /// Re-usable buffer that holds message headers + headers: Cursor<Vec<u8>>, + max_buf_size: usize, + /// Deque of user buffers if strategy is Queue + queue: BufList<B>, + strategy: WriteStrategy, +} + +impl<B: Buf> WriteBuf<B> { + fn new(strategy: WriteStrategy) -> WriteBuf<B> { + WriteBuf { + headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)), + max_buf_size: DEFAULT_MAX_BUFFER_SIZE, + queue: BufList::new(), + strategy, + } + } +} + +impl<B> WriteBuf<B> +where + B: Buf, +{ + fn set_strategy(&mut self, strategy: WriteStrategy) { + self.strategy = strategy; + } + + pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) { + debug_assert!(buf.has_remaining()); + match self.strategy { + WriteStrategy::Flatten => { + let head = self.headers_mut(); + + head.maybe_unshift(buf.remaining()); + trace!( + self.len = head.remaining(), + buf.len = buf.remaining(), + "buffer.flatten" + ); + //perf: This is a little faster than <Vec as BufMut>>::put, + //but accomplishes the same result. + loop { + let adv = { + let slice = buf.chunk(); + if slice.is_empty() { + return; + } + head.bytes.extend_from_slice(slice); + slice.len() + }; + buf.advance(adv); + } + } + WriteStrategy::Queue => { + trace!( + self.len = self.remaining(), + buf.len = buf.remaining(), + "buffer.queue" + ); + self.queue.push(buf.into()); + } + } + } + + fn can_buffer(&self) -> bool { + match self.strategy { + WriteStrategy::Flatten => self.remaining() < self.max_buf_size, + WriteStrategy::Queue => { + self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size + } + } + } + + fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> { + debug_assert!(!self.queue.has_remaining()); + &mut self.headers + } +} + +impl<B: Buf> fmt::Debug for WriteBuf<B> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WriteBuf") + .field("remaining", &self.remaining()) + .field("strategy", &self.strategy) + .finish() + } +} + +impl<B: Buf> Buf for WriteBuf<B> { + #[inline] + fn remaining(&self) -> usize { + self.headers.remaining() + self.queue.remaining() + } + + #[inline] + fn chunk(&self) -> &[u8] { + let headers = self.headers.chunk(); + if !headers.is_empty() { + headers + } else { + self.queue.chunk() + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + let hrem = self.headers.remaining(); + + match hrem.cmp(&cnt) { + cmp::Ordering::Equal => self.headers.reset(), + cmp::Ordering::Greater => self.headers.advance(cnt), + cmp::Ordering::Less => { + let qcnt = cnt - hrem; + self.headers.reset(); + self.queue.advance(qcnt); + } + } + } + + #[inline] + fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { + let n = self.headers.chunks_vectored(dst); + self.queue.chunks_vectored(&mut dst[n..]) + n + } +} + +#[derive(Debug)] +enum WriteStrategy { + Flatten, + Queue, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + use tokio_test::io::Builder as Mock; + + // #[cfg(feature = "nightly")] + // use test::Bencher; + + /* + impl<T: Read> MemRead for AsyncIo<T> { + fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> { + let mut v = vec![0; len]; + let n = try_nb!(self.read(v.as_mut_slice())); + Ok(Async::Ready(BytesMut::from(&v[..n]).freeze())) + } + } + */ + + #[tokio::test] + #[ignore] + async fn iobuf_write_empty_slice() { + // TODO(eliza): can i have writev back pls T_T + // // First, let's just check that the Mock would normally return an + // // error on an unexpected write, even if the buffer is empty... + // let mut mock = Mock::new().build(); + // futures_util::future::poll_fn(|cx| { + // Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[])) + // }) + // .await + // .expect_err("should be a broken pipe"); + + // // underlying io will return the logic error upon write, + // // so we are testing that the io_buf does not trigger a write + // // when there is nothing to flush + // let mock = Mock::new().build(); + // let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + // io_buf.flush().await.expect("should short-circuit flush"); + } + + #[tokio::test] + async fn parse_reads_until_blocked() { + use crate::proto::h1::ClientTransaction; + + let _ = pretty_env_logger::try_init(); + let mock = Mock::new() + // Split over multiple reads will read all of it + .read(b"HTTP/1.1 200 OK\r\n") + .read(b"Server: hyper\r\n") + // missing last line ending + .wait(Duration::from_secs(1)) + .build(); + + let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + + // We expect a `parse` to be not ready, and so can't await it directly. + // Rather, this `poll_fn` will wrap the `Poll` result. + futures_util::future::poll_fn(|cx| { + let parse_ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + assert!(buffered + .parse::<ClientTransaction>(cx, parse_ctx) + .is_pending()); + Poll::Ready(()) + }) + .await; + + assert_eq!( + buffered.read_buf, + b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..] + ); + } + + #[test] + fn read_strategy_adaptive_increments() { + let mut strategy = ReadStrategy::default(); + assert_eq!(strategy.next(), 8192); + + // Grows if record == next + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(16384); + assert_eq!(strategy.next(), 32768); + + // Enormous records still increment at same rate + strategy.record(::std::usize::MAX); + assert_eq!(strategy.next(), 65536); + + let max = strategy.max(); + while strategy.next() < max { + strategy.record(max); + } + + assert_eq!(strategy.next(), max, "never goes over max"); + strategy.record(max + 1); + assert_eq!(strategy.next(), max, "never goes over max"); + } + + #[test] + fn read_strategy_adaptive_decrements() { + let mut strategy = ReadStrategy::default(); + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(1); + assert_eq!( + strategy.next(), + 16384, + "first smaller record doesn't decrement yet" + ); + strategy.record(8192); + assert_eq!(strategy.next(), 16384, "record was with range"); + + strategy.record(1); + assert_eq!( + strategy.next(), + 16384, + "in-range record should make this the 'first' again" + ); + + strategy.record(1); + assert_eq!(strategy.next(), 8192, "second smaller record decrements"); + + strategy.record(1); + assert_eq!(strategy.next(), 8192, "first doesn't decrement"); + strategy.record(1); + assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum"); + } + + #[test] + fn read_strategy_adaptive_stays_the_same() { + let mut strategy = ReadStrategy::default(); + strategy.record(8192); + assert_eq!(strategy.next(), 16384); + + strategy.record(8193); + assert_eq!( + strategy.next(), + 16384, + "first smaller record doesn't decrement yet" + ); + + strategy.record(8193); + assert_eq!( + strategy.next(), + 16384, + "with current step does not decrement" + ); + } + + #[test] + fn read_strategy_adaptive_max_fuzz() { + fn fuzz(max: usize) { + let mut strategy = ReadStrategy::with_max(max); + while strategy.next() < max { + strategy.record(::std::usize::MAX); + } + let mut next = strategy.next(); + while next > 8192 { + strategy.record(1); + strategy.record(1); + next = strategy.next(); + assert!( + next.is_power_of_two(), + "decrement should be powers of two: {} (max = {})", + next, + max, + ); + } + } + + let mut max = 8192; + while max < std::usize::MAX { + fuzz(max); + max = (max / 2).saturating_mul(3); + } + fuzz(::std::usize::MAX); + } + + #[test] + #[should_panic] + #[cfg(debug_assertions)] // needs to trigger a debug_assert + fn write_buf_requires_non_empty_bufs() { + let mock = Mock::new().build(); + let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + + buffered.buffer(Cursor::new(Vec::new())); + } + + /* + TODO: needs tokio_test::io to allow configure write_buf calls + #[test] + fn write_buf_queue() { + let _ = pretty_env_logger::try_init(); + + let mock = AsyncIo::new_buf(vec![], 1024); + let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + + + buffered.headers_buf().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.buffer(Cursor::new(b"it's ".to_vec())); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3); + buffered.flush().unwrap(); + + assert_eq!(buffered.io, b"hello world, it's hyper!"); + assert_eq!(buffered.io.num_writes(), 1); + assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0); + } + */ + + #[tokio::test] + async fn write_buf_flatten() { + let _ = pretty_env_logger::try_init(); + + let mock = Mock::new().write(b"hello world, it's hyper!").build(); + + let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + buffered.write_buf.set_strategy(WriteStrategy::Flatten); + + buffered.headers_buf().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.buffer(Cursor::new(b"it's ".to_vec())); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0); + + buffered.flush().await.expect("flush"); + } + + #[test] + fn write_buf_flatten_partially_flushed() { + let _ = pretty_env_logger::try_init(); + + let b = |s: &str| Cursor::new(s.as_bytes().to_vec()); + + let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten); + + write_buf.buffer(b("hello ")); + write_buf.buffer(b("world, ")); + + assert_eq!(write_buf.chunk(), b"hello world, "); + + // advance most of the way, but not all + write_buf.advance(11); + + assert_eq!(write_buf.chunk(), b", "); + assert_eq!(write_buf.headers.pos, 11); + assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE); + + // there's still room in the headers buffer, so just push on the end + write_buf.buffer(b("it's hyper!")); + + assert_eq!(write_buf.chunk(), b", it's hyper!"); + assert_eq!(write_buf.headers.pos, 11); + + let rem1 = write_buf.remaining(); + let cap = write_buf.headers.bytes.capacity(); + + // but when this would go over capacity, don't copy the old bytes + write_buf.buffer(Cursor::new(vec![b'X'; cap])); + assert_eq!(write_buf.remaining(), cap + rem1); + assert_eq!(write_buf.headers.pos, 0); + } + + #[tokio::test] + async fn write_buf_queue_disable_auto() { + let _ = pretty_env_logger::try_init(); + + let mock = Mock::new() + .write(b"hello ") + .write(b"world, ") + .write(b"it's ") + .write(b"hyper!") + .build(); + + let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); + buffered.write_buf.set_strategy(WriteStrategy::Queue); + + // we have 4 buffers, and vec IO disabled, but explicitly said + // don't try to auto detect (via setting strategy above) + + buffered.headers_buf().extend(b"hello "); + buffered.buffer(Cursor::new(b"world, ".to_vec())); + buffered.buffer(Cursor::new(b"it's ".to_vec())); + buffered.buffer(Cursor::new(b"hyper!".to_vec())); + assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3); + + buffered.flush().await.expect("flush"); + + assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0); + } + + // #[cfg(feature = "nightly")] + // #[bench] + // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) { + // let s = "Hello, World!"; + // b.bytes = s.len() as u64; + + // let mut write_buf = WriteBuf::<bytes::Bytes>::new(); + // write_buf.set_strategy(WriteStrategy::Flatten); + // b.iter(|| { + // let chunk = bytes::Bytes::from(s); + // write_buf.buffer(chunk); + // ::test::black_box(&write_buf); + // write_buf.headers.bytes.clear(); + // }) + // } +} diff --git a/third_party/rust/hyper/src/proto/h1/mod.rs b/third_party/rust/hyper/src/proto/h1/mod.rs new file mode 100644 index 0000000000..5a2587a843 --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/mod.rs @@ -0,0 +1,122 @@ +#[cfg(all(feature = "server", feature = "runtime"))] +use std::{pin::Pin, time::Duration}; + +use bytes::BytesMut; +use http::{HeaderMap, Method}; +use httparse::ParserConfig; +#[cfg(all(feature = "server", feature = "runtime"))] +use tokio::time::Sleep; + +use crate::body::DecodedLength; +use crate::proto::{BodyLength, MessageHead}; + +pub(crate) use self::conn::Conn; +pub(crate) use self::decode::Decoder; +pub(crate) use self::dispatch::Dispatcher; +pub(crate) use self::encode::{EncodedBuf, Encoder}; +//TODO: move out of h1::io +pub(crate) use self::io::MINIMUM_MAX_BUFFER_SIZE; + +mod conn; +mod decode; +pub(crate) mod dispatch; +mod encode; +mod io; +mod role; + +cfg_client! { + pub(crate) type ClientTransaction = role::Client; +} + +cfg_server! { + pub(crate) type ServerTransaction = role::Server; +} + +pub(crate) trait Http1Transaction { + type Incoming; + type Outgoing: Default; + const LOG: &'static str; + fn parse(bytes: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult<Self::Incoming>; + fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder>; + + fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>>; + + fn is_client() -> bool { + !Self::is_server() + } + + fn is_server() -> bool { + !Self::is_client() + } + + fn should_error_on_parse_eof() -> bool { + Self::is_client() + } + + fn should_read_first() -> bool { + Self::is_server() + } + + fn update_date() {} +} + +/// Result newtype for Http1Transaction::parse. +pub(crate) type ParseResult<T> = Result<Option<ParsedMessage<T>>, crate::error::Parse>; + +#[derive(Debug)] +pub(crate) struct ParsedMessage<T> { + head: MessageHead<T>, + decode: DecodedLength, + expect_continue: bool, + keep_alive: bool, + wants_upgrade: bool, +} + +pub(crate) struct ParseContext<'a> { + cached_headers: &'a mut Option<HeaderMap>, + req_method: &'a mut Option<Method>, + h1_parser_config: ParserConfig, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout: Option<Duration>, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_fut: &'a mut Option<Pin<Box<Sleep>>>, + #[cfg(all(feature = "server", feature = "runtime"))] + h1_header_read_timeout_running: &'a mut bool, + preserve_header_case: bool, + #[cfg(feature = "ffi")] + preserve_header_order: bool, + h09_responses: bool, + #[cfg(feature = "ffi")] + on_informational: &'a mut Option<crate::ffi::OnInformational>, + #[cfg(feature = "ffi")] + raw_headers: bool, +} + +/// Passed to Http1Transaction::encode +pub(crate) struct Encode<'a, T> { + head: &'a mut MessageHead<T>, + body: Option<BodyLength>, + #[cfg(feature = "server")] + keep_alive: bool, + req_method: &'a mut Option<Method>, + title_case_headers: bool, +} + +/// Extra flags that a request "wants", like expect-continue or upgrades. +#[derive(Clone, Copy, Debug)] +struct Wants(u8); + +impl Wants { + const EMPTY: Wants = Wants(0b00); + const EXPECT: Wants = Wants(0b01); + const UPGRADE: Wants = Wants(0b10); + + #[must_use] + fn add(self, other: Wants) -> Wants { + Wants(self.0 | other.0) + } + + fn contains(&self, other: Wants) -> bool { + (self.0 & other.0) == other.0 + } +} diff --git a/third_party/rust/hyper/src/proto/h1/role.rs b/third_party/rust/hyper/src/proto/h1/role.rs new file mode 100644 index 0000000000..6252207baf --- /dev/null +++ b/third_party/rust/hyper/src/proto/h1/role.rs @@ -0,0 +1,2847 @@ +use std::fmt::{self, Write}; +use std::mem::MaybeUninit; + +use bytes::Bytes; +use bytes::BytesMut; +#[cfg(feature = "server")] +use http::header::ValueIter; +use http::header::{self, Entry, HeaderName, HeaderValue}; +use http::{HeaderMap, Method, StatusCode, Version}; +#[cfg(all(feature = "server", feature = "runtime"))] +use tokio::time::Instant; +use tracing::{debug, error, trace, trace_span, warn}; + +use crate::body::DecodedLength; +#[cfg(feature = "server")] +use crate::common::date; +use crate::error::Parse; +use crate::ext::HeaderCaseMap; +#[cfg(feature = "ffi")] +use crate::ext::OriginalHeaderOrder; +use crate::headers; +use crate::proto::h1::{ + Encode, Encoder, Http1Transaction, ParseContext, ParseResult, ParsedMessage, +}; +use crate::proto::{BodyLength, MessageHead, RequestHead, RequestLine}; + +const MAX_HEADERS: usize = 100; +const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific +#[cfg(feature = "server")] +const MAX_URI_LEN: usize = (u16::MAX - 1) as usize; + +macro_rules! header_name { + ($bytes:expr) => {{ + { + match HeaderName::from_bytes($bytes) { + Ok(name) => name, + Err(e) => maybe_panic!(e), + } + } + }}; +} + +macro_rules! header_value { + ($bytes:expr) => {{ + { + unsafe { HeaderValue::from_maybe_shared_unchecked($bytes) } + } + }}; +} + +macro_rules! maybe_panic { + ($($arg:tt)*) => ({ + let _err = ($($arg)*); + if cfg!(debug_assertions) { + panic!("{:?}", _err); + } else { + error!("Internal Hyper error, please report {:?}", _err); + return Err(Parse::Internal) + } + }) +} + +pub(super) fn parse_headers<T>( + bytes: &mut BytesMut, + ctx: ParseContext<'_>, +) -> ParseResult<T::Incoming> +where + T: Http1Transaction, +{ + // If the buffer is empty, don't bother entering the span, it's just noise. + if bytes.is_empty() { + return Ok(None); + } + + let span = trace_span!("parse_headers"); + let _s = span.enter(); + + #[cfg(all(feature = "server", feature = "runtime"))] + if !*ctx.h1_header_read_timeout_running { + if let Some(h1_header_read_timeout) = ctx.h1_header_read_timeout { + let deadline = Instant::now() + h1_header_read_timeout; + *ctx.h1_header_read_timeout_running = true; + match ctx.h1_header_read_timeout_fut { + Some(h1_header_read_timeout_fut) => { + debug!("resetting h1 header read timeout timer"); + h1_header_read_timeout_fut.as_mut().reset(deadline); + } + None => { + debug!("setting h1 header read timeout timer"); + *ctx.h1_header_read_timeout_fut = + Some(Box::pin(tokio::time::sleep_until(deadline))); + } + } + } + } + + T::parse(bytes, ctx) +} + +pub(super) fn encode_headers<T>( + enc: Encode<'_, T::Outgoing>, + dst: &mut Vec<u8>, +) -> crate::Result<Encoder> +where + T: Http1Transaction, +{ + let span = trace_span!("encode_headers"); + let _s = span.enter(); + T::encode(enc, dst) +} + +// There are 2 main roles, Client and Server. + +#[cfg(feature = "client")] +pub(crate) enum Client {} + +#[cfg(feature = "server")] +pub(crate) enum Server {} + +#[cfg(feature = "server")] +impl Http1Transaction for Server { + type Incoming = RequestLine; + type Outgoing = StatusCode; + const LOG: &'static str = "{role=server}"; + + fn parse(buf: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult<RequestLine> { + debug_assert!(!buf.is_empty(), "parse called with empty buf"); + + let mut keep_alive; + let is_http_11; + let subject; + let version; + let len; + let headers_len; + + // Unsafe: both headers_indices and headers are using uninitialized memory, + // but we *never* read any of it until after httparse has assigned + // values into it. By not zeroing out the stack memory, this saves + // a good ~5% on pipeline benchmarks. + let mut headers_indices: [MaybeUninit<HeaderIndices>; MAX_HEADERS] = unsafe { + // SAFETY: We can go safely from MaybeUninit array to array of MaybeUninit + MaybeUninit::uninit().assume_init() + }; + { + /* SAFETY: it is safe to go from MaybeUninit array to array of MaybeUninit */ + let mut headers: [MaybeUninit<httparse::Header<'_>>; MAX_HEADERS] = + unsafe { MaybeUninit::uninit().assume_init() }; + trace!(bytes = buf.len(), "Request.parse"); + let mut req = httparse::Request::new(&mut []); + let bytes = buf.as_ref(); + match req.parse_with_uninit_headers(bytes, &mut headers) { + Ok(httparse::Status::Complete(parsed_len)) => { + trace!("Request.parse Complete({})", parsed_len); + len = parsed_len; + let uri = req.path.unwrap(); + if uri.len() > MAX_URI_LEN { + return Err(Parse::UriTooLong); + } + subject = RequestLine( + Method::from_bytes(req.method.unwrap().as_bytes())?, + uri.parse()?, + ); + version = if req.version.unwrap() == 1 { + keep_alive = true; + is_http_11 = true; + Version::HTTP_11 + } else { + keep_alive = false; + is_http_11 = false; + Version::HTTP_10 + }; + + record_header_indices(bytes, &req.headers, &mut headers_indices)?; + headers_len = req.headers.len(); + } + Ok(httparse::Status::Partial) => return Ok(None), + Err(err) => { + return Err(match err { + // if invalid Token, try to determine if for method or path + httparse::Error::Token => { + if req.method.is_none() { + Parse::Method + } else { + debug_assert!(req.path.is_none()); + Parse::Uri + } + } + other => other.into(), + }); + } + } + }; + + let slice = buf.split_to(len).freeze(); + + // According to https://tools.ietf.org/html/rfc7230#section-3.3.3 + // 1. (irrelevant to Request) + // 2. (irrelevant to Request) + // 3. Transfer-Encoding: chunked has a chunked body. + // 4. If multiple differing Content-Length headers or invalid, close connection. + // 5. Content-Length header has a sized body. + // 6. Length 0. + // 7. (irrelevant to Request) + + let mut decoder = DecodedLength::ZERO; + let mut expect_continue = false; + let mut con_len = None; + let mut is_te = false; + let mut is_te_chunked = false; + let mut wants_upgrade = subject.0 == Method::CONNECT; + + let mut header_case_map = if ctx.preserve_header_case { + Some(HeaderCaseMap::default()) + } else { + None + }; + + #[cfg(feature = "ffi")] + let mut header_order = if ctx.preserve_header_order { + Some(OriginalHeaderOrder::default()) + } else { + None + }; + + let mut headers = ctx.cached_headers.take().unwrap_or_else(HeaderMap::new); + + headers.reserve(headers_len); + + for header in &headers_indices[..headers_len] { + // SAFETY: array is valid up to `headers_len` + let header = unsafe { &*header.as_ptr() }; + let name = header_name!(&slice[header.name.0..header.name.1]); + let value = header_value!(slice.slice(header.value.0..header.value.1)); + + match name { + header::TRANSFER_ENCODING => { + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + // If Transfer-Encoding header is present, and 'chunked' is + // not the final encoding, and this is a Request, then it is + // malformed. A server should respond with 400 Bad Request. + if !is_http_11 { + debug!("HTTP/1.0 cannot have Transfer-Encoding header"); + return Err(Parse::transfer_encoding_unexpected()); + } + is_te = true; + if headers::is_chunked_(&value) { + is_te_chunked = true; + decoder = DecodedLength::CHUNKED; + } else { + is_te_chunked = false; + } + } + header::CONTENT_LENGTH => { + if is_te { + continue; + } + let len = headers::content_length_parse(&value) + .ok_or_else(Parse::content_length_invalid)?; + if let Some(prev) = con_len { + if prev != len { + debug!( + "multiple Content-Length headers with different values: [{}, {}]", + prev, len, + ); + return Err(Parse::content_length_invalid()); + } + // we don't need to append this secondary length + continue; + } + decoder = DecodedLength::checked_new(len)?; + con_len = Some(len); + } + header::CONNECTION => { + // keep_alive was previously set to default for Version + if keep_alive { + // HTTP/1.1 + keep_alive = !headers::connection_close(&value); + } else { + // HTTP/1.0 + keep_alive = headers::connection_keep_alive(&value); + } + } + header::EXPECT => { + // According to https://datatracker.ietf.org/doc/html/rfc2616#section-14.20 + // Comparison of expectation values is case-insensitive for unquoted tokens + // (including the 100-continue token) + expect_continue = value.as_bytes().eq_ignore_ascii_case(b"100-continue"); + } + header::UPGRADE => { + // Upgrades are only allowed with HTTP/1.1 + wants_upgrade = is_http_11; + } + + _ => (), + } + + if let Some(ref mut header_case_map) = header_case_map { + header_case_map.append(&name, slice.slice(header.name.0..header.name.1)); + } + + #[cfg(feature = "ffi")] + if let Some(ref mut header_order) = header_order { + header_order.append(&name); + } + + headers.append(name, value); + } + + if is_te && !is_te_chunked { + debug!("request with transfer-encoding header, but not chunked, bad request"); + return Err(Parse::transfer_encoding_invalid()); + } + + let mut extensions = http::Extensions::default(); + + if let Some(header_case_map) = header_case_map { + extensions.insert(header_case_map); + } + + #[cfg(feature = "ffi")] + if let Some(header_order) = header_order { + extensions.insert(header_order); + } + + *ctx.req_method = Some(subject.0.clone()); + + Ok(Some(ParsedMessage { + head: MessageHead { + version, + subject, + headers, + extensions, + }, + decode: decoder, + expect_continue, + keep_alive, + wants_upgrade, + })) + } + + fn encode(mut msg: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder> { + trace!( + "Server::encode status={:?}, body={:?}, req_method={:?}", + msg.head.subject, + msg.body, + msg.req_method + ); + + let mut wrote_len = false; + + // hyper currently doesn't support returning 1xx status codes as a Response + // This is because Service only allows returning a single Response, and + // so if you try to reply with a e.g. 100 Continue, you have no way of + // replying with the latter status code response. + let (ret, is_last) = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { + (Ok(()), true) + } else if msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success() { + // Sending content-length or transfer-encoding header on 2xx response + // to CONNECT is forbidden in RFC 7231. + wrote_len = true; + (Ok(()), true) + } else if msg.head.subject.is_informational() { + warn!("response with 1xx status code not supported"); + *msg.head = MessageHead::default(); + msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; + msg.body = None; + (Err(crate::Error::new_user_unsupported_status_code()), true) + } else { + (Ok(()), !msg.keep_alive) + }; + + // In some error cases, we don't know about the invalid message until already + // pushing some bytes onto the `dst`. In those cases, we don't want to send + // the half-pushed message, so rewind to before. + let orig_len = dst.len(); + + let init_cap = 30 + msg.head.headers.len() * AVERAGE_HEADER_SIZE; + dst.reserve(init_cap); + + let custom_reason_phrase = msg.head.extensions.get::<crate::ext::ReasonPhrase>(); + + if msg.head.version == Version::HTTP_11 + && msg.head.subject == StatusCode::OK + && custom_reason_phrase.is_none() + { + extend(dst, b"HTTP/1.1 200 OK\r\n"); + } else { + match msg.head.version { + Version::HTTP_10 => extend(dst, b"HTTP/1.0 "), + Version::HTTP_11 => extend(dst, b"HTTP/1.1 "), + Version::HTTP_2 => { + debug!("response with HTTP2 version coerced to HTTP/1.1"); + extend(dst, b"HTTP/1.1 "); + } + other => panic!("unexpected response version: {:?}", other), + } + + extend(dst, msg.head.subject.as_str().as_bytes()); + extend(dst, b" "); + + if let Some(reason) = custom_reason_phrase { + extend(dst, reason.as_bytes()); + } else { + // a reason MUST be written, as many parsers will expect it. + extend( + dst, + msg.head + .subject + .canonical_reason() + .unwrap_or("<none>") + .as_bytes(), + ); + } + + extend(dst, b"\r\n"); + } + + let orig_headers; + let extensions = std::mem::take(&mut msg.head.extensions); + let orig_headers = match extensions.get::<HeaderCaseMap>() { + None if msg.title_case_headers => { + orig_headers = HeaderCaseMap::default(); + Some(&orig_headers) + } + orig_headers => orig_headers, + }; + let encoder = if let Some(orig_headers) = orig_headers { + Self::encode_headers_with_original_case( + msg, + dst, + is_last, + orig_len, + wrote_len, + orig_headers, + )? + } else { + Self::encode_headers_with_lower_case(msg, dst, is_last, orig_len, wrote_len)? + }; + + ret.map(|()| encoder) + } + + fn on_error(err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> { + use crate::error::Kind; + let status = match *err.kind() { + Kind::Parse(Parse::Method) + | Kind::Parse(Parse::Header(_)) + | Kind::Parse(Parse::Uri) + | Kind::Parse(Parse::Version) => StatusCode::BAD_REQUEST, + Kind::Parse(Parse::TooLarge) => StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, + Kind::Parse(Parse::UriTooLong) => StatusCode::URI_TOO_LONG, + _ => return None, + }; + + debug!("sending automatic response ({}) for parse error", status); + let mut msg = MessageHead::default(); + msg.subject = status; + Some(msg) + } + + fn is_server() -> bool { + true + } + + fn update_date() { + date::update(); + } +} + +#[cfg(feature = "server")] +impl Server { + fn can_have_body(method: &Option<Method>, status: StatusCode) -> bool { + Server::can_chunked(method, status) + } + + fn can_chunked(method: &Option<Method>, status: StatusCode) -> bool { + if method == &Some(Method::HEAD) || method == &Some(Method::CONNECT) && status.is_success() + { + false + } else if status.is_informational() { + false + } else { + match status { + StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED => false, + _ => true, + } + } + } + + fn can_have_content_length(method: &Option<Method>, status: StatusCode) -> bool { + if status.is_informational() || method == &Some(Method::CONNECT) && status.is_success() { + false + } else { + match status { + StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED => false, + _ => true, + } + } + } + + fn can_have_implicit_zero_content_length(method: &Option<Method>, status: StatusCode) -> bool { + Server::can_have_content_length(method, status) && method != &Some(Method::HEAD) + } + + fn encode_headers_with_lower_case( + msg: Encode<'_, StatusCode>, + dst: &mut Vec<u8>, + is_last: bool, + orig_len: usize, + wrote_len: bool, + ) -> crate::Result<Encoder> { + struct LowercaseWriter; + + impl HeaderNameWriter for LowercaseWriter { + #[inline] + fn write_full_header_line( + &mut self, + dst: &mut Vec<u8>, + line: &str, + _: (HeaderName, &str), + ) { + extend(dst, line.as_bytes()) + } + + #[inline] + fn write_header_name_with_colon( + &mut self, + dst: &mut Vec<u8>, + name_with_colon: &str, + _: HeaderName, + ) { + extend(dst, name_with_colon.as_bytes()) + } + + #[inline] + fn write_header_name(&mut self, dst: &mut Vec<u8>, name: &HeaderName) { + extend(dst, name.as_str().as_bytes()) + } + } + + Self::encode_headers(msg, dst, is_last, orig_len, wrote_len, LowercaseWriter) + } + + #[cold] + #[inline(never)] + fn encode_headers_with_original_case( + msg: Encode<'_, StatusCode>, + dst: &mut Vec<u8>, + is_last: bool, + orig_len: usize, + wrote_len: bool, + orig_headers: &HeaderCaseMap, + ) -> crate::Result<Encoder> { + struct OrigCaseWriter<'map> { + map: &'map HeaderCaseMap, + current: Option<(HeaderName, ValueIter<'map, Bytes>)>, + title_case_headers: bool, + } + + impl HeaderNameWriter for OrigCaseWriter<'_> { + #[inline] + fn write_full_header_line( + &mut self, + dst: &mut Vec<u8>, + _: &str, + (name, rest): (HeaderName, &str), + ) { + self.write_header_name(dst, &name); + extend(dst, rest.as_bytes()); + } + + #[inline] + fn write_header_name_with_colon( + &mut self, + dst: &mut Vec<u8>, + _: &str, + name: HeaderName, + ) { + self.write_header_name(dst, &name); + extend(dst, b": "); + } + + #[inline] + fn write_header_name(&mut self, dst: &mut Vec<u8>, name: &HeaderName) { + let Self { + map, + ref mut current, + title_case_headers, + } = *self; + if current.as_ref().map_or(true, |(last, _)| last != name) { + *current = None; + } + let (_, values) = + current.get_or_insert_with(|| (name.clone(), map.get_all_internal(name))); + + if let Some(orig_name) = values.next() { + extend(dst, orig_name); + } else if title_case_headers { + title_case(dst, name.as_str().as_bytes()); + } else { + extend(dst, name.as_str().as_bytes()); + } + } + } + + let header_name_writer = OrigCaseWriter { + map: orig_headers, + current: None, + title_case_headers: msg.title_case_headers, + }; + + Self::encode_headers(msg, dst, is_last, orig_len, wrote_len, header_name_writer) + } + + #[inline] + fn encode_headers<W>( + msg: Encode<'_, StatusCode>, + dst: &mut Vec<u8>, + mut is_last: bool, + orig_len: usize, + mut wrote_len: bool, + mut header_name_writer: W, + ) -> crate::Result<Encoder> + where + W: HeaderNameWriter, + { + // In some error cases, we don't know about the invalid message until already + // pushing some bytes onto the `dst`. In those cases, we don't want to send + // the half-pushed message, so rewind to before. + let rewind = |dst: &mut Vec<u8>| { + dst.truncate(orig_len); + }; + + let mut encoder = Encoder::length(0); + let mut wrote_date = false; + let mut cur_name = None; + let mut is_name_written = false; + let mut must_write_chunked = false; + let mut prev_con_len = None; + + macro_rules! handle_is_name_written { + () => {{ + if is_name_written { + // we need to clean up and write the newline + debug_assert_ne!( + &dst[dst.len() - 2..], + b"\r\n", + "previous header wrote newline but set is_name_written" + ); + + if must_write_chunked { + extend(dst, b", chunked\r\n"); + } else { + extend(dst, b"\r\n"); + } + } + }}; + } + + 'headers: for (opt_name, value) in msg.head.headers.drain() { + if let Some(n) = opt_name { + cur_name = Some(n); + handle_is_name_written!(); + is_name_written = false; + } + let name = cur_name.as_ref().expect("current header name"); + match *name { + header::CONTENT_LENGTH => { + if wrote_len && !is_name_written { + warn!("unexpected content-length found, canceling"); + rewind(dst); + return Err(crate::Error::new_user_header()); + } + match msg.body { + Some(BodyLength::Known(known_len)) => { + // The HttpBody claims to know a length, and + // the headers are already set. For performance + // reasons, we are just going to trust that + // the values match. + // + // In debug builds, we'll assert they are the + // same to help developers find bugs. + #[cfg(debug_assertions)] + { + if let Some(len) = headers::content_length_parse(&value) { + assert!( + len == known_len, + "payload claims content-length of {}, custom content-length header claims {}", + known_len, + len, + ); + } + } + + if !is_name_written { + encoder = Encoder::length(known_len); + header_name_writer.write_header_name_with_colon( + dst, + "content-length: ", + header::CONTENT_LENGTH, + ); + extend(dst, value.as_bytes()); + wrote_len = true; + is_name_written = true; + } + continue 'headers; + } + Some(BodyLength::Unknown) => { + // The HttpBody impl didn't know how long the + // body is, but a length header was included. + // We have to parse the value to return our + // Encoder... + + if let Some(len) = headers::content_length_parse(&value) { + if let Some(prev) = prev_con_len { + if prev != len { + warn!( + "multiple Content-Length values found: [{}, {}]", + prev, len + ); + rewind(dst); + return Err(crate::Error::new_user_header()); + } + debug_assert!(is_name_written); + continue 'headers; + } else { + // we haven't written content-length yet! + encoder = Encoder::length(len); + header_name_writer.write_header_name_with_colon( + dst, + "content-length: ", + header::CONTENT_LENGTH, + ); + extend(dst, value.as_bytes()); + wrote_len = true; + is_name_written = true; + prev_con_len = Some(len); + continue 'headers; + } + } else { + warn!("illegal Content-Length value: {:?}", value); + rewind(dst); + return Err(crate::Error::new_user_header()); + } + } + None => { + // We have no body to actually send, + // but the headers claim a content-length. + // There's only 2 ways this makes sense: + // + // - The header says the length is `0`. + // - This is a response to a `HEAD` request. + if msg.req_method == &Some(Method::HEAD) { + debug_assert_eq!(encoder, Encoder::length(0)); + } else { + if value.as_bytes() != b"0" { + warn!( + "content-length value found, but empty body provided: {:?}", + value + ); + } + continue 'headers; + } + } + } + wrote_len = true; + } + header::TRANSFER_ENCODING => { + if wrote_len && !is_name_written { + warn!("unexpected transfer-encoding found, canceling"); + rewind(dst); + return Err(crate::Error::new_user_header()); + } + // check that we actually can send a chunked body... + if msg.head.version == Version::HTTP_10 + || !Server::can_chunked(msg.req_method, msg.head.subject) + { + continue; + } + wrote_len = true; + // Must check each value, because `chunked` needs to be the + // last encoding, or else we add it. + must_write_chunked = !headers::is_chunked_(&value); + + if !is_name_written { + encoder = Encoder::chunked(); + is_name_written = true; + header_name_writer.write_header_name_with_colon( + dst, + "transfer-encoding: ", + header::TRANSFER_ENCODING, + ); + extend(dst, value.as_bytes()); + } else { + extend(dst, b", "); + extend(dst, value.as_bytes()); + } + continue 'headers; + } + header::CONNECTION => { + if !is_last && headers::connection_close(&value) { + is_last = true; + } + if !is_name_written { + is_name_written = true; + header_name_writer.write_header_name_with_colon( + dst, + "connection: ", + header::CONNECTION, + ); + extend(dst, value.as_bytes()); + } else { + extend(dst, b", "); + extend(dst, value.as_bytes()); + } + continue 'headers; + } + header::DATE => { + wrote_date = true; + } + _ => (), + } + //TODO: this should perhaps instead combine them into + //single lines, as RFC7230 suggests is preferable. + + // non-special write Name and Value + debug_assert!( + !is_name_written, + "{:?} set is_name_written and didn't continue loop", + name, + ); + header_name_writer.write_header_name(dst, name); + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } + + handle_is_name_written!(); + + if !wrote_len { + encoder = match msg.body { + Some(BodyLength::Unknown) => { + if msg.head.version == Version::HTTP_10 + || !Server::can_chunked(msg.req_method, msg.head.subject) + { + Encoder::close_delimited() + } else { + header_name_writer.write_full_header_line( + dst, + "transfer-encoding: chunked\r\n", + (header::TRANSFER_ENCODING, ": chunked\r\n"), + ); + Encoder::chunked() + } + } + None | Some(BodyLength::Known(0)) => { + if Server::can_have_implicit_zero_content_length( + msg.req_method, + msg.head.subject, + ) { + header_name_writer.write_full_header_line( + dst, + "content-length: 0\r\n", + (header::CONTENT_LENGTH, ": 0\r\n"), + ) + } + Encoder::length(0) + } + Some(BodyLength::Known(len)) => { + if !Server::can_have_content_length(msg.req_method, msg.head.subject) { + Encoder::length(0) + } else { + header_name_writer.write_header_name_with_colon( + dst, + "content-length: ", + header::CONTENT_LENGTH, + ); + extend(dst, ::itoa::Buffer::new().format(len).as_bytes()); + extend(dst, b"\r\n"); + Encoder::length(len) + } + } + }; + } + + if !Server::can_have_body(msg.req_method, msg.head.subject) { + trace!( + "server body forced to 0; method={:?}, status={:?}", + msg.req_method, + msg.head.subject + ); + encoder = Encoder::length(0); + } + + // cached date is much faster than formatting every request + if !wrote_date { + dst.reserve(date::DATE_VALUE_LENGTH + 8); + header_name_writer.write_header_name_with_colon(dst, "date: ", header::DATE); + date::extend(dst); + extend(dst, b"\r\n\r\n"); + } else { + extend(dst, b"\r\n"); + } + + Ok(encoder.set_last(is_last)) + } +} + +#[cfg(feature = "server")] +trait HeaderNameWriter { + fn write_full_header_line( + &mut self, + dst: &mut Vec<u8>, + line: &str, + name_value_pair: (HeaderName, &str), + ); + fn write_header_name_with_colon( + &mut self, + dst: &mut Vec<u8>, + name_with_colon: &str, + name: HeaderName, + ); + fn write_header_name(&mut self, dst: &mut Vec<u8>, name: &HeaderName); +} + +#[cfg(feature = "client")] +impl Http1Transaction for Client { + type Incoming = StatusCode; + type Outgoing = RequestLine; + const LOG: &'static str = "{role=client}"; + + fn parse(buf: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult<StatusCode> { + debug_assert!(!buf.is_empty(), "parse called with empty buf"); + + // Loop to skip information status code headers (100 Continue, etc). + loop { + // Unsafe: see comment in Server Http1Transaction, above. + let mut headers_indices: [MaybeUninit<HeaderIndices>; MAX_HEADERS] = unsafe { + // SAFETY: We can go safely from MaybeUninit array to array of MaybeUninit + MaybeUninit::uninit().assume_init() + }; + let (len, status, reason, version, headers_len) = { + // SAFETY: We can go safely from MaybeUninit array to array of MaybeUninit + let mut headers: [MaybeUninit<httparse::Header<'_>>; MAX_HEADERS] = + unsafe { MaybeUninit::uninit().assume_init() }; + trace!(bytes = buf.len(), "Response.parse"); + let mut res = httparse::Response::new(&mut []); + let bytes = buf.as_ref(); + match ctx.h1_parser_config.parse_response_with_uninit_headers( + &mut res, + bytes, + &mut headers, + ) { + Ok(httparse::Status::Complete(len)) => { + trace!("Response.parse Complete({})", len); + let status = StatusCode::from_u16(res.code.unwrap())?; + + let reason = { + let reason = res.reason.unwrap(); + // Only save the reason phrase if it isn't the canonical reason + if Some(reason) != status.canonical_reason() { + Some(Bytes::copy_from_slice(reason.as_bytes())) + } else { + None + } + }; + + let version = if res.version.unwrap() == 1 { + Version::HTTP_11 + } else { + Version::HTTP_10 + }; + record_header_indices(bytes, &res.headers, &mut headers_indices)?; + let headers_len = res.headers.len(); + (len, status, reason, version, headers_len) + } + Ok(httparse::Status::Partial) => return Ok(None), + Err(httparse::Error::Version) if ctx.h09_responses => { + trace!("Response.parse accepted HTTP/0.9 response"); + + (0, StatusCode::OK, None, Version::HTTP_09, 0) + } + Err(e) => return Err(e.into()), + } + }; + + let mut slice = buf.split_to(len); + + if ctx + .h1_parser_config + .obsolete_multiline_headers_in_responses_are_allowed() + { + for header in &headers_indices[..headers_len] { + // SAFETY: array is valid up to `headers_len` + let header = unsafe { &*header.as_ptr() }; + for b in &mut slice[header.value.0..header.value.1] { + if *b == b'\r' || *b == b'\n' { + *b = b' '; + } + } + } + } + + let slice = slice.freeze(); + + let mut headers = ctx.cached_headers.take().unwrap_or_else(HeaderMap::new); + + let mut keep_alive = version == Version::HTTP_11; + + let mut header_case_map = if ctx.preserve_header_case { + Some(HeaderCaseMap::default()) + } else { + None + }; + + #[cfg(feature = "ffi")] + let mut header_order = if ctx.preserve_header_order { + Some(OriginalHeaderOrder::default()) + } else { + None + }; + + headers.reserve(headers_len); + for header in &headers_indices[..headers_len] { + // SAFETY: array is valid up to `headers_len` + let header = unsafe { &*header.as_ptr() }; + let name = header_name!(&slice[header.name.0..header.name.1]); + let value = header_value!(slice.slice(header.value.0..header.value.1)); + + if let header::CONNECTION = name { + // keep_alive was previously set to default for Version + if keep_alive { + // HTTP/1.1 + keep_alive = !headers::connection_close(&value); + } else { + // HTTP/1.0 + keep_alive = headers::connection_keep_alive(&value); + } + } + + if let Some(ref mut header_case_map) = header_case_map { + header_case_map.append(&name, slice.slice(header.name.0..header.name.1)); + } + + #[cfg(feature = "ffi")] + if let Some(ref mut header_order) = header_order { + header_order.append(&name); + } + + headers.append(name, value); + } + + let mut extensions = http::Extensions::default(); + + if let Some(header_case_map) = header_case_map { + extensions.insert(header_case_map); + } + + #[cfg(feature = "ffi")] + if let Some(header_order) = header_order { + extensions.insert(header_order); + } + + if let Some(reason) = reason { + // Safety: httparse ensures that only valid reason phrase bytes are present in this + // field. + let reason = unsafe { crate::ext::ReasonPhrase::from_bytes_unchecked(reason) }; + extensions.insert(reason); + } + + #[cfg(feature = "ffi")] + if ctx.raw_headers { + extensions.insert(crate::ffi::RawHeaders(crate::ffi::hyper_buf(slice))); + } + + let head = MessageHead { + version, + subject: status, + headers, + extensions, + }; + if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? { + return Ok(Some(ParsedMessage { + head, + decode, + expect_continue: false, + // a client upgrade means the connection can't be used + // again, as it is definitely upgrading. + keep_alive: keep_alive && !is_upgrade, + wants_upgrade: is_upgrade, + })); + } + + #[cfg(feature = "ffi")] + if head.subject.is_informational() { + if let Some(callback) = ctx.on_informational { + callback.call(head.into_response(crate::Body::empty())); + } + } + + // Parsing a 1xx response could have consumed the buffer, check if + // it is empty now... + if buf.is_empty() { + return Ok(None); + } + } + } + + fn encode(msg: Encode<'_, Self::Outgoing>, dst: &mut Vec<u8>) -> crate::Result<Encoder> { + trace!( + "Client::encode method={:?}, body={:?}", + msg.head.subject.0, + msg.body + ); + + *msg.req_method = Some(msg.head.subject.0.clone()); + + let body = Client::set_length(msg.head, msg.body); + + let init_cap = 30 + msg.head.headers.len() * AVERAGE_HEADER_SIZE; + dst.reserve(init_cap); + + extend(dst, msg.head.subject.0.as_str().as_bytes()); + extend(dst, b" "); + //TODO: add API to http::Uri to encode without std::fmt + let _ = write!(FastWrite(dst), "{} ", msg.head.subject.1); + + match msg.head.version { + Version::HTTP_10 => extend(dst, b"HTTP/1.0"), + Version::HTTP_11 => extend(dst, b"HTTP/1.1"), + Version::HTTP_2 => { + debug!("request with HTTP2 version coerced to HTTP/1.1"); + extend(dst, b"HTTP/1.1"); + } + other => panic!("unexpected request version: {:?}", other), + } + extend(dst, b"\r\n"); + + if let Some(orig_headers) = msg.head.extensions.get::<HeaderCaseMap>() { + write_headers_original_case( + &msg.head.headers, + orig_headers, + dst, + msg.title_case_headers, + ); + } else if msg.title_case_headers { + write_headers_title_case(&msg.head.headers, dst); + } else { + write_headers(&msg.head.headers, dst); + } + + extend(dst, b"\r\n"); + msg.head.headers.clear(); //TODO: remove when switching to drain() + + Ok(body) + } + + fn on_error(_err: &crate::Error) -> Option<MessageHead<Self::Outgoing>> { + // we can't tell the server about any errors it creates + None + } + + fn is_client() -> bool { + true + } +} + +#[cfg(feature = "client")] +impl Client { + /// Returns Some(length, wants_upgrade) if successful. + /// + /// Returns None if this message head should be skipped (like a 100 status). + fn decoder( + inc: &MessageHead<StatusCode>, + method: &mut Option<Method>, + ) -> Result<Option<(DecodedLength, bool)>, Parse> { + // According to https://tools.ietf.org/html/rfc7230#section-3.3.3 + // 1. HEAD responses, and Status 1xx, 204, and 304 cannot have a body. + // 2. Status 2xx to a CONNECT cannot have a body. + // 3. Transfer-Encoding: chunked has a chunked body. + // 4. If multiple differing Content-Length headers or invalid, close connection. + // 5. Content-Length header has a sized body. + // 6. (irrelevant to Response) + // 7. Read till EOF. + + match inc.subject.as_u16() { + 101 => { + return Ok(Some((DecodedLength::ZERO, true))); + } + 100 | 102..=199 => { + trace!("ignoring informational response: {}", inc.subject.as_u16()); + return Ok(None); + } + 204 | 304 => return Ok(Some((DecodedLength::ZERO, false))), + _ => (), + } + match *method { + Some(Method::HEAD) => { + return Ok(Some((DecodedLength::ZERO, false))); + } + Some(Method::CONNECT) => { + if let 200..=299 = inc.subject.as_u16() { + return Ok(Some((DecodedLength::ZERO, true))); + } + } + Some(_) => {} + None => { + trace!("Client::decoder is missing the Method"); + } + } + + if inc.headers.contains_key(header::TRANSFER_ENCODING) { + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + // If Transfer-Encoding header is present, and 'chunked' is + // not the final encoding, and this is a Request, then it is + // malformed. A server should respond with 400 Bad Request. + if inc.version == Version::HTTP_10 { + debug!("HTTP/1.0 cannot have Transfer-Encoding header"); + Err(Parse::transfer_encoding_unexpected()) + } else if headers::transfer_encoding_is_chunked(&inc.headers) { + Ok(Some((DecodedLength::CHUNKED, false))) + } else { + trace!("not chunked, read till eof"); + Ok(Some((DecodedLength::CLOSE_DELIMITED, false))) + } + } else if let Some(len) = headers::content_length_parse_all(&inc.headers) { + Ok(Some((DecodedLength::checked_new(len)?, false))) + } else if inc.headers.contains_key(header::CONTENT_LENGTH) { + debug!("illegal Content-Length header"); + Err(Parse::content_length_invalid()) + } else { + trace!("neither Transfer-Encoding nor Content-Length"); + Ok(Some((DecodedLength::CLOSE_DELIMITED, false))) + } + } + fn set_length(head: &mut RequestHead, body: Option<BodyLength>) -> Encoder { + let body = if let Some(body) = body { + body + } else { + head.headers.remove(header::TRANSFER_ENCODING); + return Encoder::length(0); + }; + + // HTTP/1.0 doesn't know about chunked + let can_chunked = head.version == Version::HTTP_11; + let headers = &mut head.headers; + + // If the user already set specific headers, we should respect them, regardless + // of what the HttpBody knows about itself. They set them for a reason. + + // Because of the borrow checker, we can't check the for an existing + // Content-Length header while holding an `Entry` for the Transfer-Encoding + // header, so unfortunately, we must do the check here, first. + + let existing_con_len = headers::content_length_parse_all(headers); + let mut should_remove_con_len = false; + + if !can_chunked { + // Chunked isn't legal, so if it is set, we need to remove it. + if headers.remove(header::TRANSFER_ENCODING).is_some() { + trace!("removing illegal transfer-encoding header"); + } + + return if let Some(len) = existing_con_len { + Encoder::length(len) + } else if let BodyLength::Known(len) = body { + set_content_length(headers, len) + } else { + // HTTP/1.0 client requests without a content-length + // cannot have any body at all. + Encoder::length(0) + }; + } + + // If the user set a transfer-encoding, respect that. Let's just + // make sure `chunked` is the final encoding. + let encoder = match headers.entry(header::TRANSFER_ENCODING) { + Entry::Occupied(te) => { + should_remove_con_len = true; + if headers::is_chunked(te.iter()) { + Some(Encoder::chunked()) + } else { + warn!("user provided transfer-encoding does not end in 'chunked'"); + + // There's a Transfer-Encoding, but it doesn't end in 'chunked'! + // An example that could trigger this: + // + // Transfer-Encoding: gzip + // + // This can be bad, depending on if this is a request or a + // response. + // + // - A request is illegal if there is a `Transfer-Encoding` + // but it doesn't end in `chunked`. + // - A response that has `Transfer-Encoding` but doesn't + // end in `chunked` isn't illegal, it just forces this + // to be close-delimited. + // + // We can try to repair this, by adding `chunked` ourselves. + + headers::add_chunked(te); + Some(Encoder::chunked()) + } + } + Entry::Vacant(te) => { + if let Some(len) = existing_con_len { + Some(Encoder::length(len)) + } else if let BodyLength::Unknown = body { + // GET, HEAD, and CONNECT almost never have bodies. + // + // So instead of sending a "chunked" body with a 0-chunk, + // assume no body here. If you *must* send a body, + // set the headers explicitly. + match head.subject.0 { + Method::GET | Method::HEAD | Method::CONNECT => Some(Encoder::length(0)), + _ => { + te.insert(HeaderValue::from_static("chunked")); + Some(Encoder::chunked()) + } + } + } else { + None + } + } + }; + + // This is because we need a second mutable borrow to remove + // content-length header. + if let Some(encoder) = encoder { + if should_remove_con_len && existing_con_len.is_some() { + headers.remove(header::CONTENT_LENGTH); + } + return encoder; + } + + // User didn't set transfer-encoding, AND we know body length, + // so we can just set the Content-Length automatically. + + let len = if let BodyLength::Known(len) = body { + len + } else { + unreachable!("BodyLength::Unknown would set chunked"); + }; + + set_content_length(headers, len) + } +} + +fn set_content_length(headers: &mut HeaderMap, len: u64) -> Encoder { + // At this point, there should not be a valid Content-Length + // header. However, since we'll be indexing in anyways, we can + // warn the user if there was an existing illegal header. + // + // Or at least, we can in theory. It's actually a little bit slower, + // so perhaps only do that while the user is developing/testing. + + if cfg!(debug_assertions) { + match headers.entry(header::CONTENT_LENGTH) { + Entry::Occupied(mut cl) => { + // Internal sanity check, we should have already determined + // that the header was illegal before calling this function. + debug_assert!(headers::content_length_parse_all_values(cl.iter()).is_none()); + // Uh oh, the user set `Content-Length` headers, but set bad ones. + // This would be an illegal message anyways, so let's try to repair + // with our known good length. + error!("user provided content-length header was invalid"); + + cl.insert(HeaderValue::from(len)); + Encoder::length(len) + } + Entry::Vacant(cl) => { + cl.insert(HeaderValue::from(len)); + Encoder::length(len) + } + } + } else { + headers.insert(header::CONTENT_LENGTH, HeaderValue::from(len)); + Encoder::length(len) + } +} + +#[derive(Clone, Copy)] +struct HeaderIndices { + name: (usize, usize), + value: (usize, usize), +} + +fn record_header_indices( + bytes: &[u8], + headers: &[httparse::Header<'_>], + indices: &mut [MaybeUninit<HeaderIndices>], +) -> Result<(), crate::error::Parse> { + let bytes_ptr = bytes.as_ptr() as usize; + + for (header, indices) in headers.iter().zip(indices.iter_mut()) { + if header.name.len() >= (1 << 16) { + debug!("header name larger than 64kb: {:?}", header.name); + return Err(crate::error::Parse::TooLarge); + } + let name_start = header.name.as_ptr() as usize - bytes_ptr; + let name_end = name_start + header.name.len(); + let value_start = header.value.as_ptr() as usize - bytes_ptr; + let value_end = value_start + header.value.len(); + + // FIXME(maybe_uninit_extra) + // FIXME(addr_of) + // Currently we don't have `ptr::addr_of_mut` in stable rust or + // MaybeUninit::write, so this is some way of assigning into a MaybeUninit + // safely + let new_header_indices = HeaderIndices { + name: (name_start, name_end), + value: (value_start, value_end), + }; + *indices = MaybeUninit::new(new_header_indices); + } + + Ok(()) +} + +// Write header names as title case. The header name is assumed to be ASCII. +fn title_case(dst: &mut Vec<u8>, name: &[u8]) { + dst.reserve(name.len()); + + // Ensure first character is uppercased + let mut prev = b'-'; + for &(mut c) in name { + if prev == b'-' { + c.make_ascii_uppercase(); + } + dst.push(c); + prev = c; + } +} + +fn write_headers_title_case(headers: &HeaderMap, dst: &mut Vec<u8>) { + for (name, value) in headers { + title_case(dst, name.as_str().as_bytes()); + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } +} + +fn write_headers(headers: &HeaderMap, dst: &mut Vec<u8>) { + for (name, value) in headers { + extend(dst, name.as_str().as_bytes()); + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } +} + +#[cold] +fn write_headers_original_case( + headers: &HeaderMap, + orig_case: &HeaderCaseMap, + dst: &mut Vec<u8>, + title_case_headers: bool, +) { + // For each header name/value pair, there may be a value in the casemap + // that corresponds to the HeaderValue. So, we iterator all the keys, + // and for each one, try to pair the originally cased name with the value. + // + // TODO: consider adding http::HeaderMap::entries() iterator + for name in headers.keys() { + let mut names = orig_case.get_all(name); + + for value in headers.get_all(name) { + if let Some(orig_name) = names.next() { + extend(dst, orig_name.as_ref()); + } else if title_case_headers { + title_case(dst, name.as_str().as_bytes()); + } else { + extend(dst, name.as_str().as_bytes()); + } + + // Wanted for curl test cases that send `X-Custom-Header:\r\n` + if value.is_empty() { + extend(dst, b":\r\n"); + } else { + extend(dst, b": "); + extend(dst, value.as_bytes()); + extend(dst, b"\r\n"); + } + } + } +} + +struct FastWrite<'a>(&'a mut Vec<u8>); + +impl<'a> fmt::Write for FastWrite<'a> { + #[inline] + fn write_str(&mut self, s: &str) -> fmt::Result { + extend(self.0, s.as_bytes()); + Ok(()) + } + + #[inline] + fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { + fmt::write(self, args) + } +} + +#[inline] +fn extend(dst: &mut Vec<u8>, data: &[u8]) { + dst.extend_from_slice(data); +} + +#[cfg(test)] +mod tests { + use bytes::BytesMut; + + use super::*; + + #[test] + fn test_parse_request() { + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from("GET /echo HTTP/1.1\r\nHost: hyper.rs\r\n\r\n"); + let mut method = None; + let msg = Server::parse( + &mut raw, + ParseContext { + cached_headers: &mut None, + req_method: &mut method, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .unwrap() + .unwrap(); + assert_eq!(raw.len(), 0); + assert_eq!(msg.head.subject.0, crate::Method::GET); + assert_eq!(msg.head.subject.1, "/echo"); + assert_eq!(msg.head.version, crate::Version::HTTP_11); + assert_eq!(msg.head.headers.len(), 1); + assert_eq!(msg.head.headers["Host"], "hyper.rs"); + assert_eq!(method, Some(crate::Method::GET)); + } + + #[test] + fn test_parse_response() { + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut Some(crate::Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); + assert_eq!(raw.len(), 0); + assert_eq!(msg.head.subject, crate::StatusCode::OK); + assert_eq!(msg.head.version, crate::Version::HTTP_11); + assert_eq!(msg.head.headers.len(), 1); + assert_eq!(msg.head.headers["Content-Length"], "0"); + } + + #[test] + fn test_parse_request_errors() { + let mut raw = BytesMut::from("GET htt:p// HTTP/1.1\r\nHost: hyper.rs\r\n\r\n"); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + Server::parse(&mut raw, ctx).unwrap_err(); + } + + const H09_RESPONSE: &'static str = "Baguettes are super delicious, don't you agree?"; + + #[test] + fn test_parse_response_h09_allowed() { + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from(H09_RESPONSE); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut Some(crate::Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: true, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); + assert_eq!(raw, H09_RESPONSE); + assert_eq!(msg.head.subject, crate::StatusCode::OK); + assert_eq!(msg.head.version, crate::Version::HTTP_09); + assert_eq!(msg.head.headers.len(), 0); + } + + #[test] + fn test_parse_response_h09_rejected() { + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from(H09_RESPONSE); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut Some(crate::Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + Client::parse(&mut raw, ctx).unwrap_err(); + assert_eq!(raw, H09_RESPONSE); + } + + const RESPONSE_WITH_WHITESPACE_BETWEEN_HEADER_NAME_AND_COLON: &'static str = + "HTTP/1.1 200 OK\r\nAccess-Control-Allow-Credentials : true\r\n\r\n"; + + #[test] + fn test_parse_allow_response_with_spaces_before_colons() { + use httparse::ParserConfig; + + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from(RESPONSE_WITH_WHITESPACE_BETWEEN_HEADER_NAME_AND_COLON); + let mut h1_parser_config = ParserConfig::default(); + h1_parser_config.allow_spaces_after_header_name_in_responses(true); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut Some(crate::Method::GET), + h1_parser_config, + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + let msg = Client::parse(&mut raw, ctx).unwrap().unwrap(); + assert_eq!(raw.len(), 0); + assert_eq!(msg.head.subject, crate::StatusCode::OK); + assert_eq!(msg.head.version, crate::Version::HTTP_11); + assert_eq!(msg.head.headers.len(), 1); + assert_eq!(msg.head.headers["Access-Control-Allow-Credentials"], "true"); + } + + #[test] + fn test_parse_reject_response_with_spaces_before_colons() { + let _ = pretty_env_logger::try_init(); + let mut raw = BytesMut::from(RESPONSE_WITH_WHITESPACE_BETWEEN_HEADER_NAME_AND_COLON); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut Some(crate::Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + Client::parse(&mut raw, ctx).unwrap_err(); + } + + #[test] + fn test_parse_preserve_header_case_in_request() { + let mut raw = + BytesMut::from("GET / HTTP/1.1\r\nHost: hyper.rs\r\nX-BREAD: baguette\r\n\r\n"); + let ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: true, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }; + let parsed_message = Server::parse(&mut raw, ctx).unwrap().unwrap(); + let orig_headers = parsed_message + .head + .extensions + .get::<HeaderCaseMap>() + .unwrap(); + assert_eq!( + orig_headers + .get_all_internal(&HeaderName::from_static("host")) + .into_iter() + .collect::<Vec<_>>(), + vec![&Bytes::from("Host")] + ); + assert_eq!( + orig_headers + .get_all_internal(&HeaderName::from_static("x-bread")) + .into_iter() + .collect::<Vec<_>>(), + vec![&Bytes::from("X-BREAD")] + ); + } + + #[test] + fn test_decoder_request() { + fn parse(s: &str) -> ParsedMessage<RequestLine> { + let mut bytes = BytesMut::from(s); + Server::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .expect("parse ok") + .expect("parse complete") + } + + fn parse_err(s: &str, comment: &str) -> crate::error::Parse { + let mut bytes = BytesMut::from(s); + Server::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .expect_err(comment) + } + + // no length or transfer-encoding means 0-length body + assert_eq!( + parse( + "\ + GET / HTTP/1.1\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::ZERO + ); + + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::ZERO + ); + + // transfer-encoding: chunked + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: gzip, chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: gzip\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + // content-length + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + content-length: 10\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::new(10) + ); + + // transfer-encoding and content-length = chunked + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + content-length: 10\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: chunked\r\n\ + content-length: 10\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: gzip\r\n\ + content-length: 10\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + // multiple content-lengths of same value are fine + assert_eq!( + parse( + "\ + POST / HTTP/1.1\r\n\ + content-length: 10\r\n\ + content-length: 10\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::new(10) + ); + + // multiple content-lengths with different values is an error + parse_err( + "\ + POST / HTTP/1.1\r\n\ + content-length: 10\r\n\ + content-length: 11\r\n\ + \r\n\ + ", + "multiple content-lengths", + ); + + // content-length with prefix is not allowed + parse_err( + "\ + POST / HTTP/1.1\r\n\ + content-length: +10\r\n\ + \r\n\ + ", + "prefixed content-length", + ); + + // transfer-encoding that isn't chunked is an error + parse_err( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: gzip\r\n\ + \r\n\ + ", + "transfer-encoding but not chunked", + ); + + parse_err( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: chunked, gzip\r\n\ + \r\n\ + ", + "transfer-encoding doesn't end in chunked", + ); + + parse_err( + "\ + POST / HTTP/1.1\r\n\ + transfer-encoding: chunked\r\n\ + transfer-encoding: afterlol\r\n\ + \r\n\ + ", + "transfer-encoding multiple lines doesn't end in chunked", + ); + + // http/1.0 + + assert_eq!( + parse( + "\ + POST / HTTP/1.0\r\n\ + content-length: 10\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::new(10) + ); + + // 1.0 doesn't understand chunked, so its an error + parse_err( + "\ + POST / HTTP/1.0\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + ", + "1.0 chunked", + ); + } + + #[test] + fn test_decoder_response() { + fn parse(s: &str) -> ParsedMessage<StatusCode> { + parse_with_method(s, Method::GET) + } + + fn parse_ignores(s: &str) { + let mut bytes = BytesMut::from(s); + assert!(Client::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut Some(Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + } + ) + .expect("parse ok") + .is_none()) + } + + fn parse_with_method(s: &str, m: Method) -> ParsedMessage<StatusCode> { + let mut bytes = BytesMut::from(s); + Client::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut Some(m), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .expect("parse ok") + .expect("parse complete") + } + + fn parse_err(s: &str) -> crate::error::Parse { + let mut bytes = BytesMut::from(s); + Client::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut Some(Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .expect_err("parse should err") + } + + // no content-length or transfer-encoding means close-delimited + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CLOSE_DELIMITED + ); + + // 204 and 304 never have a body + assert_eq!( + parse( + "\ + HTTP/1.1 204 No Content\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::ZERO + ); + + assert_eq!( + parse( + "\ + HTTP/1.1 304 Not Modified\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::ZERO + ); + + // content-length + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 8\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::new(8) + ); + + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 8\r\n\ + content-length: 8\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::new(8) + ); + + parse_err( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 8\r\n\ + content-length: 9\r\n\ + \r\n\ + ", + ); + + parse_err( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: +8\r\n\ + \r\n\ + ", + ); + + // transfer-encoding: chunked + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + // transfer-encoding not-chunked is close-delimited + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: yolo\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CLOSE_DELIMITED + ); + + // transfer-encoding and content-length = chunked + assert_eq!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 10\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CHUNKED + ); + + // HEAD can have content-length, but not body + assert_eq!( + parse_with_method( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 8\r\n\ + \r\n\ + ", + Method::HEAD + ) + .decode, + DecodedLength::ZERO + ); + + // CONNECT with 200 never has body + { + let msg = parse_with_method( + "\ + HTTP/1.1 200 OK\r\n\ + \r\n\ + ", + Method::CONNECT, + ); + assert_eq!(msg.decode, DecodedLength::ZERO); + assert!(!msg.keep_alive, "should be upgrade"); + assert!(msg.wants_upgrade, "should be upgrade"); + } + + // CONNECT receiving non 200 can have a body + assert_eq!( + parse_with_method( + "\ + HTTP/1.1 400 Bad Request\r\n\ + \r\n\ + ", + Method::CONNECT + ) + .decode, + DecodedLength::CLOSE_DELIMITED + ); + + // 1xx status codes + parse_ignores( + "\ + HTTP/1.1 100 Continue\r\n\ + \r\n\ + ", + ); + + parse_ignores( + "\ + HTTP/1.1 103 Early Hints\r\n\ + \r\n\ + ", + ); + + // 101 upgrade not supported yet + { + let msg = parse( + "\ + HTTP/1.1 101 Switching Protocols\r\n\ + \r\n\ + ", + ); + assert_eq!(msg.decode, DecodedLength::ZERO); + assert!(!msg.keep_alive, "should be last"); + assert!(msg.wants_upgrade, "should be upgrade"); + } + + // http/1.0 + assert_eq!( + parse( + "\ + HTTP/1.0 200 OK\r\n\ + \r\n\ + " + ) + .decode, + DecodedLength::CLOSE_DELIMITED + ); + + // 1.0 doesn't understand chunked + parse_err( + "\ + HTTP/1.0 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + ", + ); + + // keep-alive + assert!( + parse( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 0\r\n\ + \r\n\ + " + ) + .keep_alive, + "HTTP/1.1 keep-alive is default" + ); + + assert!( + !parse( + "\ + HTTP/1.1 200 OK\r\n\ + content-length: 0\r\n\ + connection: foo, close, bar\r\n\ + \r\n\ + " + ) + .keep_alive, + "connection close is always close" + ); + + assert!( + !parse( + "\ + HTTP/1.0 200 OK\r\n\ + content-length: 0\r\n\ + \r\n\ + " + ) + .keep_alive, + "HTTP/1.0 close is default" + ); + + assert!( + parse( + "\ + HTTP/1.0 200 OK\r\n\ + content-length: 0\r\n\ + connection: foo, keep-alive, bar\r\n\ + \r\n\ + " + ) + .keep_alive, + "connection keep-alive is always keep-alive" + ); + } + + #[test] + fn test_client_request_encode_title_case() { + use crate::proto::BodyLength; + use http::header::HeaderValue; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + head.headers.insert("*-*", HeaderValue::from_static("o_o")); + + let mut vec = Vec::new(); + Client::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: true, + }, + &mut vec, + ) + .unwrap(); + + assert_eq!(vec, b"GET / HTTP/1.1\r\nContent-Length: 10\r\nContent-Type: application/json\r\n*-*: o_o\r\n\r\n".to_vec()); + } + + #[test] + fn test_client_request_encode_orig_case() { + use crate::proto::BodyLength; + use http::header::{HeaderValue, CONTENT_LENGTH}; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + + let mut orig_headers = HeaderCaseMap::default(); + orig_headers.insert(CONTENT_LENGTH, "CONTENT-LENGTH".into()); + head.extensions.insert(orig_headers); + + let mut vec = Vec::new(); + Client::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: false, + }, + &mut vec, + ) + .unwrap(); + + assert_eq!( + &*vec, + b"GET / HTTP/1.1\r\nCONTENT-LENGTH: 10\r\ncontent-type: application/json\r\n\r\n" + .as_ref(), + ); + } + #[test] + fn test_client_request_encode_orig_and_title_case() { + use crate::proto::BodyLength; + use http::header::{HeaderValue, CONTENT_LENGTH}; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + + let mut orig_headers = HeaderCaseMap::default(); + orig_headers.insert(CONTENT_LENGTH, "CONTENT-LENGTH".into()); + head.extensions.insert(orig_headers); + + let mut vec = Vec::new(); + Client::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: true, + }, + &mut vec, + ) + .unwrap(); + + assert_eq!( + &*vec, + b"GET / HTTP/1.1\r\nCONTENT-LENGTH: 10\r\nContent-Type: application/json\r\n\r\n" + .as_ref(), + ); + } + + #[test] + fn test_server_encode_connect_method() { + let mut head = MessageHead::default(); + + let mut vec = Vec::new(); + let encoder = Server::encode( + Encode { + head: &mut head, + body: None, + keep_alive: true, + req_method: &mut Some(Method::CONNECT), + title_case_headers: false, + }, + &mut vec, + ) + .unwrap(); + + assert!(encoder.is_last()); + } + + #[test] + fn test_server_response_encode_title_case() { + use crate::proto::BodyLength; + use http::header::HeaderValue; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + head.headers + .insert("weird--header", HeaderValue::from_static("")); + + let mut vec = Vec::new(); + Server::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: true, + }, + &mut vec, + ) + .unwrap(); + + let expected_response = + b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\nContent-Type: application/json\r\nWeird--Header: \r\n"; + + assert_eq!(&vec[..expected_response.len()], &expected_response[..]); + } + + #[test] + fn test_server_response_encode_orig_case() { + use crate::proto::BodyLength; + use http::header::{HeaderValue, CONTENT_LENGTH}; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + + let mut orig_headers = HeaderCaseMap::default(); + orig_headers.insert(CONTENT_LENGTH, "CONTENT-LENGTH".into()); + head.extensions.insert(orig_headers); + + let mut vec = Vec::new(); + Server::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: false, + }, + &mut vec, + ) + .unwrap(); + + let expected_response = + b"HTTP/1.1 200 OK\r\nCONTENT-LENGTH: 10\r\ncontent-type: application/json\r\ndate: "; + + assert_eq!(&vec[..expected_response.len()], &expected_response[..]); + } + + #[test] + fn test_server_response_encode_orig_and_title_case() { + use crate::proto::BodyLength; + use http::header::{HeaderValue, CONTENT_LENGTH}; + + let mut head = MessageHead::default(); + head.headers + .insert("content-length", HeaderValue::from_static("10")); + head.headers + .insert("content-type", HeaderValue::from_static("application/json")); + + let mut orig_headers = HeaderCaseMap::default(); + orig_headers.insert(CONTENT_LENGTH, "CONTENT-LENGTH".into()); + head.extensions.insert(orig_headers); + + let mut vec = Vec::new(); + Server::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut None, + title_case_headers: true, + }, + &mut vec, + ) + .unwrap(); + + let expected_response = + b"HTTP/1.1 200 OK\r\nCONTENT-LENGTH: 10\r\nContent-Type: application/json\r\nDate: "; + + assert_eq!(&vec[..expected_response.len()], &expected_response[..]); + } + + #[test] + fn parse_header_htabs() { + let mut bytes = BytesMut::from("HTTP/1.1 200 OK\r\nserver: hello\tworld\r\n\r\n"); + let parsed = Client::parse( + &mut bytes, + ParseContext { + cached_headers: &mut None, + req_method: &mut Some(Method::GET), + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .expect("parse ok") + .expect("parse complete"); + + assert_eq!(parsed.head.headers["server"], "hello\tworld"); + } + + #[test] + fn test_write_headers_orig_case_empty_value() { + let mut headers = HeaderMap::new(); + let name = http::header::HeaderName::from_static("x-empty"); + headers.insert(&name, "".parse().expect("parse empty")); + let mut orig_cases = HeaderCaseMap::default(); + orig_cases.insert(name, Bytes::from_static(b"X-EmptY")); + + let mut dst = Vec::new(); + super::write_headers_original_case(&headers, &orig_cases, &mut dst, false); + + assert_eq!( + dst, b"X-EmptY:\r\n", + "there should be no space between the colon and CRLF" + ); + } + + #[test] + fn test_write_headers_orig_case_multiple_entries() { + let mut headers = HeaderMap::new(); + let name = http::header::HeaderName::from_static("x-empty"); + headers.insert(&name, "a".parse().unwrap()); + headers.append(&name, "b".parse().unwrap()); + + let mut orig_cases = HeaderCaseMap::default(); + orig_cases.insert(name.clone(), Bytes::from_static(b"X-Empty")); + orig_cases.append(name, Bytes::from_static(b"X-EMPTY")); + + let mut dst = Vec::new(); + super::write_headers_original_case(&headers, &orig_cases, &mut dst, false); + + assert_eq!(dst, b"X-Empty: a\r\nX-EMPTY: b\r\n"); + } + + #[cfg(feature = "nightly")] + use test::Bencher; + + #[cfg(feature = "nightly")] + #[bench] + fn bench_parse_incoming(b: &mut Bencher) { + let mut raw = BytesMut::from( + &b"GET /super_long_uri/and_whatever?what_should_we_talk_about/\ + I_wonder/Hard_to_write_in_an_uri_after_all/you_have_to_make\ + _up_the_punctuation_yourself/how_fun_is_that?test=foo&test1=\ + foo1&test2=foo2&test3=foo3&test4=foo4 HTTP/1.1\r\nHost: \ + hyper.rs\r\nAccept: a lot of things\r\nAccept-Charset: \ + utf8\r\nAccept-Encoding: *\r\nAccess-Control-Allow-\ + Credentials: None\r\nAccess-Control-Allow-Origin: None\r\n\ + Access-Control-Allow-Methods: None\r\nAccess-Control-Allow-\ + Headers: None\r\nContent-Encoding: utf8\r\nContent-Security-\ + Policy: None\r\nContent-Type: text/html\r\nOrigin: hyper\ + \r\nSec-Websocket-Extensions: It looks super important!\r\n\ + Sec-Websocket-Origin: hyper\r\nSec-Websocket-Version: 4.3\r\ + \nStrict-Transport-Security: None\r\nUser-Agent: hyper\r\n\ + X-Content-Duration: None\r\nX-Content-Security-Policy: None\ + \r\nX-DNSPrefetch-Control: None\r\nX-Frame-Options: \ + Something important obviously\r\nX-Requested-With: Nothing\ + \r\n\r\n"[..], + ); + let len = raw.len(); + let mut headers = Some(HeaderMap::new()); + + b.bytes = len as u64; + b.iter(|| { + let mut msg = Server::parse( + &mut raw, + ParseContext { + cached_headers: &mut headers, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .unwrap() + .unwrap(); + ::test::black_box(&msg); + msg.head.headers.clear(); + headers = Some(msg.head.headers); + restart(&mut raw, len); + }); + + fn restart(b: &mut BytesMut, len: usize) { + b.reserve(1); + unsafe { + b.set_len(len); + } + } + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_parse_short(b: &mut Bencher) { + let s = &b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n"[..]; + let mut raw = BytesMut::from(s); + let len = raw.len(); + let mut headers = Some(HeaderMap::new()); + + b.bytes = len as u64; + b.iter(|| { + let mut msg = Server::parse( + &mut raw, + ParseContext { + cached_headers: &mut headers, + req_method: &mut None, + h1_parser_config: Default::default(), + #[cfg(feature = "runtime")] + h1_header_read_timeout: None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_fut: &mut None, + #[cfg(feature = "runtime")] + h1_header_read_timeout_running: &mut false, + preserve_header_case: false, + #[cfg(feature = "ffi")] + preserve_header_order: false, + h09_responses: false, + #[cfg(feature = "ffi")] + on_informational: &mut None, + #[cfg(feature = "ffi")] + raw_headers: false, + }, + ) + .unwrap() + .unwrap(); + ::test::black_box(&msg); + msg.head.headers.clear(); + headers = Some(msg.head.headers); + restart(&mut raw, len); + }); + + fn restart(b: &mut BytesMut, len: usize) { + b.reserve(1); + unsafe { + b.set_len(len); + } + } + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_server_encode_headers_preset(b: &mut Bencher) { + use crate::proto::BodyLength; + use http::header::HeaderValue; + + let len = 108; + b.bytes = len as u64; + + let mut head = MessageHead::default(); + let mut headers = HeaderMap::new(); + headers.insert("content-length", HeaderValue::from_static("10")); + headers.insert("content-type", HeaderValue::from_static("application/json")); + + b.iter(|| { + let mut vec = Vec::new(); + head.headers = headers.clone(); + Server::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut Some(Method::GET), + title_case_headers: false, + }, + &mut vec, + ) + .unwrap(); + assert_eq!(vec.len(), len); + ::test::black_box(vec); + }) + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_server_encode_no_headers(b: &mut Bencher) { + use crate::proto::BodyLength; + + let len = 76; + b.bytes = len as u64; + + let mut head = MessageHead::default(); + let mut vec = Vec::with_capacity(128); + + b.iter(|| { + Server::encode( + Encode { + head: &mut head, + body: Some(BodyLength::Known(10)), + keep_alive: true, + req_method: &mut Some(Method::GET), + title_case_headers: false, + }, + &mut vec, + ) + .unwrap(); + assert_eq!(vec.len(), len); + ::test::black_box(&vec); + + vec.clear(); + }) + } +} diff --git a/third_party/rust/hyper/src/proto/h2/client.rs b/third_party/rust/hyper/src/proto/h2/client.rs new file mode 100644 index 0000000000..bac8eceb3a --- /dev/null +++ b/third_party/rust/hyper/src/proto/h2/client.rs @@ -0,0 +1,450 @@ +use std::error::Error as StdError; +#[cfg(feature = "runtime")] +use std::time::Duration; + +use bytes::Bytes; +use futures_channel::{mpsc, oneshot}; +use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; +use futures_util::stream::StreamExt as _; +use h2::client::{Builder, SendRequest}; +use h2::SendStream; +use http::{Method, StatusCode}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, trace, warn}; + +use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; +use crate::body::HttpBody; +use crate::client::dispatch::Callback; +use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; +use crate::ext::Protocol; +use crate::headers; +use crate::proto::h2::UpgradedSendStream; +use crate::proto::Dispatched; +use crate::upgrade::Upgraded; +use crate::{Body, Request, Response}; +use h2::client::ResponseFuture; + +type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; + +///// An mpsc channel is used to help notify the `Connection` task when *all* +///// other handles to it have been dropped, so that it can shutdown. +type ConnDropRef = mpsc::Sender<Never>; + +///// A oneshot channel watches the `Connection` task, and when it completes, +///// the "dispatch" task will be notified and can shutdown sooner. +type ConnEof = oneshot::Receiver<Never>; + +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb +const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) max_frame_size: u32, + #[cfg(feature = "runtime")] + pub(crate) keep_alive_interval: Option<Duration>, + #[cfg(feature = "runtime")] + pub(crate) keep_alive_timeout: Duration, + #[cfg(feature = "runtime")] + pub(crate) keep_alive_while_idle: bool, + pub(crate) max_concurrent_reset_streams: Option<usize>, + pub(crate) max_send_buffer_size: usize, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + max_frame_size: DEFAULT_MAX_FRAME_SIZE, + #[cfg(feature = "runtime")] + keep_alive_interval: None, + #[cfg(feature = "runtime")] + keep_alive_timeout: Duration::from_secs(20), + #[cfg(feature = "runtime")] + keep_alive_while_idle: false, + max_concurrent_reset_streams: None, + max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, + } + } +} + +fn new_builder(config: &Config) -> Builder { + let mut builder = Builder::default(); + builder + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .max_frame_size(config.max_frame_size) + .max_send_buffer_size(config.max_send_buffer_size) + .enable_push(false); + if let Some(max) = config.max_concurrent_reset_streams { + builder.max_concurrent_reset_streams(max); + } + builder +} + +fn new_ping_config(config: &Config) -> ping::Config { + ping::Config { + bdp_initial_window: if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }, + #[cfg(feature = "runtime")] + keep_alive_interval: config.keep_alive_interval, + #[cfg(feature = "runtime")] + keep_alive_timeout: config.keep_alive_timeout, + #[cfg(feature = "runtime")] + keep_alive_while_idle: config.keep_alive_while_idle, + } +} + +pub(crate) async fn handshake<T, B>( + io: T, + req_rx: ClientRx<B>, + config: &Config, + exec: Exec, +) -> crate::Result<ClientTask<B>> +where + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, + B: HttpBody, + B::Data: Send + 'static, +{ + let (h2_tx, mut conn) = new_builder(config) + .handshake::<_, SendBuf<B::Data>>(io) + .await + .map_err(crate::Error::new_h2)?; + + // An mpsc channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. + let (conn_drop_ref, rx) = mpsc::channel(1); + let (cancel_tx, conn_eof) = oneshot::channel(); + + let conn_drop_rx = rx.into_future().map(|(item, _rx)| { + if let Some(never) = item { + match never {} + } + }); + + let ping_config = new_ping_config(&config); + + let (conn, ping) = if ping_config.is_enabled() { + let pp = conn.ping_pong().expect("conn.ping_pong"); + let (recorder, mut ponger) = ping::channel(pp, ping_config); + + let conn = future::poll_fn(move |cx| { + match ponger.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + conn.set_target_window_size(wnd); + conn.set_initial_window_size(wnd)?; + } + #[cfg(feature = "runtime")] + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("connection keep-alive timed out"); + return Poll::Ready(Ok(())); + } + Poll::Pending => {} + } + + Pin::new(&mut conn).poll(cx) + }); + (Either::Left(conn), recorder) + } else { + (Either::Right(conn), ping::disabled()) + }; + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + + Ok(ClientTask { + ping, + conn_drop_ref, + conn_eof, + executor: exec, + h2_tx, + req_rx, + fut_ctx: None, + }) +} + +async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Never>) +where + C: Future + Unpin, + D: Future<Output = ()> + Unpin, +{ + match future::select(conn, drop_rx).await { + Either::Left(_) => { + // ok or err, the `conn` has finished + } + Either::Right(((), conn)) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + drop(cancel_tx); + let _ = conn.await; + } + } +} + +struct FutCtx<B> +where + B: HttpBody, +{ + is_connect: bool, + eos: bool, + fut: ResponseFuture, + body_tx: SendStream<SendBuf<B::Data>>, + body: B, + cb: Callback<Request<B>, Response<Body>>, +} + +impl<B: HttpBody> Unpin for FutCtx<B> {} + +pub(crate) struct ClientTask<B> +where + B: HttpBody, +{ + ping: ping::Recorder, + conn_drop_ref: ConnDropRef, + conn_eof: ConnEof, + executor: Exec, + h2_tx: SendRequest<SendBuf<B::Data>>, + req_rx: ClientRx<B>, + fut_ctx: Option<FutCtx<B>>, +} + +impl<B> ClientTask<B> +where + B: HttpBody + 'static, +{ + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.h2_tx.is_extended_connect_protocol_enabled() + } +} + +impl<B> ClientTask<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut task::Context<'_>) { + let ping = self.ping.clone(); + let send_stream = if !f.is_connect { + if !f.eos { + let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| { + if let Err(e) = res { + debug!("client request body error: {}", e); + } + }); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match Pin::new(&mut pipe).poll(cx) { + Poll::Ready(_) => (), + Poll::Pending => { + let conn_drop_ref = self.conn_drop_ref.clone(); + // keep the ping recorder's knowledge of an + // "open stream" alive while this body is + // still sending... + let ping = ping.clone(); + let pipe = pipe.map(move |x| { + drop(conn_drop_ref); + drop(ping); + x + }); + // Clear send task + self.executor.execute(pipe); + } + } + } + + None + } else { + Some(f.body_tx) + }; + + let fut = f.fut.map(move |result| match result { + Ok(res) => { + // record that we got the response headers + ping.record_non_data(); + + let content_length = headers::content_length_parse_all(res.headers()); + if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect response with non-zero body not supported"); + + send_stream.send_reset(h2::Reason::INTERNAL_ERROR); + return Err(( + crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + None, + )); + } + let (parts, recv_stream) = res.into_parts(); + let mut res = Response::from_parts(parts, Body::empty()); + + let (pending, on_upgrade) = crate::upgrade::pending(); + let io = H2Upgraded { + ping, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + recv_stream, + buf: Bytes::new(), + }; + let upgraded = Upgraded::new(io, Bytes::new()); + + pending.fulfill(upgraded); + res.extensions_mut().insert(on_upgrade); + + Ok(res) + } else { + let res = res.map(|stream| { + let ping = ping.for_stream(&stream); + crate::Body::h2(stream, content_length.into(), ping) + }); + Ok(res) + } + } + Err(err) => { + ping.ensure_not_timed_out().map_err(|e| (e, None))?; + + debug!("client response error: {}", err); + Err((crate::Error::new_h2(err), None)) + } + }); + self.executor.execute(f.cb.send_when(fut)); + } +} + +impl<B> Future for ClientTask<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<Dispatched>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + loop { + match ready!(self.h2_tx.poll_ready(cx)) { + Ok(()) => (), + Err(err) => { + self.ping.ensure_not_timed_out()?; + return if err.reason() == Some(::h2::Reason::NO_ERROR) { + trace!("connection gracefully shutdown"); + Poll::Ready(Ok(Dispatched::Shutdown)) + } else { + Poll::Ready(Err(crate::Error::new_h2(err))) + }; + } + }; + + match self.fut_ctx.take() { + // If we were waiting on pending open + // continue where we left off. + Some(f) => { + self.poll_pipe(f, cx); + continue; + } + None => (), + } + + match self.req_rx.poll_recv(cx) { + Poll::Ready(Some((req, cb))) => { + // check that future hasn't been canceled already + if cb.is_canceled() { + trace!("request callback is canceled"); + continue; + } + let (head, body) = req.into_parts(); + let mut req = ::http::Request::from_parts(head, ()); + super::strip_connection_headers(req.headers_mut(), true); + if let Some(len) = body.size_hint().exact() { + if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { + headers::set_content_length_if_missing(req.headers_mut(), len); + } + } + + let is_connect = req.method() == Method::CONNECT; + let eos = body.is_end_stream(); + + if is_connect { + if headers::content_length_parse_all(req.headers()) + .map_or(false, |len| len != 0) + { + warn!("h2 connect request with non-zero body not supported"); + cb.send(Err(( + crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + None, + ))); + continue; + } + } + + if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { + req.extensions_mut().insert(protocol.into_inner()); + } + + let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { + Ok(ok) => ok, + Err(err) => { + debug!("client send request error: {}", err); + cb.send(Err((crate::Error::new_h2(err), None))); + continue; + } + }; + + let f = FutCtx { + is_connect, + eos, + fut, + body_tx, + body, + cb, + }; + + // Check poll_ready() again. + // If the call to send_request() resulted in the new stream being pending open + // we have to wait for the open to complete before accepting new requests. + match self.h2_tx.poll_ready(cx) { + Poll::Pending => { + // Save Context + self.fut_ctx = Some(f); + return Poll::Pending; + } + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(err)) => { + f.cb.send(Err((crate::Error::new_h2(err), None))); + continue; + } + } + self.poll_pipe(f, cx); + continue; + } + + Poll::Ready(None) => { + trace!("client::dispatch::Sender dropped"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + + Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { + Ok(never) => match never {}, + Err(_conn_is_eof) => { + trace!("connection task is closed, closing dispatch task"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + }, + } + } + } +} diff --git a/third_party/rust/hyper/src/proto/h2/mod.rs b/third_party/rust/hyper/src/proto/h2/mod.rs new file mode 100644 index 0000000000..5857c919d1 --- /dev/null +++ b/third_party/rust/hyper/src/proto/h2/mod.rs @@ -0,0 +1,471 @@ +use bytes::{Buf, Bytes}; +use h2::{Reason, RecvStream, SendStream}; +use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE}; +use http::HeaderMap; +use pin_project_lite::pin_project; +use std::error::Error as StdError; +use std::io::{self, Cursor, IoSlice}; +use std::mem; +use std::task::Context; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tracing::{debug, trace, warn}; + +use crate::body::HttpBody; +use crate::common::{task, Future, Pin, Poll}; +use crate::proto::h2::ping::Recorder; + +pub(crate) mod ping; + +cfg_client! { + pub(crate) mod client; + pub(crate) use self::client::ClientTask; +} + +cfg_server! { + pub(crate) mod server; + pub(crate) use self::server::Server; +} + +/// Default initial stream window size defined in HTTP2 spec. +pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; + +fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { + // List of connection headers from: + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection + // + // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're + // tested separately. + let connection_headers = [ + HeaderName::from_lowercase(b"keep-alive").unwrap(), + HeaderName::from_lowercase(b"proxy-connection").unwrap(), + TRAILER, + TRANSFER_ENCODING, + UPGRADE, + ]; + + for header in connection_headers.iter() { + if headers.remove(header).is_some() { + warn!("Connection header illegal in HTTP/2: {}", header.as_str()); + } + } + + if is_request { + if headers + .get(TE) + .map(|te_header| te_header != "trailers") + .unwrap_or(false) + { + warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests"); + headers.remove(TE); + } + } else if headers.remove(TE).is_some() { + warn!("TE headers illegal in HTTP/2 responses"); + } + + if let Some(header) = headers.remove(CONNECTION) { + warn!( + "Connection header illegal in HTTP/2: {}", + CONNECTION.as_str() + ); + let header_contents = header.to_str().unwrap(); + + // A `Connection` header may have a comma-separated list of names of other headers that + // are meant for only this specific connection. + // + // Iterate these names and remove them as headers. Connection-specific headers are + // forbidden in HTTP2, as that information has been moved into frame types of the h2 + // protocol. + for name in header_contents.split(',') { + let name = name.trim(); + headers.remove(name); + } + } +} + +// body adapters used by both Client and Server + +pin_project! { + struct PipeToSendStream<S> + where + S: HttpBody, + { + body_tx: SendStream<SendBuf<S::Data>>, + data_done: bool, + #[pin] + stream: S, + } +} + +impl<S> PipeToSendStream<S> +where + S: HttpBody, +{ + fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { + PipeToSendStream { + body_tx: tx, + data_done: false, + stream, + } + } +} + +impl<S> Future for PipeToSendStream<S> +where + S: HttpBody, + S::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + if !*me.data_done { + // we don't have the next chunk of data yet, so just reserve 1 byte to make + // sure there's some capacity available. h2 will handle the capacity management + // for the actual body chunk. + me.body_tx.reserve_capacity(1); + + if me.body_tx.capacity() == 0 { + loop { + match ready!(me.body_tx.poll_capacity(cx)) { + Some(Ok(0)) => {} + Some(Ok(_)) => break, + Some(Err(e)) => { + return Poll::Ready(Err(crate::Error::new_body_write(e))) + } + None => { + // None means the stream is no longer in a + // streaming state, we either finished it + // somehow, or the remote reset us. + return Poll::Ready(Err(crate::Error::new_body_write( + "send stream capacity unexpectedly closed", + ))); + } + } + } + } else if let Poll::Ready(reason) = me + .body_tx + .poll_reset(cx) + .map_err(crate::Error::new_body_write)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( + reason, + )))); + } + + match ready!(me.stream.as_mut().poll_data(cx)) { + Some(Ok(chunk)) => { + let is_eos = me.stream.is_end_stream(); + trace!( + "send body chunk: {} bytes, eos={}", + chunk.remaining(), + is_eos, + ); + + let buf = SendBuf::Buf(chunk); + me.body_tx + .send_data(buf, is_eos) + .map_err(crate::Error::new_body_write)?; + + if is_eos { + return Poll::Ready(Ok(())); + } + } + Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), + None => { + me.body_tx.reserve_capacity(0); + let is_eos = me.stream.is_end_stream(); + if is_eos { + return Poll::Ready(me.body_tx.send_eos_frame()); + } else { + *me.data_done = true; + // loop again to poll_trailers + } + } + } + } else { + if let Poll::Ready(reason) = me + .body_tx + .poll_reset(cx) + .map_err(crate::Error::new_body_write)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from( + reason, + )))); + } + + match ready!(me.stream.poll_trailers(cx)) { + Ok(Some(trailers)) => { + me.body_tx + .send_trailers(trailers) + .map_err(crate::Error::new_body_write)?; + return Poll::Ready(Ok(())); + } + Ok(None) => { + // There were no trailers, so send an empty DATA frame... + return Poll::Ready(me.body_tx.send_eos_frame()); + } + Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), + } + } + } + } +} + +trait SendStreamExt { + fn on_user_err<E>(&mut self, err: E) -> crate::Error + where + E: Into<Box<dyn std::error::Error + Send + Sync>>; + fn send_eos_frame(&mut self) -> crate::Result<()>; +} + +impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { + fn on_user_err<E>(&mut self, err: E) -> crate::Error + where + E: Into<Box<dyn std::error::Error + Send + Sync>>, + { + let err = crate::Error::new_user_body(err); + debug!("send body user stream error: {}", err); + self.send_reset(err.h2_reason()); + err + } + + fn send_eos_frame(&mut self) -> crate::Result<()> { + trace!("send body eos"); + self.send_data(SendBuf::None, true) + .map_err(crate::Error::new_body_write) + } +} + +#[repr(usize)] +enum SendBuf<B> { + Buf(B), + Cursor(Cursor<Box<[u8]>>), + None, +} + +impl<B: Buf> Buf for SendBuf<B> { + #[inline] + fn remaining(&self) -> usize { + match *self { + Self::Buf(ref b) => b.remaining(), + Self::Cursor(ref c) => Buf::remaining(c), + Self::None => 0, + } + } + + #[inline] + fn chunk(&self) -> &[u8] { + match *self { + Self::Buf(ref b) => b.chunk(), + Self::Cursor(ref c) => c.chunk(), + Self::None => &[], + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + match *self { + Self::Buf(ref mut b) => b.advance(cnt), + Self::Cursor(ref mut c) => c.advance(cnt), + Self::None => {} + } + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { + match *self { + Self::Buf(ref b) => b.chunks_vectored(dst), + Self::Cursor(ref c) => c.chunks_vectored(dst), + Self::None => 0, + } + } +} + +struct H2Upgraded<B> +where + B: Buf, +{ + ping: Recorder, + send_stream: UpgradedSendStream<B>, + recv_stream: RecvStream, + buf: Bytes, +} + +impl<B> AsyncRead for H2Upgraded<B> +where + B: Buf, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + read_buf: &mut ReadBuf<'_>, + ) -> Poll<Result<(), io::Error>> { + if self.buf.is_empty() { + self.buf = loop { + match ready!(self.recv_stream.poll_data(cx)) { + None => return Poll::Ready(Ok(())), + Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { + continue + } + Some(Ok(buf)) => { + self.ping.record_data(buf.len()); + break buf; + } + Some(Err(e)) => { + return Poll::Ready(match e.reason() { + Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), + Some(Reason::STREAM_CLOSED) => { + Err(io::Error::new(io::ErrorKind::BrokenPipe, e)) + } + _ => Err(h2_to_io_error(e)), + }) + } + } + }; + } + let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); + read_buf.put_slice(&self.buf[..cnt]); + self.buf.advance(cnt); + let _ = self.recv_stream.flow_control().release_capacity(cnt); + Poll::Ready(Ok(())) + } +} + +impl<B> AsyncWrite for H2Upgraded<B> +where + B: Buf, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, io::Error>> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + self.send_stream.reserve_capacity(buf.len()); + + // We ignore all errors returned by `poll_capacity` and `write`, as we + // will get the correct from `poll_reset` anyway. + let cnt = match ready!(self.send_stream.poll_capacity(cx)) { + None => Some(0), + Some(Ok(cnt)) => self + .send_stream + .write(&buf[..cnt], false) + .ok() + .map(|()| cnt), + Some(Err(_)) => None, + }; + + if let Some(cnt) = cnt { + return Poll::Ready(Ok(cnt)); + } + + Poll::Ready(Err(h2_to_io_error( + match ready!(self.send_stream.poll_reset(cx)) { + Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) + } + Ok(reason) => reason.into(), + Err(e) => e, + }, + ))) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), io::Error>> { + if self.send_stream.write(&[], true).is_ok() { + return Poll::Ready(Ok(())) + } + + Poll::Ready(Err(h2_to_io_error( + match ready!(self.send_stream.poll_reset(cx)) { + Ok(Reason::NO_ERROR) => { + return Poll::Ready(Ok(())) + } + Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) + } + Ok(reason) => reason.into(), + Err(e) => e, + }, + ))) + } +} + +fn h2_to_io_error(e: h2::Error) -> io::Error { + if e.is_io() { + e.into_io().unwrap() + } else { + io::Error::new(io::ErrorKind::Other, e) + } +} + +struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>); + +impl<B> UpgradedSendStream<B> +where + B: Buf, +{ + unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { + assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); + Self(mem::transmute(inner)) + } + + fn reserve_capacity(&mut self, cnt: usize) { + unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } + } + + fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { + unsafe { self.as_inner_unchecked().poll_capacity(cx) } + } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { + unsafe { self.as_inner_unchecked().poll_reset(cx) } + } + + fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> { + let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); + unsafe { + self.as_inner_unchecked() + .send_data(send_buf, end_of_stream) + .map_err(h2_to_io_error) + } + } + + unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { + &mut *(&mut self.0 as *mut _ as *mut _) + } +} + +#[repr(transparent)] +struct Neutered<B> { + _inner: B, + impossible: Impossible, +} + +enum Impossible {} + +unsafe impl<B> Send for Neutered<B> {} + +impl<B> Buf for Neutered<B> { + fn remaining(&self) -> usize { + match self.impossible {} + } + + fn chunk(&self) -> &[u8] { + match self.impossible {} + } + + fn advance(&mut self, _cnt: usize) { + match self.impossible {} + } +} diff --git a/third_party/rust/hyper/src/proto/h2/ping.rs b/third_party/rust/hyper/src/proto/h2/ping.rs new file mode 100644 index 0000000000..1e8386497c --- /dev/null +++ b/third_party/rust/hyper/src/proto/h2/ping.rs @@ -0,0 +1,555 @@ +/// HTTP2 Ping usage +/// +/// hyper uses HTTP2 pings for two purposes: +/// +/// 1. Adaptive flow control using BDP +/// 2. Connection keep-alive +/// +/// Both cases are optional. +/// +/// # BDP Algorithm +/// +/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: +/// 1a. Record current time. +/// 1b. Send a BDP ping. +/// 2. Increment the number of received bytes. +/// 3. When the BDP ping ack is received: +/// 3a. Record duration from sent time. +/// 3b. Merge RTT with a running average. +/// 3c. Calculate bdp as bytes/rtt. +/// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. + +#[cfg(feature = "runtime")] +use std::fmt; +#[cfg(feature = "runtime")] +use std::future::Future; +#[cfg(feature = "runtime")] +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{self, Poll}; +use std::time::Duration; +#[cfg(not(feature = "runtime"))] +use std::time::Instant; + +use h2::{Ping, PingPong}; +#[cfg(feature = "runtime")] +use tokio::time::{Instant, Sleep}; +use tracing::{debug, trace}; + +type WindowSize = u32; + +pub(super) fn disabled() -> Recorder { + Recorder { shared: None } +} + +pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) { + debug_assert!( + config.is_enabled(), + "ping channel requires bdp or keep-alive config", + ); + + let bdp = config.bdp_initial_window.map(|wnd| Bdp { + bdp: wnd, + max_bandwidth: 0.0, + rtt: 0.0, + ping_delay: Duration::from_millis(100), + stable_count: 0, + }); + + let (bytes, next_bdp_at) = if bdp.is_some() { + (Some(0), Some(Instant::now())) + } else { + (None, None) + }; + + #[cfg(feature = "runtime")] + let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { + interval, + timeout: config.keep_alive_timeout, + while_idle: config.keep_alive_while_idle, + timer: Box::pin(tokio::time::sleep(interval)), + state: KeepAliveState::Init, + }); + + #[cfg(feature = "runtime")] + let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); + + let shared = Arc::new(Mutex::new(Shared { + bytes, + #[cfg(feature = "runtime")] + last_read_at, + #[cfg(feature = "runtime")] + is_keep_alive_timed_out: false, + ping_pong, + ping_sent_at: None, + next_bdp_at, + })); + + ( + Recorder { + shared: Some(shared.clone()), + }, + Ponger { + bdp, + #[cfg(feature = "runtime")] + keep_alive, + shared, + }, + ) +} + +#[derive(Clone)] +pub(super) struct Config { + pub(super) bdp_initial_window: Option<WindowSize>, + /// If no frames are received in this amount of time, a PING frame is sent. + #[cfg(feature = "runtime")] + pub(super) keep_alive_interval: Option<Duration>, + /// After sending a keepalive PING, the connection will be closed if + /// a pong is not received in this amount of time. + #[cfg(feature = "runtime")] + pub(super) keep_alive_timeout: Duration, + /// If true, sends pings even when there are no active streams. + #[cfg(feature = "runtime")] + pub(super) keep_alive_while_idle: bool, +} + +#[derive(Clone)] +pub(crate) struct Recorder { + shared: Option<Arc<Mutex<Shared>>>, +} + +pub(super) struct Ponger { + bdp: Option<Bdp>, + #[cfg(feature = "runtime")] + keep_alive: Option<KeepAlive>, + shared: Arc<Mutex<Shared>>, +} + +struct Shared { + ping_pong: PingPong, + ping_sent_at: Option<Instant>, + + // bdp + /// If `Some`, bdp is enabled, and this tracks how many bytes have been + /// read during the current sample. + bytes: Option<usize>, + /// We delay a variable amount of time between BDP pings. This allows us + /// to send less pings as the bandwidth stabilizes. + next_bdp_at: Option<Instant>, + + // keep-alive + /// If `Some`, keep-alive is enabled, and the Instant is how long ago + /// the connection read the last frame. + #[cfg(feature = "runtime")] + last_read_at: Option<Instant>, + + #[cfg(feature = "runtime")] + is_keep_alive_timed_out: bool, +} + +struct Bdp { + /// Current BDP in bytes + bdp: u32, + /// Largest bandwidth we've seen so far. + max_bandwidth: f64, + /// Round trip time in seconds + rtt: f64, + /// Delay the next ping by this amount. + /// + /// This will change depending on how stable the current bandwidth is. + ping_delay: Duration, + /// The count of ping round trips where BDP has stayed the same. + stable_count: u32, +} + +#[cfg(feature = "runtime")] +struct KeepAlive { + /// If no frames are received in this amount of time, a PING frame is sent. + interval: Duration, + /// After sending a keepalive PING, the connection will be closed if + /// a pong is not received in this amount of time. + timeout: Duration, + /// If true, sends pings even when there are no active streams. + while_idle: bool, + + state: KeepAliveState, + timer: Pin<Box<Sleep>>, +} + +#[cfg(feature = "runtime")] +enum KeepAliveState { + Init, + Scheduled, + PingSent, +} + +pub(super) enum Ponged { + SizeUpdate(WindowSize), + #[cfg(feature = "runtime")] + KeepAliveTimedOut, +} + +#[cfg(feature = "runtime")] +#[derive(Debug)] +pub(super) struct KeepAliveTimedOut; + +// ===== impl Config ===== + +impl Config { + pub(super) fn is_enabled(&self) -> bool { + #[cfg(feature = "runtime")] + { + self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() + } + + #[cfg(not(feature = "runtime"))] + { + self.bdp_initial_window.is_some() + } + } +} + +// ===== impl Recorder ===== + +impl Recorder { + pub(crate) fn record_data(&self, len: usize) { + let shared = if let Some(ref shared) = self.shared { + shared + } else { + return; + }; + + let mut locked = shared.lock().unwrap(); + + #[cfg(feature = "runtime")] + locked.update_last_read_at(); + + // are we ready to send another bdp ping? + // if not, we don't need to record bytes either + + if let Some(ref next_bdp_at) = locked.next_bdp_at { + if Instant::now() < *next_bdp_at { + return; + } else { + locked.next_bdp_at = None; + } + } + + if let Some(ref mut bytes) = locked.bytes { + *bytes += len; + } else { + // no need to send bdp ping if bdp is disabled + return; + } + + if !locked.is_ping_sent() { + locked.send_ping(); + } + } + + pub(crate) fn record_non_data(&self) { + #[cfg(feature = "runtime")] + { + let shared = if let Some(ref shared) = self.shared { + shared + } else { + return; + }; + + let mut locked = shared.lock().unwrap(); + + locked.update_last_read_at(); + } + } + + /// If the incoming stream is already closed, convert self into + /// a disabled reporter. + #[cfg(feature = "client")] + pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { + if stream.is_end_stream() { + disabled() + } else { + self + } + } + + pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { + #[cfg(feature = "runtime")] + { + if let Some(ref shared) = self.shared { + let locked = shared.lock().unwrap(); + if locked.is_keep_alive_timed_out { + return Err(KeepAliveTimedOut.crate_error()); + } + } + } + + // else + Ok(()) + } +} + +// ===== impl Ponger ===== + +impl Ponger { + pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> { + let now = Instant::now(); + let mut locked = self.shared.lock().unwrap(); + #[cfg(feature = "runtime")] + let is_idle = self.is_idle(); + + #[cfg(feature = "runtime")] + { + if let Some(ref mut ka) = self.keep_alive { + ka.schedule(is_idle, &locked); + ka.maybe_ping(cx, &mut locked); + } + } + + if !locked.is_ping_sent() { + // XXX: this doesn't register a waker...? + return Poll::Pending; + } + + match locked.ping_pong.poll_pong(cx) { + Poll::Ready(Ok(_pong)) => { + let start = locked + .ping_sent_at + .expect("pong received implies ping_sent_at"); + locked.ping_sent_at = None; + let rtt = now - start; + trace!("recv pong"); + + #[cfg(feature = "runtime")] + { + if let Some(ref mut ka) = self.keep_alive { + locked.update_last_read_at(); + ka.schedule(is_idle, &locked); + } + } + + if let Some(ref mut bdp) = self.bdp { + let bytes = locked.bytes.expect("bdp enabled implies bytes"); + locked.bytes = Some(0); // reset + trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); + + let update = bdp.calculate(bytes, rtt); + locked.next_bdp_at = Some(now + bdp.ping_delay); + if let Some(update) = update { + return Poll::Ready(Ponged::SizeUpdate(update)) + } + } + } + Poll::Ready(Err(e)) => { + debug!("pong error: {}", e); + } + Poll::Pending => { + #[cfg(feature = "runtime")] + { + if let Some(ref mut ka) = self.keep_alive { + if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { + self.keep_alive = None; + locked.is_keep_alive_timed_out = true; + return Poll::Ready(Ponged::KeepAliveTimedOut); + } + } + } + } + } + + // XXX: this doesn't register a waker...? + Poll::Pending + } + + #[cfg(feature = "runtime")] + fn is_idle(&self) -> bool { + Arc::strong_count(&self.shared) <= 2 + } +} + +// ===== impl Shared ===== + +impl Shared { + fn send_ping(&mut self) { + match self.ping_pong.send_ping(Ping::opaque()) { + Ok(()) => { + self.ping_sent_at = Some(Instant::now()); + trace!("sent ping"); + } + Err(err) => { + debug!("error sending ping: {}", err); + } + } + } + + fn is_ping_sent(&self) -> bool { + self.ping_sent_at.is_some() + } + + #[cfg(feature = "runtime")] + fn update_last_read_at(&mut self) { + if self.last_read_at.is_some() { + self.last_read_at = Some(Instant::now()); + } + } + + #[cfg(feature = "runtime")] + fn last_read_at(&self) -> Instant { + self.last_read_at.expect("keep_alive expects last_read_at") + } +} + +// ===== impl Bdp ===== + +/// Any higher than this likely will be hitting the TCP flow control. +const BDP_LIMIT: usize = 1024 * 1024 * 16; + +impl Bdp { + fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { + // No need to do any math if we're at the limit. + if self.bdp as usize == BDP_LIMIT { + self.stabilize_delay(); + return None; + } + + // average the rtt + let rtt = seconds(rtt); + if self.rtt == 0.0 { + // First sample means rtt is first rtt. + self.rtt = rtt; + } else { + // Weigh this rtt as 1/8 for a moving average. + self.rtt += (rtt - self.rtt) * 0.125; + } + + // calculate the current bandwidth + let bw = (bytes as f64) / (self.rtt * 1.5); + trace!("current bandwidth = {:.1}B/s", bw); + + if bw < self.max_bandwidth { + // not a faster bandwidth, so don't update + self.stabilize_delay(); + return None; + } else { + self.max_bandwidth = bw; + } + + // if the current `bytes` sample is at least 2/3 the previous + // bdp, increase to double the current sample. + if bytes >= self.bdp as usize * 2 / 3 { + self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; + trace!("BDP increased to {}", self.bdp); + + self.stable_count = 0; + self.ping_delay /= 2; + Some(self.bdp) + } else { + self.stabilize_delay(); + None + } + } + + fn stabilize_delay(&mut self) { + if self.ping_delay < Duration::from_secs(10) { + self.stable_count += 1; + + if self.stable_count >= 2 { + self.ping_delay *= 4; + self.stable_count = 0; + } + } + } +} + +fn seconds(dur: Duration) -> f64 { + const NANOS_PER_SEC: f64 = 1_000_000_000.0; + let secs = dur.as_secs() as f64; + secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC +} + +// ===== impl KeepAlive ===== + +#[cfg(feature = "runtime")] +impl KeepAlive { + fn schedule(&mut self, is_idle: bool, shared: &Shared) { + match self.state { + KeepAliveState::Init => { + if !self.while_idle && is_idle { + return; + } + + self.state = KeepAliveState::Scheduled; + let interval = shared.last_read_at() + self.interval; + self.timer.as_mut().reset(interval); + } + KeepAliveState::PingSent => { + if shared.is_ping_sent() { + return; + } + + self.state = KeepAliveState::Scheduled; + let interval = shared.last_read_at() + self.interval; + self.timer.as_mut().reset(interval); + } + KeepAliveState::Scheduled => (), + } + } + + fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) { + match self.state { + KeepAliveState::Scheduled => { + if Pin::new(&mut self.timer).poll(cx).is_pending() { + return; + } + // check if we've received a frame while we were scheduled + if shared.last_read_at() + self.interval > self.timer.deadline() { + self.state = KeepAliveState::Init; + cx.waker().wake_by_ref(); // schedule us again + return; + } + trace!("keep-alive interval ({:?}) reached", self.interval); + shared.send_ping(); + self.state = KeepAliveState::PingSent; + let timeout = Instant::now() + self.timeout; + self.timer.as_mut().reset(timeout); + } + KeepAliveState::Init | KeepAliveState::PingSent => (), + } + } + + fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> { + match self.state { + KeepAliveState::PingSent => { + if Pin::new(&mut self.timer).poll(cx).is_pending() { + return Ok(()); + } + trace!("keep-alive timeout ({:?}) reached", self.timeout); + Err(KeepAliveTimedOut) + } + KeepAliveState::Init | KeepAliveState::Scheduled => Ok(()), + } + } +} + +// ===== impl KeepAliveTimedOut ===== + +#[cfg(feature = "runtime")] +impl KeepAliveTimedOut { + pub(super) fn crate_error(self) -> crate::Error { + crate::Error::new(crate::error::Kind::Http2).with(self) + } +} + +#[cfg(feature = "runtime")] +impl fmt::Display for KeepAliveTimedOut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("keep-alive timed out") + } +} + +#[cfg(feature = "runtime")] +impl std::error::Error for KeepAliveTimedOut { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&crate::error::TimedOut) + } +} diff --git a/third_party/rust/hyper/src/proto/h2/server.rs b/third_party/rust/hyper/src/proto/h2/server.rs new file mode 100644 index 0000000000..d24e6bac5f --- /dev/null +++ b/third_party/rust/hyper/src/proto/h2/server.rs @@ -0,0 +1,548 @@ +use std::error::Error as StdError; +use std::marker::Unpin; +#[cfg(feature = "runtime")] +use std::time::Duration; + +use bytes::Bytes; +use h2::server::{Connection, Handshake, SendResponse}; +use h2::{Reason, RecvStream}; +use http::{Method, Request}; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{debug, trace, warn}; + +use super::{ping, PipeToSendStream, SendBuf}; +use crate::body::HttpBody; +use crate::common::exec::ConnStreamExec; +use crate::common::{date, task, Future, Pin, Poll}; +use crate::ext::Protocol; +use crate::headers; +use crate::proto::h2::ping::Recorder; +use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; +use crate::proto::Dispatched; +use crate::service::HttpService; + +use crate::upgrade::{OnUpgrade, Pending, Upgraded}; +use crate::{Body, Response}; + +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +// +// At the same time, a server more often has multiple clients connected, and +// so is more likely to use more resources than a client would. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb +// 16 MB "sane default" taken from golang http2 +const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) max_frame_size: u32, + pub(crate) enable_connect_protocol: bool, + pub(crate) max_concurrent_streams: Option<u32>, + #[cfg(feature = "runtime")] + pub(crate) keep_alive_interval: Option<Duration>, + #[cfg(feature = "runtime")] + pub(crate) keep_alive_timeout: Duration, + pub(crate) max_send_buffer_size: usize, + pub(crate) max_header_list_size: u32, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + max_frame_size: DEFAULT_MAX_FRAME_SIZE, + enable_connect_protocol: false, + max_concurrent_streams: None, + #[cfg(feature = "runtime")] + keep_alive_interval: None, + #[cfg(feature = "runtime")] + keep_alive_timeout: Duration::from_secs(20), + max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, + max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, + } + } +} + +pin_project! { + pub(crate) struct Server<T, S, B, E> + where + S: HttpService<Body>, + B: HttpBody, + { + exec: E, + service: S, + state: State<T, B>, + } +} + +enum State<T, B> +where + B: HttpBody, +{ + Handshaking { + ping_config: ping::Config, + hs: Handshake<T, SendBuf<B::Data>>, + }, + Serving(Serving<T, B>), + Closed, +} + +struct Serving<T, B> +where + B: HttpBody, +{ + ping: Option<(ping::Recorder, ping::Ponger)>, + conn: Connection<T, SendBuf<B::Data>>, + closing: Option<crate::Error>, +} + +impl<T, S, B, E> Server<T, S, B, E> +where + T: AsyncRead + AsyncWrite + Unpin, + S: HttpService<Body, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + B: HttpBody + 'static, + E: ConnStreamExec<S::Future, B>, +{ + pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> { + let mut builder = h2::server::Builder::default(); + builder + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .max_frame_size(config.max_frame_size) + .max_header_list_size(config.max_header_list_size) + .max_send_buffer_size(config.max_send_buffer_size); + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } + if config.enable_connect_protocol { + builder.enable_connect_protocol(); + } + let handshake = builder.handshake(io); + + let bdp = if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }; + + let ping_config = ping::Config { + bdp_initial_window: bdp, + #[cfg(feature = "runtime")] + keep_alive_interval: config.keep_alive_interval, + #[cfg(feature = "runtime")] + keep_alive_timeout: config.keep_alive_timeout, + // If keep-alive is enabled for servers, always enabled while + // idle, so it can more aggressively close dead connections. + #[cfg(feature = "runtime")] + keep_alive_while_idle: true, + }; + + Server { + exec, + state: State::Handshaking { + ping_config, + hs: handshake, + }, + service, + } + } + + pub(crate) fn graceful_shutdown(&mut self) { + trace!("graceful_shutdown"); + match self.state { + State::Handshaking { .. } => { + // fall-through, to replace state with Closed + } + State::Serving(ref mut srv) => { + if srv.closing.is_none() { + srv.conn.graceful_shutdown(); + } + return; + } + State::Closed => { + return; + } + } + self.state = State::Closed; + } +} + +impl<T, S, B, E> Future for Server<T, S, B, E> +where + T: AsyncRead + AsyncWrite + Unpin, + S: HttpService<Body, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + B: HttpBody + 'static, + E: ConnStreamExec<S::Future, B>, +{ + type Output = crate::Result<Dispatched>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let me = &mut *self; + loop { + let next = match me.state { + State::Handshaking { + ref mut hs, + ref ping_config, + } => { + let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; + let ping = if ping_config.is_enabled() { + let pp = conn.ping_pong().expect("conn.ping_pong"); + Some(ping::channel(pp, ping_config.clone())) + } else { + None + }; + State::Serving(Serving { + ping, + conn, + closing: None, + }) + } + State::Serving(ref mut srv) => { + ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + State::Closed => { + // graceful_shutdown was called before handshaking finished, + // nothing to do here... + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + }; + me.state = next; + } + } +} + +impl<T, B> Serving<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: HttpBody + 'static, +{ + fn poll_server<S, E>( + &mut self, + cx: &mut task::Context<'_>, + service: &mut S, + exec: &mut E, + ) -> Poll<crate::Result<()>> + where + S: HttpService<Body, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + E: ConnStreamExec<S::Future, B>, + { + if self.closing.is_none() { + loop { + self.poll_ping(cx); + + // Check that the service is ready to accept a new request. + // + // - If not, just drive the connection some. + // - If ready, try to accept a new request from the connection. + match service.poll_ready(cx) { + Poll::Ready(Ok(())) => (), + Poll::Pending => { + // use `poll_closed` instead of `poll_accept`, + // in order to avoid accepting a request. + ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; + trace!("incoming connection complete"); + return Poll::Ready(Ok(())); + } + Poll::Ready(Err(err)) => { + let err = crate::Error::new_user_service(err); + debug!("service closed: {}", err); + + let reason = err.h2_reason(); + if reason == Reason::NO_ERROR { + // NO_ERROR is only used for graceful shutdowns... + trace!("interpreting NO_ERROR user error as graceful_shutdown"); + self.conn.graceful_shutdown(); + } else { + trace!("abruptly shutting down with {:?}", reason); + self.conn.abrupt_shutdown(reason); + } + self.closing = Some(err); + break; + } + } + + // When the service is ready, accepts an incoming request. + match ready!(self.conn.poll_accept(cx)) { + Some(Ok((req, mut respond))) => { + trace!("incoming request"); + let content_length = headers::content_length_parse_all(req.headers()); + let ping = self + .ping + .as_ref() + .map(|ping| ping.0.clone()) + .unwrap_or_else(ping::disabled); + + // Record the headers received + ping.record_non_data(); + + let is_connect = req.method() == Method::CONNECT; + let (mut parts, stream) = req.into_parts(); + let (mut req, connect_parts) = if !is_connect { + ( + Request::from_parts( + parts, + crate::Body::h2(stream, content_length.into(), ping), + ), + None, + ) + } else { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect request with non-zero body not supported"); + respond.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Ok(())); + } + let (pending, upgrade) = crate::upgrade::pending(); + debug_assert!(parts.extensions.get::<OnUpgrade>().is_none()); + parts.extensions.insert(upgrade); + ( + Request::from_parts(parts, crate::Body::empty()), + Some(ConnectParts { + pending, + ping, + recv_stream: stream, + }), + ) + }; + + if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() { + req.extensions_mut().insert(Protocol::from_inner(protocol)); + } + + let fut = H2Stream::new(service.call(req), connect_parts, respond); + exec.execute_h2stream(fut); + } + Some(Err(e)) => { + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + None => { + // no more incoming streams... + if let Some((ref ping, _)) = self.ping { + ping.ensure_not_timed_out()?; + } + + trace!("incoming connection complete"); + return Poll::Ready(Ok(())); + } + } + } + } + + debug_assert!( + self.closing.is_some(), + "poll_server broke loop without closing" + ); + + ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; + + Poll::Ready(Err(self.closing.take().expect("polled after error"))) + } + + fn poll_ping(&mut self, cx: &mut task::Context<'_>) { + if let Some((_, ref mut estimator)) = self.ping { + match estimator.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + self.conn.set_target_window_size(wnd); + let _ = self.conn.set_initial_window_size(wnd); + } + #[cfg(feature = "runtime")] + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("keep-alive timed out, closing connection"); + self.conn.abrupt_shutdown(h2::Reason::NO_ERROR); + } + Poll::Pending => {} + } + } + } +} + +pin_project! { + #[allow(missing_debug_implementations)] + pub struct H2Stream<F, B> + where + B: HttpBody, + { + reply: SendResponse<SendBuf<B::Data>>, + #[pin] + state: H2StreamState<F, B>, + } +} + +pin_project! { + #[project = H2StreamStateProj] + enum H2StreamState<F, B> + where + B: HttpBody, + { + Service { + #[pin] + fut: F, + connect_parts: Option<ConnectParts>, + }, + Body { + #[pin] + pipe: PipeToSendStream<B>, + }, + } +} + +struct ConnectParts { + pending: Pending, + ping: Recorder, + recv_stream: RecvStream, +} + +impl<F, B> H2Stream<F, B> +where + B: HttpBody, +{ + fn new( + fut: F, + connect_parts: Option<ConnectParts>, + respond: SendResponse<SendBuf<B::Data>>, + ) -> H2Stream<F, B> { + H2Stream { + reply: respond, + state: H2StreamState::Service { fut, connect_parts }, + } + } +} + +macro_rules! reply { + ($me:expr, $res:expr, $eos:expr) => {{ + match $me.reply.send_response($res, $eos) { + Ok(tx) => tx, + Err(e) => { + debug!("send response error: {}", e); + $me.reply.send_reset(Reason::INTERNAL_ERROR); + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + } + }}; +} + +impl<F, B, E> H2Stream<F, B> +where + F: Future<Output = Result<Response<B>, E>>, + B: HttpBody, + B::Data: 'static, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + E: Into<Box<dyn StdError + Send + Sync>>, +{ + fn poll2(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + let mut me = self.project(); + loop { + let next = match me.state.as_mut().project() { + H2StreamStateProj::Service { + fut: h, + connect_parts, + } => { + let res = match h.poll(cx) { + Poll::Ready(Ok(r)) => r, + Poll::Pending => { + // Response is not yet ready, so we want to check if the client has sent a + // RST_STREAM frame which would cancel the current request. + if let Poll::Ready(reason) = + me.reply.poll_reset(cx).map_err(crate::Error::new_h2)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); + } + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + let err = crate::Error::new_user_service(e); + warn!("http2 service errored: {}", err); + me.reply.send_reset(err.h2_reason()); + return Poll::Ready(Err(err)); + } + }; + + let (head, body) = res.into_parts(); + let mut res = ::http::Response::from_parts(head, ()); + super::strip_connection_headers(res.headers_mut(), false); + + // set Date header if it isn't already set... + res.headers_mut() + .entry(::http::header::DATE) + .or_insert_with(date::update_and_header_value); + + if let Some(connect_parts) = connect_parts.take() { + if res.status().is_success() { + if headers::content_length_parse_all(res.headers()) + .map_or(false, |len| len != 0) + { + warn!("h2 successful response to CONNECT request with body not supported"); + me.reply.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Err(crate::Error::new_user_header())); + } + let send_stream = reply!(me, res, false); + connect_parts.pending.fulfill(Upgraded::new( + H2Upgraded { + ping: connect_parts.ping, + recv_stream: connect_parts.recv_stream, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + buf: Bytes::new(), + }, + Bytes::new(), + )); + return Poll::Ready(Ok(())); + } + } + + + if !body.is_end_stream() { + // automatically set Content-Length from body... + if let Some(len) = body.size_hint().exact() { + headers::set_content_length_if_missing(res.headers_mut(), len); + } + + let body_tx = reply!(me, res, false); + H2StreamState::Body { + pipe: PipeToSendStream::new(body, body_tx), + } + } else { + reply!(me, res, true); + return Poll::Ready(Ok(())); + } + } + H2StreamStateProj::Body { pipe } => { + return pipe.poll(cx); + } + }; + me.state.set(next); + } + } +} + +impl<F, B, E> Future for H2Stream<F, B> +where + F: Future<Output = Result<Response<B>, E>>, + B: HttpBody, + B::Data: 'static, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + E: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.poll2(cx).map(|res| { + if let Err(e) = res { + debug!("stream error: {}", e); + } + }) + } +} diff --git a/third_party/rust/hyper/src/proto/mod.rs b/third_party/rust/hyper/src/proto/mod.rs new file mode 100644 index 0000000000..f938bf532b --- /dev/null +++ b/third_party/rust/hyper/src/proto/mod.rs @@ -0,0 +1,71 @@ +//! Pieces pertaining to the HTTP message protocol. + +cfg_feature! { + #![feature = "http1"] + + pub(crate) mod h1; + + pub(crate) use self::h1::Conn; + + #[cfg(feature = "client")] + pub(crate) use self::h1::dispatch; + #[cfg(feature = "server")] + pub(crate) use self::h1::ServerTransaction; +} + +#[cfg(feature = "http2")] +pub(crate) mod h2; + +/// An Incoming Message head. Includes request/status line, and headers. +#[derive(Debug, Default)] +pub(crate) struct MessageHead<S> { + /// HTTP version of the message. + pub(crate) version: http::Version, + /// Subject (request line or status line) of Incoming message. + pub(crate) subject: S, + /// Headers of the Incoming message. + pub(crate) headers: http::HeaderMap, + /// Extensions. + extensions: http::Extensions, +} + +/// An incoming request message. +#[cfg(feature = "http1")] +pub(crate) type RequestHead = MessageHead<RequestLine>; + +#[derive(Debug, Default, PartialEq)] +#[cfg(feature = "http1")] +pub(crate) struct RequestLine(pub(crate) http::Method, pub(crate) http::Uri); + +/// An incoming response message. +#[cfg(all(feature = "http1", feature = "client"))] +pub(crate) type ResponseHead = MessageHead<http::StatusCode>; + +#[derive(Debug)] +#[cfg(feature = "http1")] +pub(crate) enum BodyLength { + /// Content-Length + Known(u64), + /// Transfer-Encoding: chunked (if h1) + Unknown, +} + +/// Status of when a Disaptcher future completes. +pub(crate) enum Dispatched { + /// Dispatcher completely shutdown connection. + Shutdown, + /// Dispatcher has pending upgrade, and so did not shutdown. + #[cfg(feature = "http1")] + Upgrade(crate::upgrade::Pending), +} + +impl MessageHead<http::StatusCode> { + fn into_response<B>(self, body: B) -> http::Response<B> { + let mut res = http::Response::new(body); + *res.status_mut() = self.subject; + *res.headers_mut() = self.headers; + *res.version_mut() = self.version; + *res.extensions_mut() = self.extensions; + res + } +} |