diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/h2/src/server.rs | 1552 |
1 files changed, 1552 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/server.rs b/third_party/rust/h2/src/server.rs new file mode 100644 index 0000000000..9f56f184a5 --- /dev/null +++ b/third_party/rust/h2/src/server.rs @@ -0,0 +1,1552 @@ +//! Server implementation of the HTTP/2 protocol. +//! +//! # Getting started +//! +//! Running an HTTP/2 server requires the caller to manage accepting the +//! connections as well as getting the connections to a state that is ready to +//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more +//! details. +//! +//! This could be as basic as using Tokio's [`TcpListener`] to accept +//! connections, but usually it means using either ALPN or HTTP/1.1 protocol +//! upgrades. +//! +//! Once a connection is obtained, it is passed to [`handshake`], +//! which will begin the [HTTP/2 handshake]. This returns a future that +//! completes once the handshake process is performed and HTTP/2 streams may +//! be received. +//! +//! [`handshake`] uses default configuration values. There are a number of +//! settings that can be changed by using [`Builder`] instead. +//! +//! # Inbound streams +//! +//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. It +//! does this by implementing [`futures::Stream`]. When a new stream is +//! received, a call to [`Connection::accept`] will return `(request, response)`. +//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the +//! HTTP request head as well as provides a way to receive the inbound data +//! stream and the trailers. The `response` handle (of type [`SendResponse`]) +//! allows responding to the request, stream the response payload, send +//! trailers, and send push promises. +//! +//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream +//! can be operated independently. +//! +//! # Managing the connection +//! +//! The [`Connection`] instance is used to manage connection state. The caller +//! is required to call either [`Connection::accept`] or +//! [`Connection::poll_close`] in order to advance the connection state. Simply +//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the +//! connection state is advanced. +//! +//! It is not required to call **both** [`Connection::accept`] and +//! [`Connection::poll_close`]. If the caller is ready to accept a new stream, +//! then only [`Connection::accept`] should be called. When the caller **does +//! not** want to accept a new stream, [`Connection::poll_close`] should be +//! called. +//! +//! The [`Connection`] instance should only be dropped once +//! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`] +//! returns `Ready(None)`, there will no longer be any more inbound streams. At +//! this point, only [`Connection::poll_close`] should be called. +//! +//! # Shutting down the server +//! +//! Graceful shutdown of the server is [not yet +//! implemented](https://github.com/hyperium/h2/issues/69). +//! +//! # Example +//! +//! A basic HTTP/2 server example that runs over TCP and assumes [prior +//! knowledge], i.e. both the client and the server assume that the TCP socket +//! will use the HTTP/2 protocol without prior negotiation. +//! +//! ```no_run +//! use h2::server; +//! use http::{Response, StatusCode}; +//! use tokio::net::TcpListener; +//! +//! #[tokio::main] +//! pub async fn main() { +//! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap(); +//! +//! // Accept all incoming TCP connections. +//! loop { +//! if let Ok((socket, _peer_addr)) = listener.accept().await { +//! // Spawn a new task to process each connection. +//! tokio::spawn(async { +//! // Start the HTTP/2 connection handshake +//! let mut h2 = server::handshake(socket).await.unwrap(); +//! // Accept all inbound HTTP/2 streams sent over the +//! // connection. +//! while let Some(request) = h2.accept().await { +//! let (request, mut respond) = request.unwrap(); +//! println!("Received request: {:?}", request); +//! +//! // Build a response with no body +//! let response = Response::builder() +//! .status(StatusCode::OK) +//! .body(()) +//! .unwrap(); +//! +//! // Send the response back to the client +//! respond.send_response(response, true) +//! .unwrap(); +//! } +//! +//! }); +//! } +//! } +//! } +//! ``` +//! +//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http +//! [`handshake`]: fn.handshake.html +//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader +//! [`Builder`]: struct.Builder.html +//! [`Connection`]: struct.Connection.html +//! [`Connection::poll`]: struct.Connection.html#method.poll +//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close +//! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html +//! [`RecvStream`]: ../struct.RecvStream.html +//! [`SendStream`]: ../struct.SendStream.html +//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html + +use crate::codec::{Codec, UserError}; +use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; +use crate::proto::{self, Config, Error, Prioritized}; +use crate::{FlowControl, PingPong, RecvStream, SendStream}; + +use bytes::{Buf, Bytes}; +use http::{HeaderMap, Method, Request, Response}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; +use std::{fmt, io}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tracing::instrument::{Instrument, Instrumented}; + +/// In progress HTTP/2 connection handshake future. +/// +/// This type implements `Future`, yielding a `Connection` instance once the +/// handshake has completed. +/// +/// The handshake is completed once the connection preface is fully received +/// from the client **and** the initial settings frame is sent to the client. +/// +/// The handshake future does not wait for the initial settings frame from the +/// client. +/// +/// See [module] level docs for more details. +/// +/// [module]: index.html +#[must_use = "futures do nothing unless polled"] +pub struct Handshake<T, B: Buf = Bytes> { + /// The config to pass to Connection::new after handshake succeeds. + builder: Builder, + /// The current state of the handshake. + state: Handshaking<T, B>, + /// Span tracking the handshake + span: tracing::Span, +} + +/// Accepts inbound HTTP/2 streams on a connection. +/// +/// A `Connection` is backed by an I/O resource (usually a TCP socket) and +/// implements the HTTP/2 server logic for that connection. It is responsible +/// for receiving inbound streams initiated by the client as well as driving the +/// internal state forward. +/// +/// `Connection` values are created by calling [`handshake`]. Once a +/// `Connection` value is obtained, the caller must call [`poll`] or +/// [`poll_close`] in order to drive the internal connection state forward. +/// +/// See [module level] documentation for more details +/// +/// [module level]: index.html +/// [`handshake`]: struct.Connection.html#method.handshake +/// [`poll`]: struct.Connection.html#method.poll +/// [`poll_close`]: struct.Connection.html#method.poll_close +/// +/// # Examples +/// +/// ``` +/// # use tokio::io::{AsyncRead, AsyncWrite}; +/// # use h2::server; +/// # use h2::server::*; +/// # +/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) { +/// let mut server = server::handshake(my_io).await.unwrap(); +/// while let Some(request) = server.accept().await { +/// tokio::spawn(async move { +/// let (request, respond) = request.unwrap(); +/// // Process the request and send the response back to the client +/// // using `respond`. +/// }); +/// } +/// # } +/// # +/// # pub fn main() {} +/// ``` +#[must_use = "streams do nothing unless polled"] +pub struct Connection<T, B: Buf> { + connection: proto::Connection<T, Peer, B>, +} + +/// Builds server connections with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. +/// +/// The server is constructed by calling [`handshake`] and passing the I/O +/// handle that will back the HTTP/2 server. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various server +/// configuration settings. +/// +/// [`Builder::new`]: struct.Builder.html#method.new +/// [`handshake`]: struct.Builder.html#method.handshake +/// +/// # Examples +/// +/// ``` +/// # use tokio::io::{AsyncRead, AsyncWrite}; +/// # use h2::server::*; +/// # +/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) +/// # -> Handshake<T> +/// # { +/// // `server_fut` is a future representing the completion of the HTTP/2 +/// // handshake. +/// let server_fut = Builder::new() +/// .initial_window_size(1_000_000) +/// .max_concurrent_streams(1000) +/// .handshake(my_io); +/// # server_fut +/// # } +/// # +/// # pub fn main() {} +/// ``` +#[derive(Clone, Debug)] +pub struct Builder { + /// Time to keep locally reset streams around before reaping. + reset_stream_duration: Duration, + + /// Maximum number of locally reset streams to keep at a time. + reset_stream_max: usize, + + /// Initial `Settings` frame to send as part of the handshake. + settings: Settings, + + /// Initial target window size for new connections. + initial_target_connection_window_size: Option<u32>, + + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, +} + +/// Send a response back to the client +/// +/// A `SendResponse` instance is provided when receiving a request and is used +/// to send the associated response back to the client. It is also used to +/// explicitly reset the stream with a custom reason. +/// +/// It will also be used to initiate push promises linked with the associated +/// stream. +/// +/// If the `SendResponse` instance is dropped without sending a response, then +/// the HTTP/2 stream will be reset. +/// +/// See [module] level docs for more details. +/// +/// [module]: index.html +#[derive(Debug)] +pub struct SendResponse<B: Buf> { + inner: proto::StreamRef<B>, +} + +/// Send a response to a promised request +/// +/// A `SendPushedResponse` instance is provided when promising a request and is used +/// to send the associated response to the client. It is also used to +/// explicitly reset the stream with a custom reason. +/// +/// It can not be used to initiate push promises. +/// +/// If the `SendPushedResponse` instance is dropped without sending a response, then +/// the HTTP/2 stream will be reset. +/// +/// See [module] level docs for more details. +/// +/// [module]: index.html +pub struct SendPushedResponse<B: Buf> { + inner: SendResponse<B>, +} + +// Manual implementation necessary because of rust-lang/rust#26925 +impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SendPushedResponse {{ {:?} }}", self.inner) + } +} + +/// Stages of an in-progress handshake. +enum Handshaking<T, B: Buf> { + /// State 1. Connection is flushing pending SETTINGS frame. + Flushing(Instrumented<Flush<T, Prioritized<B>>>), + /// State 2. Connection is waiting for the client preface. + ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>), + /// State 3. Handshake is done, polling again would panic. + Done, +} + +/// Flush a Sink +struct Flush<T, B> { + codec: Option<Codec<T, B>>, +} + +/// Read the client connection preface +struct ReadPreface<T, B> { + codec: Option<Codec<T, B>>, + pos: usize, +} + +#[derive(Debug)] +pub(crate) struct Peer; + +const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// Creates a new configured HTTP/2 server with default configuration +/// values backed by `io`. +/// +/// It is expected that `io` already be in an appropriate state to commence +/// the [HTTP/2 handshake]. See [Handshake] for more details. +/// +/// Returns a future which resolves to the [`Connection`] instance once the +/// HTTP/2 handshake has been completed. The returned [`Connection`] +/// instance will be using default configuration values. Use [`Builder`] to +/// customize the configuration values used by a [`Connection`] instance. +/// +/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader +/// [Handshake]: ../index.html#handshake +/// [`Connection`]: struct.Connection.html +/// +/// # Examples +/// +/// ``` +/// # use tokio::io::{AsyncRead, AsyncWrite}; +/// # use h2::server; +/// # use h2::server::*; +/// # +/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) +/// # { +/// let connection = server::handshake(my_io).await.unwrap(); +/// // The HTTP/2 handshake has completed, now use `connection` to +/// // accept inbound HTTP/2 streams. +/// # } +/// # +/// # pub fn main() {} +/// ``` +pub fn handshake<T>(io: T) -> Handshake<T, Bytes> +where + T: AsyncRead + AsyncWrite + Unpin, +{ + Builder::new().handshake(io) +} + +// ===== impl Connection ===== + +impl<T, B> Connection<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf + 'static, +{ + fn handshake2(io: T, builder: Builder) -> Handshake<T, B> { + let span = tracing::trace_span!("server_handshake"); + let entered = span.enter(); + + // Create the codec. + let mut codec = Codec::new(io); + + if let Some(max) = builder.settings.max_frame_size() { + codec.set_max_recv_frame_size(max as usize); + } + + if let Some(max) = builder.settings.max_header_list_size() { + codec.set_max_recv_header_list_size(max as usize); + } + + // Send initial settings frame. + codec + .buffer(builder.settings.clone().into()) + .expect("invalid SETTINGS frame"); + + // Create the handshake future. + let state = + Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush"))); + + drop(entered); + + Handshake { + builder, + state, + span, + } + } + + /// Accept the next incoming request on this connection. + pub async fn accept( + &mut self, + ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> { + futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await + } + + #[doc(hidden)] + pub fn poll_accept( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> { + // Always try to advance the internal state. Getting Pending also is + // needed to allow this function to return Pending. + if let Poll::Ready(_) = self.poll_closed(cx)? { + // If the socket is closed, don't return anything + // TODO: drop any pending streams + return Poll::Ready(None); + } + + if let Some(inner) = self.connection.next_incoming() { + tracing::trace!("received incoming"); + let (head, _) = inner.take_request().into_parts(); + let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque())); + + let request = Request::from_parts(head, body); + let respond = SendResponse { inner }; + + return Poll::Ready(Some(Ok((request, respond)))); + } + + Poll::Pending + } + + /// Sets the target window size for the whole connection. + /// + /// If `size` is greater than the current value, then a `WINDOW_UPDATE` + /// frame will be immediately sent to the remote, increasing the connection + /// level window by `size - current_value`. + /// + /// If `size` is less than the current value, nothing will happen + /// immediately. However, as window capacity is released by + /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent + /// out until the number of "in flight" bytes drops below `size`. + /// + /// The default value is 65,535. + /// + /// See [`FlowControl`] documentation for more details. + /// + /// [`FlowControl`]: ../struct.FlowControl.html + /// [library level]: ../index.html#flow-control + pub fn set_target_window_size(&mut self, size: u32) { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.connection.set_target_window_size(size); + } + + /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level + /// flow control for received data. + /// + /// The `SETTINGS` will be sent to the remote, and only applied once the + /// remote acknowledges the change. + /// + /// This can be used to increase or decrease the window size for existing + /// streams. + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.connection.set_initial_window_size(size)?; + Ok(()) + } + + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> { + self.connection.set_enable_connect_protocol()?; + Ok(()) + } + + /// Returns `Ready` when the underlying connection has closed. + /// + /// If any new inbound streams are received during a call to `poll_closed`, + /// they will be queued and returned on the next call to [`poll_accept`]. + /// + /// This function will advance the internal connection state, driving + /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]). + /// + /// See [here](index.html#managing-the-connection) for more details. + /// + /// [`poll_accept`]: struct.Connection.html#method.poll_accept + /// [`RecvStream`]: ../struct.RecvStream.html + /// [`SendStream`]: ../struct.SendStream.html + pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { + self.connection.poll(cx).map_err(Into::into) + } + + #[doc(hidden)] + #[deprecated(note = "renamed to poll_closed")] + pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { + self.poll_closed(cx) + } + + /// Sets the connection to a GOAWAY state. + /// + /// Does not terminate the connection. Must continue being polled to close + /// connection. + /// + /// After flushing the GOAWAY frame, the connection is closed. Any + /// outstanding streams do not prevent the connection from closing. This + /// should usually be reserved for shutting down when something bad + /// external to `h2` has happened, and open streams cannot be properly + /// handled. + /// + /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown). + pub fn abrupt_shutdown(&mut self, reason: Reason) { + self.connection.go_away_from_user(reason); + } + + /// Starts a [graceful shutdown][1] process. + /// + /// Must continue being polled to close connection. + /// + /// It's possible to receive more requests after calling this method, since + /// they might have been in-flight from the client already. After about + /// 1 RTT, no new requests should be accepted. Once all active streams + /// have completed, the connection is closed. + /// + /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY + pub fn graceful_shutdown(&mut self) { + self.connection.go_away_gracefully(); + } + + /// Takes a `PingPong` instance from the connection. + /// + /// # Note + /// + /// This may only be called once. Calling multiple times will return `None`. + pub fn ping_pong(&mut self) -> Option<PingPong> { + self.connection.take_user_pings().map(PingPong::new) + } + + /// Returns the maximum number of concurrent streams that may be initiated + /// by the server on this connection. + /// + /// This limit is configured by the client peer by sending the + /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value received from the + /// remote. + /// + /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 + pub fn max_concurrent_send_streams(&self) -> usize { + self.connection.max_send_streams() + } + + /// Returns the maximum number of concurrent streams that may be initiated + /// by the client on this connection. + /// + /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS` + /// parameter][1] sent in a `SETTINGS` frame that has been + /// acknowledged by the remote peer. The value to be sent is configured by + /// the [`Builder::max_concurrent_streams`][2] method before handshaking + /// with the remote peer. + /// + /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 + /// [2]: ../struct.Builder.html#method.max_concurrent_streams + pub fn max_concurrent_recv_streams(&self) -> usize { + self.connection.max_recv_streams() + } +} + +#[cfg(feature = "stream")] +impl<T, B> futures_core::Stream for Connection<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf + 'static, +{ + type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.poll_accept(cx) + } +} + +impl<T, B> fmt::Debug for Connection<T, B> +where + T: fmt::Debug, + B: fmt::Debug + Buf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Connection") + .field("connection", &self.connection) + .finish() + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Returns a new server builder instance initialized with default + /// configuration values. + /// + /// Configuration methods can be chained on the return value. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .initial_window_size(1_000_000) + /// .max_concurrent_streams(1000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), + reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + settings: Settings::default(), + initial_target_connection_window_size: None, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, + } + } + + /// Indicates the initial window size (in octets) for stream-level + /// flow control for received data. + /// + /// The initial window of a stream is used as part of flow control. For more + /// details, see [`FlowControl`]. + /// + /// The default value is 65,535. + /// + /// [`FlowControl`]: ../struct.FlowControl.html + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .initial_window_size(1_000_000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn initial_window_size(&mut self, size: u32) -> &mut Self { + self.settings.set_initial_window_size(Some(size)); + self + } + + /// Indicates the initial window size (in octets) for connection-level flow control + /// for received data. + /// + /// The initial window of a connection is used as part of flow control. For more details, + /// see [`FlowControl`]. + /// + /// The default value is 65,535. + /// + /// [`FlowControl`]: ../struct.FlowControl.html + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .initial_connection_window_size(1_000_000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { + self.initial_target_connection_window_size = Some(size); + self + } + + /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the + /// configured server is able to accept. + /// + /// The sender may send data frames that are **smaller** than this value, + /// but any data larger than `max` will be broken up into multiple `DATA` + /// frames. + /// + /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_frame_size(1_000_000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if `max` is not within the legal range specified + /// above. + pub fn max_frame_size(&mut self, max: u32) -> &mut Self { + self.settings.set_max_frame_size(Some(max)); + self + } + + /// Sets the max size of received header frames. + /// + /// This advisory setting informs a peer of the maximum size of header list + /// that the sender is prepared to accept, in octets. The value is based on + /// the uncompressed size of header fields, including the length of the name + /// and value in octets plus an overhead of 32 octets for each header field. + /// + /// This setting is also used to limit the maximum amount of data that is + /// buffered to decode HEADERS frames. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_header_list_size(16 * 1024) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { + self.settings.set_max_header_list_size(Some(max)); + self + } + + /// Sets the maximum number of concurrent streams. + /// + /// The maximum concurrent streams setting only controls the maximum number + /// of streams that can be initiated by the remote peer. In other words, + /// when this setting is set to 100, this does not limit the number of + /// concurrent streams that can be created by the caller. + /// + /// It is recommended that this value be no smaller than 100, so as to not + /// unnecessarily limit parallelism. However, any value is legal, including + /// 0. If `max` is set to 0, then the remote will not be permitted to + /// initiate streams. + /// + /// Note that streams in the reserved state, i.e., push promises that have + /// been reserved but the stream has not started, do not count against this + /// setting. + /// + /// Also note that if the remote *does* exceed the value set here, it is not + /// a protocol level error. Instead, the `h2` library will immediately reset + /// the stream. + /// + /// See [Section 5.1.2] in the HTTP/2 spec for more details. + /// + /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_concurrent_streams(1000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { + self.settings.set_max_concurrent_streams(Some(max)); + self + } + + /// Sets the maximum number of concurrent locally reset streams. + /// + /// When a stream is explicitly reset by either calling + /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance + /// before completing the stream, the HTTP/2 specification requires that + /// any further frames received for that stream must be ignored for "some + /// time". + /// + /// In order to satisfy the specification, internal state must be maintained + /// to implement the behavior. This state grows linearly with the number of + /// streams that are locally reset. + /// + /// The `max_concurrent_reset_streams` setting configures sets an upper + /// bound on the amount of state that is maintained. When this max value is + /// reached, the oldest reset stream is purged from memory. + /// + /// Once the stream has been fully purged from memory, any additional frames + /// received for that stream will result in a connection level protocol + /// error, forcing the connection to terminate. + /// + /// The default value is 10. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_concurrent_reset_streams(1000) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.reset_stream_max = max; + self + } + + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + + /// Sets the maximum number of concurrent locally reset streams. + /// + /// When a stream is explicitly reset by either calling + /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance + /// before completing the stream, the HTTP/2 specification requires that + /// any further frames received for that stream must be ignored for "some + /// time". + /// + /// In order to satisfy the specification, internal state must be maintained + /// to implement the behavior. This state grows linearly with the number of + /// streams that are locally reset. + /// + /// The `reset_stream_duration` setting configures the max amount of time + /// this state will be maintained in memory. Once the duration elapses, the + /// stream state is purged from memory. + /// + /// Once the stream has been fully purged from memory, any additional frames + /// received for that stream will result in a connection level protocol + /// error, forcing the connection to terminate. + /// + /// The default value is 30 seconds. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # use std::time::Duration; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .reset_stream_duration(Duration::from_secs(10)) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { + self.reset_stream_duration = dur; + self + } + + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + pub fn enable_connect_protocol(&mut self) -> &mut Self { + self.settings.set_enable_connect_protocol(Some(1)); + self + } + + /// Creates a new configured HTTP/2 server backed by `io`. + /// + /// It is expected that `io` already be in an appropriate state to commence + /// the [HTTP/2 handshake]. See [Handshake] for more details. + /// + /// Returns a future which resolves to the [`Connection`] instance once the + /// HTTP/2 handshake has been completed. + /// + /// This function also allows the caller to configure the send payload data + /// type. See [Outbound data type] for more details. + /// + /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader + /// [Handshake]: ../index.html#handshake + /// [`Connection`]: struct.Connection.html + /// [Outbound data type]: ../index.html#outbound-data-type. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + /// + /// Configures the send-payload data type. In this case, the outbound data + /// type will be `&'static [u8]`. + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Handshake<T, &'static [u8]> + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut: Handshake<_, &'static [u8]> = Builder::new() + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B> + where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf + 'static, + { + Connection::handshake2(io, self.clone()) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder::new() + } +} + +// ===== impl SendResponse ===== + +impl<B: Buf> SendResponse<B> { + /// Send a response to a client request. + /// + /// On success, a [`SendStream`] instance is returned. This instance can be + /// used to stream the response body and send trailers. + /// + /// If a body or trailers will be sent on the returned [`SendStream`] + /// instance, then `end_of_stream` must be set to `false` when calling this + /// function. + /// + /// The [`SendResponse`] instance is already associated with a received + /// request. This function may only be called once per instance and only if + /// [`send_reset`] has not been previously called. + /// + /// [`SendResponse`]: # + /// [`SendStream`]: ../struct.SendStream.html + /// [`send_reset`]: #method.send_reset + pub fn send_response( + &mut self, + response: Response<()>, + end_of_stream: bool, + ) -> Result<SendStream<B>, crate::Error> { + self.inner + .send_response(response, end_of_stream) + .map(|_| SendStream::new(self.inner.clone())) + .map_err(Into::into) + } + + /// Push a request and response to the client + /// + /// On success, a [`SendResponse`] instance is returned. + /// + /// [`SendResponse`]: # + pub fn push_request( + &mut self, + request: Request<()>, + ) -> Result<SendPushedResponse<B>, crate::Error> { + self.inner + .send_push_promise(request) + .map(|inner| SendPushedResponse { + inner: SendResponse { inner }, + }) + .map_err(Into::into) + } + + /// Send a stream reset to the peer. + /// + /// This essentially cancels the stream, including any inbound or outbound + /// data streams. + /// + /// If this function is called before [`send_response`], a call to + /// [`send_response`] will result in an error. + /// + /// If this function is called while a [`SendStream`] instance is active, + /// any further use of the instance will result in an error. + /// + /// This function should only be called once. + /// + /// [`send_response`]: #method.send_response + /// [`SendStream`]: ../struct.SendStream.html + pub fn send_reset(&mut self, reason: Reason) { + self.inner.send_reset(reason) + } + + /// Polls to be notified when the client resets this stream. + /// + /// If stream is still open, this returns `Poll::Pending`, and + /// registers the task to be notified if a `RST_STREAM` is received. + /// + /// If a `RST_STREAM` frame is received for this stream, calling this + /// method will yield the `Reason` for the reset. + /// + /// # Error + /// + /// Calling this method after having called `send_response` will return + /// a user error. + pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { + self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders) + } + + /// Returns the stream ID of the response stream. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> crate::StreamId { + crate::StreamId::from_internal(self.inner.stream_id()) + } +} + +// ===== impl SendPushedResponse ===== + +impl<B: Buf> SendPushedResponse<B> { + /// Send a response to a promised request. + /// + /// On success, a [`SendStream`] instance is returned. This instance can be + /// used to stream the response body and send trailers. + /// + /// If a body or trailers will be sent on the returned [`SendStream`] + /// instance, then `end_of_stream` must be set to `false` when calling this + /// function. + /// + /// The [`SendPushedResponse`] instance is associated with a promised + /// request. This function may only be called once per instance and only if + /// [`send_reset`] has not been previously called. + /// + /// [`SendPushedResponse`]: # + /// [`SendStream`]: ../struct.SendStream.html + /// [`send_reset`]: #method.send_reset + pub fn send_response( + &mut self, + response: Response<()>, + end_of_stream: bool, + ) -> Result<SendStream<B>, crate::Error> { + self.inner.send_response(response, end_of_stream) + } + + /// Send a stream reset to the peer. + /// + /// This essentially cancels the stream, including any inbound or outbound + /// data streams. + /// + /// If this function is called before [`send_response`], a call to + /// [`send_response`] will result in an error. + /// + /// If this function is called while a [`SendStream`] instance is active, + /// any further use of the instance will result in an error. + /// + /// This function should only be called once. + /// + /// [`send_response`]: #method.send_response + /// [`SendStream`]: ../struct.SendStream.html + pub fn send_reset(&mut self, reason: Reason) { + self.inner.send_reset(reason) + } + + /// Polls to be notified when the client resets this stream. + /// + /// If stream is still open, this returns `Poll::Pending`, and + /// registers the task to be notified if a `RST_STREAM` is received. + /// + /// If a `RST_STREAM` frame is received for this stream, calling this + /// method will yield the `Reason` for the reset. + /// + /// # Error + /// + /// Calling this method after having called `send_response` will return + /// a user error. + pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { + self.inner.poll_reset(cx) + } + + /// Returns the stream ID of the response stream. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> crate::StreamId { + self.inner.stream_id() + } +} + +// ===== impl Flush ===== + +impl<T, B: Buf> Flush<T, B> { + fn new(codec: Codec<T, B>) -> Self { + Flush { codec: Some(codec) } + } +} + +impl<T, B> Future for Flush<T, B> +where + T: AsyncWrite + Unpin, + B: Buf, +{ + type Output = Result<Codec<T, B>, crate::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Flush the codec + ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?; + + // Return the codec + Poll::Ready(Ok(self.codec.take().unwrap())) + } +} + +impl<T, B: Buf> ReadPreface<T, B> { + fn new(codec: Codec<T, B>) -> Self { + ReadPreface { + codec: Some(codec), + pos: 0, + } + } + + fn inner_mut(&mut self) -> &mut T { + self.codec.as_mut().unwrap().get_mut() + } +} + +impl<T, B> Future for ReadPreface<T, B> +where + T: AsyncRead + Unpin, + B: Buf, +{ + type Output = Result<Codec<T, B>, crate::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut buf = [0; 24]; + let mut rem = PREFACE.len() - self.pos; + + while rem > 0 { + let mut buf = ReadBuf::new(&mut buf[..rem]); + ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf)) + .map_err(crate::Error::from_io)?; + let n = buf.filled().len(); + if n == 0 { + return Poll::Ready(Err(crate::Error::from_io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "connection closed before reading preface", + )))); + } + + if &PREFACE[self.pos..self.pos + n] != buf.filled() { + proto_err!(conn: "read_preface: invalid preface"); + // TODO: Should this just write the GO_AWAY frame directly? + return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into())); + } + + self.pos += n; + rem -= n; // TODO test + } + + Poll::Ready(Ok(self.codec.take().unwrap())) + } +} + +// ===== impl Handshake ===== + +impl<T, B: Buf> Future for Handshake<T, B> +where + T: AsyncRead + AsyncWrite + Unpin, + B: Buf + 'static, +{ + type Output = Result<Connection<T, B>, crate::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let span = self.span.clone(); // XXX(eliza): T_T + let _e = span.enter(); + tracing::trace!(state = ?self.state); + + loop { + match &mut self.state { + Handshaking::Flushing(flush) => { + // We're currently flushing a pending SETTINGS frame. Poll the + // flush future, and, if it's completed, advance our state to wait + // for the client preface. + let codec = match Pin::new(flush).poll(cx)? { + Poll::Pending => { + tracing::trace!(flush.poll = %"Pending"); + return Poll::Pending; + } + Poll::Ready(flushed) => { + tracing::trace!(flush.poll = %"Ready"); + flushed + } + }; + self.state = Handshaking::ReadingPreface( + ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")), + ); + } + Handshaking::ReadingPreface(read) => { + let codec = ready!(Pin::new(read).poll(cx)?); + + self.state = Handshaking::Done; + + let connection = proto::Connection::new( + codec, + Config { + next_stream_id: 2.into(), + // Server does not need to locally initiate any streams + initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }, + ); + + tracing::trace!("connection established!"); + let mut c = Connection { connection }; + if let Some(sz) = self.builder.initial_target_connection_window_size { + c.set_target_window_size(sz); + } + + return Poll::Ready(Ok(c)); + } + Handshaking::Done => { + panic!("Handshaking::poll() called again after handshaking was complete") + } + } + } + } +} + +impl<T, B> fmt::Debug for Handshake<T, B> +where + T: AsyncRead + AsyncWrite + fmt::Debug, + B: fmt::Debug + Buf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "server::Handshake") + } +} + +impl Peer { + pub fn convert_send_message( + id: StreamId, + response: Response<()>, + end_of_stream: bool, + ) -> frame::Headers { + use http::response::Parts; + + // Extract the components of the HTTP request + let ( + Parts { + status, headers, .. + }, + _, + ) = response.into_parts(); + + // Build the set pseudo header set. All requests will include `method` + // and `path`. + let pseudo = Pseudo::response(status); + + // Create the HEADERS frame + let mut frame = frame::Headers::new(id, pseudo, headers); + + if end_of_stream { + frame.set_end_stream() + } + + frame + } + + pub fn convert_push_message( + stream_id: StreamId, + promised_id: StreamId, + request: Request<()>, + ) -> Result<frame::PushPromise, UserError> { + use http::request::Parts; + + if let Err(e) = frame::PushPromise::validate_request(&request) { + use PushPromiseHeaderError::*; + match e { + NotSafeAndCacheable => tracing::debug!( + ?promised_id, + "convert_push_message: method {} is not safe and cacheable", + request.method(), + ), + InvalidContentLength(e) => tracing::debug!( + ?promised_id, + "convert_push_message; promised request has invalid content-length {:?}", + e, + ), + } + return Err(UserError::MalformedHeaders); + } + + // Extract the components of the HTTP request + let ( + Parts { + method, + uri, + headers, + .. + }, + _, + ) = request.into_parts(); + + let pseudo = Pseudo::request(method, uri, None); + + Ok(frame::PushPromise::new( + stream_id, + promised_id, + pseudo, + headers, + )) + } +} + +impl proto::Peer for Peer { + type Poll = Request<()>; + + const NAME: &'static str = "Server"; + + fn is_server() -> bool { + true + } + + fn r#dyn() -> proto::DynPeer { + proto::DynPeer::Server + } + + fn convert_poll_message( + pseudo: Pseudo, + fields: HeaderMap, + stream_id: StreamId, + ) -> Result<Self::Poll, Error> { + use http::{uri, Version}; + + let mut b = Request::builder(); + + macro_rules! malformed { + ($($arg:tt)*) => {{ + tracing::debug!($($arg)*); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); + }} + } + + b = b.version(Version::HTTP_2); + + let is_connect; + if let Some(method) = pseudo.method { + is_connect = method == Method::CONNECT; + b = b.method(method); + } else { + malformed!("malformed headers: missing method"); + } + + let has_protocol = pseudo.protocol.is_some(); + if !is_connect && has_protocol { + malformed!("malformed headers: :protocol on non-CONNECT request"); + } + + if pseudo.status.is_some() { + malformed!("malformed headers: :status field on request"); + } + + // Convert the URI + let mut parts = uri::Parts::default(); + + // A request translated from HTTP/1 must not include the :authority + // header + if let Some(authority) = pseudo.authority { + let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner()); + parts.authority = Some(maybe_authority.or_else(|why| { + malformed!( + "malformed headers: malformed authority ({:?}): {}", + authority, + why, + ) + })?); + } + + // A :scheme is required, except CONNECT. + if let Some(scheme) = pseudo.scheme { + if is_connect && !has_protocol { + malformed!(":scheme in CONNECT"); + } + let maybe_scheme = scheme.parse(); + let scheme = maybe_scheme.or_else(|why| { + malformed!( + "malformed headers: malformed scheme ({:?}): {}", + scheme, + why, + ) + })?; + + // It's not possible to build an `Uri` from a scheme and path. So, + // after validating is was a valid scheme, we just have to drop it + // if there isn't an :authority. + if parts.authority.is_some() { + parts.scheme = Some(scheme); + } + } else if !is_connect || has_protocol { + malformed!("malformed headers: missing scheme"); + } + + if let Some(path) = pseudo.path { + if is_connect && !has_protocol { + malformed!(":path in CONNECT"); + } + + // This cannot be empty + if path.is_empty() { + malformed!("malformed headers: missing path"); + } + + let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner()); + parts.path_and_query = Some(maybe_path.or_else(|why| { + malformed!("malformed headers: malformed path ({:?}): {}", path, why,) + })?); + } else if is_connect && has_protocol { + malformed!("malformed headers: missing path in extended CONNECT"); + } + + b = b.uri(parts); + + let mut request = match b.body(()) { + Ok(request) => request, + Err(e) => { + // TODO: Should there be more specialized handling for different + // kinds of errors + proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id); + return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); + } + }; + + *request.headers_mut() = fields; + + Ok(request) + } +} + +// ===== impl Handshaking ===== + +impl<T, B> fmt::Debug for Handshaking<T, B> +where + B: Buf, +{ + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Handshaking::Flushing(_) => f.write_str("Flushing(_)"), + Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"), + Handshaking::Done => f.write_str("Done"), + } + } +} |