//! Client implementation of the HTTP/2 protocol. //! //! # Getting started //! //! Running an HTTP/2 client requires the caller to establish the underlying //! connection as well as get the connection to a state that is ready to begin //! the HTTP/2 handshake. See [here](../index.html#handshake) for more //! details. //! //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades. //! //! Once a connection is obtained, it is passed to [`handshake`], which will //! begin the [HTTP/2 handshake]. This returns a future that completes once //! the handshake process is performed and HTTP/2 streams may be initialized. //! //! [`handshake`] uses default configuration values. There are a number of //! settings that can be changed by using [`Builder`] instead. //! //! Once the handshake future completes, the caller is provided with a //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`] //! instance is used to drive the connection (see [Managing the connection]). //! The [`SendRequest`] instance is used to initialize new streams (see [Making //! requests]). //! //! # Making requests //! //! Requests are made using the [`SendRequest`] handle provided by the handshake //! future. Once a request is submitted, an HTTP/2 stream is initialized and //! the request is sent to the server. //! //! A request body and request trailers are sent using [`SendRequest`] and the //! server's response is returned once the [`ResponseFuture`] future completes. //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream //! initialized by the sent request. //! //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2 //! stream can be created, i.e. as long as the current number of active streams //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the //! caller will be notified once an existing stream closes, freeing capacity for //! the caller. The caller should use [`SendRequest::poll_ready`] to check for //! capacity before sending a request to the server. //! //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user //! must not send a request if `poll_ready` does not return `Ready`. Attempting //! to do so will result in an [`Error`] being returned. //! //! # Managing the connection //! //! The [`Connection`] instance is used to manage connection state. The caller //! is required to call [`Connection::poll`] in order to advance state. //! [`SendRequest::send_request`] and other functions have no effect unless //! [`Connection::poll`] is called. //! //! The [`Connection`] instance should only be dropped once [`Connection::poll`] //! returns `Ready`. At this point, the underlying socket has been closed and no //! further work needs to be done. //! //! The easiest way to ensure that the [`Connection`] instance gets polled is to //! submit the [`Connection`] instance to an [executor]. The executor will then //! manage polling the connection until the connection is complete. //! Alternatively, the caller can call `poll` manually. //! //! # Example //! //! ```rust, no_run //! //! use h2::client; //! //! use http::{Request, Method}; //! use std::error::Error; //! use tokio::net::TcpStream; //! //! #[tokio::main] //! pub async fn main() -> Result<(), Box> { //! // Establish TCP connection to the server. //! let tcp = TcpStream::connect("127.0.0.1:5928").await?; //! let (h2, connection) = client::handshake(tcp).await?; //! tokio::spawn(async move { //! connection.await.unwrap(); //! }); //! //! let mut h2 = h2.ready().await?; //! // Prepare the HTTP request to send to the server. //! let request = Request::builder() //! .method(Method::GET) //! .uri("https://www.example.com/") //! .body(()) //! .unwrap(); //! //! // Send the request. The second tuple item allows the caller //! // to stream a request body. //! let (response, _) = h2.send_request(request, true).unwrap(); //! //! let (head, mut body) = response.await?.into_parts(); //! //! println!("Received response: {:?}", head); //! //! // The `flow_control` handle allows the caller to manage //! // flow control. //! // //! // Whenever data is received, the caller is responsible for //! // releasing capacity back to the server once it has freed //! // the data from memory. //! let mut flow_control = body.flow_control().clone(); //! //! while let Some(chunk) = body.data().await { //! let chunk = chunk?; //! println!("RX: {:?}", chunk); //! //! // Let the server send more data. //! let _ = flow_control.release_capacity(chunk.len()); //! } //! //! Ok(()) //! } //! ``` //! //! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html //! [`handshake`]: fn.handshake.html //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html //! [`SendRequest`]: struct.SendRequest.html //! [`SendStream`]: ../struct.SendStream.html //! [Making requests]: #making-requests //! [Managing the connection]: #managing-the-connection //! [`Connection`]: struct.Connection.html //! [`Connection::poll`]: struct.Connection.html#method.poll //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues //! [`SendRequest`]: struct.SendRequest.html //! [`ResponseFuture`]: struct.ResponseFuture.html //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader //! [`Builder`]: struct.Builder.html //! [`Error`]: ../struct.Error.html use crate::codec::{Codec, SendError, UserError}; use crate::ext::Protocol; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::proto::{self, Error}; use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes}; use http::{uri, HeaderMap, Method, Request, Response, Version}; use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::usize; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing::Instrument; /// Initializes new HTTP/2 streams on a connection by sending a request. /// /// This type does no work itself. Instead, it is a handle to the inner /// connection state held by [`Connection`]. If the associated connection /// instance is dropped, all `SendRequest` functions will return [`Error`]. /// /// [`SendRequest`] instances are able to move to and operate on separate tasks /// / threads than their associated [`Connection`] instance. Internally, there /// is a buffer used to stage requests before they get written to the /// connection. There is no guarantee that requests get written to the /// connection in FIFO order as HTTP/2 prioritization logic can play a role. /// /// [`SendRequest`] implements [`Clone`], enabling the creation of many /// instances that are backed by a single connection. /// /// See [module] level documentation for more details. /// /// [module]: index.html /// [`Connection`]: struct.Connection.html /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html /// [`Error`]: ../struct.Error.html pub struct SendRequest { inner: proto::Streams, pending: Option, } /// Returns a `SendRequest` instance once it is ready to send at least one /// request. #[derive(Debug)] pub struct ReadySendRequest { inner: Option>, } /// Manages all state associated with an HTTP/2 client connection. /// /// A `Connection` is backed by an I/O resource (usually a TCP socket) and /// implements the HTTP/2 client logic for that connection. It is responsible /// for driving the internal state forward, performing the work requested of the /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`], /// [`RecvStream`]). /// /// `Connection` values are created by calling [`handshake`]. Once a /// `Connection` value is obtained, the caller must repeatedly call [`poll`] /// until `Ready` is returned. The easiest way to do this is to submit the /// `Connection` instance to an [executor]. /// /// [module]: index.html /// [`handshake`]: fn.handshake.html /// [`SendRequest`]: struct.SendRequest.html /// [`ResponseFuture`]: struct.ResponseFuture.html /// [`SendStream`]: ../struct.SendStream.html /// [`RecvStream`]: ../struct.RecvStream.html /// [`poll`]: #method.poll /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client; /// # use h2::client::*; /// # /// # async fn doc(my_io: T) -> Result<(), h2::Error> /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, /// # { /// let (send_request, connection) = client::handshake(my_io).await?; /// // Submit the connection handle to an executor. /// tokio::spawn(async { connection.await.expect("connection failed"); }); /// /// // Now, use `send_request` to initialize HTTP/2 streams. /// // ... /// # Ok(()) /// # } /// # /// # pub fn main() {} /// ``` #[must_use = "futures do nothing unless polled"] pub struct Connection { inner: proto::Connection, } /// A future of an HTTP response. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct ResponseFuture { inner: proto::OpaqueStreamRef, push_promise_consumed: bool, } /// A future of a pushed HTTP response. /// /// We have to differentiate between pushed and non pushed because of the spec /// /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream /// > that is in either the "open" or "half-closed (remote)" state. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct PushedResponseFuture { inner: ResponseFuture, } /// A pushed response and corresponding request headers #[derive(Debug)] pub struct PushPromise { /// The request headers request: Request<()>, /// The pushed response response: PushedResponseFuture, } /// A stream of pushed responses and corresponding promised requests #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct PushPromises { inner: proto::OpaqueStreamRef, } /// Builds client connections with custom configuration values. /// /// Methods can be chained in order to set the configuration values. /// /// The client is constructed by calling [`handshake`] and passing the I/O /// handle that will back the HTTP/2 server. /// /// New instances of `Builder` are obtained via [`Builder::new`]. /// /// See function level documentation for details on the various client /// configuration settings. /// /// [`Builder::new`]: struct.Builder.html#method.new /// [`handshake`]: struct.Builder.html#method.handshake /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .initial_window_size(1_000_000) /// .max_concurrent_streams(1000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` #[derive(Clone, Debug)] pub struct Builder { /// Time to keep locally reset streams around before reaping. reset_stream_duration: Duration, /// Initial maximum number of locally initiated (send) streams. /// After receiving a Settings frame from the remote peer, /// the connection will overwrite this value with the /// MAX_CONCURRENT_STREAMS specified in the frame. initial_max_send_streams: usize, /// Initial target window size for new connections. initial_target_connection_window_size: Option, /// Maximum amount of bytes to "buffer" for writing per stream. max_send_buffer_size: usize, /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, /// Initial `Settings` frame to send as part of the handshake. settings: Settings, /// The stream ID of the first (lowest) stream. Subsequent streams will use /// monotonically increasing stream IDs. stream_id: StreamId, } #[derive(Debug)] pub(crate) struct Peer; // ===== impl SendRequest ===== impl SendRequest where B: Buf + 'static, { /// Returns `Ready` when the connection can initialize a new HTTP/2 /// stream. /// /// This function must return `Ready` before `send_request` is called. When /// `Poll::Pending` is returned, the task will be notified once the readiness /// state changes. /// /// See [module] level docs for more details. /// /// [module]: index.html pub fn poll_ready(&mut self, cx: &mut Context) -> Poll> { ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; self.pending = None; Poll::Ready(Ok(())) } /// Consumes `self`, returning a future that returns `self` back once it is /// ready to send a request. /// /// This function should be called before calling `send_request`. /// /// This is a functional combinator for [`poll_ready`]. The returned future /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to /// the caller. /// /// # Examples /// /// ```rust /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request /// let mut send_request = send_request.ready().await.unwrap(); /// // Use `send_request` here. /// # } /// # pub fn main() {} /// ``` /// /// See [module] level docs for more details. /// /// [`poll_ready`]: #method.poll_ready /// [module]: index.html pub fn ready(self) -> ReadySendRequest { ReadySendRequest { inner: Some(self) } } /// Sends a HTTP/2 request to the server. /// /// `send_request` initializes a new HTTP/2 stream on the associated /// connection, then sends the given request using this new stream. Only the /// request head is sent. /// /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance /// are returned. The [`ResponseFuture`] instance is used to get the /// server's response and the [`SendStream`] instance is used to send a /// request body or trailers to the server over the same HTTP/2 stream. /// /// To send a request body or trailers, set `end_of_stream` to `false`. /// Then, use the returned [`SendStream`] instance to stream request body /// chunks or send trailers. If `end_of_stream` is **not** set to `false` /// then attempting to call [`SendStream::send_data`] or /// [`SendStream::send_trailers`] will result in an error. /// /// If no request body or trailers are to be sent, set `end_of_stream` to /// `true` and drop the returned [`SendStream`] instance. /// /// # A note on HTTP versions /// /// The provided `Request` will be encoded differently depending on the /// value of its version field. If the version is set to 2.0, then the /// request is encoded as per the specification recommends. /// /// If the version is set to a lower value, then the request is encoded to /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host /// headers are permitted and the `:authority` pseudo header is not /// included. /// /// The caller should always set the request's version field to 2.0 unless /// specifically transmitting an HTTP 1.1 request over 2.0. /// /// # Examples /// /// Sending a request with no body /// /// ```rust /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request /// let mut send_request = send_request.ready().await.unwrap(); /// // Prepare the HTTP request to send to the server. /// let request = Request::get("https://www.example.com/") /// .body(()) /// .unwrap(); /// /// // Send the request to the server. Since we are not sending a /// // body or trailers, we can drop the `SendStream` instance. /// let (response, _) = send_request.send_request(request, true).unwrap(); /// let response = response.await.unwrap(); /// // Process the response /// # } /// # pub fn main() {} /// ``` /// /// Sending a request with a body and trailers /// /// ```rust /// # use h2::client::*; /// # use http::*; /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request /// let mut send_request = send_request.ready().await.unwrap(); /// /// // Prepare the HTTP request to send to the server. /// let request = Request::get("https://www.example.com/") /// .body(()) /// .unwrap(); /// /// // Send the request to the server. If we are not sending a /// // body or trailers, we can drop the `SendStream` instance. /// let (response, mut send_stream) = send_request /// .send_request(request, false).unwrap(); /// /// // At this point, one option would be to wait for send capacity. /// // Doing so would allow us to not hold data in memory that /// // cannot be sent. However, this is not a requirement, so this /// // example will skip that step. See `SendStream` documentation /// // for more details. /// send_stream.send_data(b"hello", false).unwrap(); /// send_stream.send_data(b"world", false).unwrap(); /// /// // Send the trailers. /// let mut trailers = HeaderMap::new(); /// trailers.insert( /// header::HeaderName::from_bytes(b"my-trailer").unwrap(), /// header::HeaderValue::from_bytes(b"hello").unwrap()); /// /// send_stream.send_trailers(trailers).unwrap(); /// /// let response = response.await.unwrap(); /// // Process the response /// # } /// # pub fn main() {} /// ``` /// /// [`ResponseFuture`]: struct.ResponseFuture.html /// [`SendStream`]: ../struct.SendStream.html /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers pub fn send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream), crate::Error> { self.inner .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) .map(|stream| { if stream.is_pending_open() { self.pending = Some(stream.clone_to_opaque()); } let response = ResponseFuture { inner: stream.clone_to_opaque(), push_promise_consumed: false, }; let stream = SendStream::new(stream); (response, stream) }) } /// Returns whether the [extended CONNECT protocol][1] is enabled or not. /// /// This setting is configured by the server peer by sending the /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. /// This method returns the currently acknowledged value received from the /// remote. /// /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 pub fn is_extended_connect_protocol_enabled(&self) -> bool { self.inner.is_extended_connect_protocol_enabled() } } impl fmt::Debug for SendRequest where B: Buf, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("SendRequest").finish() } } impl Clone for SendRequest where B: Buf, { fn clone(&self) -> Self { SendRequest { inner: self.inner.clone(), pending: None, } } } #[cfg(feature = "unstable")] impl SendRequest where B: Buf, { /// Returns the number of active streams. /// /// An active stream is a stream that has not yet transitioned to a closed /// state. pub fn num_active_streams(&self) -> usize { self.inner.num_active_streams() } /// Returns the number of streams that are held in memory. /// /// A wired stream is a stream that is either active or is closed but must /// stay in memory for some reason. For example, there are still outstanding /// userspace handles pointing to the slot. pub fn num_wired_streams(&self) -> usize { self.inner.num_wired_streams() } } // ===== impl ReadySendRequest ===== impl Future for ReadySendRequest where B: Buf + 'static, { type Output = Result, crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match &mut self.inner { Some(send_request) => { ready!(send_request.poll_ready(cx))?; } None => panic!("called `poll` after future completed"), } Poll::Ready(Ok(self.inner.take().unwrap())) } } // ===== impl Builder ===== impl Builder { /// Returns a new client builder instance initialized with default /// configuration values. /// /// Configuration methods can be chained on the return value. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .initial_window_size(1_000_000) /// .max_concurrent_streams(1000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn new() -> Builder { Builder { max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, initial_max_send_streams: usize::MAX, settings: Default::default(), stream_id: 1.into(), } } /// Indicates the initial window size (in octets) for stream-level /// flow control for received data. /// /// The initial window of a stream is used as part of flow control. For more /// details, see [`FlowControl`]. /// /// The default value is 65,535. /// /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .initial_window_size(1_000_000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn initial_window_size(&mut self, size: u32) -> &mut Self { self.settings.set_initial_window_size(Some(size)); self } /// Indicates the initial window size (in octets) for connection-level flow control /// for received data. /// /// The initial window of a connection is used as part of flow control. For more details, /// see [`FlowControl`]. /// /// The default value is 65,535. /// /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .initial_connection_window_size(1_000_000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { self.initial_target_connection_window_size = Some(size); self } /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the /// configured client is able to accept. /// /// The sender may send data frames that are **smaller** than this value, /// but any data larger than `max` will be broken up into multiple `DATA` /// frames. /// /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .max_frame_size(1_000_000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` /// /// # Panics /// /// This function panics if `max` is not within the legal range specified /// above. pub fn max_frame_size(&mut self, max: u32) -> &mut Self { self.settings.set_max_frame_size(Some(max)); self } /// Sets the max size of received header frames. /// /// This advisory setting informs a peer of the maximum size of header list /// that the sender is prepared to accept, in octets. The value is based on /// the uncompressed size of header fields, including the length of the name /// and value in octets plus an overhead of 32 octets for each header field. /// /// This setting is also used to limit the maximum amount of data that is /// buffered to decode HEADERS frames. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .max_header_list_size(16 * 1024) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { self.settings.set_max_header_list_size(Some(max)); self } /// Sets the maximum number of concurrent streams. /// /// The maximum concurrent streams setting only controls the maximum number /// of streams that can be initiated by the remote peer. In other words, /// when this setting is set to 100, this does not limit the number of /// concurrent streams that can be created by the caller. /// /// It is recommended that this value be no smaller than 100, so as to not /// unnecessarily limit parallelism. However, any value is legal, including /// 0. If `max` is set to 0, then the remote will not be permitted to /// initiate streams. /// /// Note that streams in the reserved state, i.e., push promises that have /// been reserved but the stream has not started, do not count against this /// setting. /// /// Also note that if the remote *does* exceed the value set here, it is not /// a protocol level error. Instead, the `h2` library will immediately reset /// the stream. /// /// See [Section 5.1.2] in the HTTP/2 spec for more details. /// /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .max_concurrent_streams(1000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { self.settings.set_max_concurrent_streams(Some(max)); self } /// Sets the initial maximum of locally initiated (send) streams. /// /// The initial settings will be overwritten by the remote peer when /// the Settings frame is received. The new value will be set to the /// `max_concurrent_streams()` from the frame. /// /// This setting prevents the caller from exceeding this number of /// streams that are counted towards the concurrency limit. /// /// Sending streams past the limit returned by the peer will be treated /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM. /// /// See [Section 5.1.2] in the HTTP/2 spec for more details. /// /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .initial_max_send_streams(1000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self { self.initial_max_send_streams = initial; self } /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset, the HTTP/2 specification requires /// that any further frames received for that stream must be ignored for /// "some time". /// /// In order to satisfy the specification, internal state must be maintained /// to implement the behavior. This state grows linearly with the number of /// streams that are locally reset. /// /// The `max_concurrent_reset_streams` setting configures sets an upper /// bound on the amount of state that is maintained. When this max value is /// reached, the oldest reset stream is purged from memory. /// /// Once the stream has been fully purged from memory, any additional frames /// received for that stream will result in a connection level protocol /// error, forcing the connection to terminate. /// /// The default value is 10. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .max_concurrent_reset_streams(1000) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { self.reset_stream_max = max; self } /// Sets the duration to remember locally reset streams. /// /// When a stream is explicitly reset, the HTTP/2 specification requires /// that any further frames received for that stream must be ignored for /// "some time". /// /// In order to satisfy the specification, internal state must be maintained /// to implement the behavior. This state grows linearly with the number of /// streams that are locally reset. /// /// The `reset_stream_duration` setting configures the max amount of time /// this state will be maintained in memory. Once the duration elapses, the /// stream state is purged from memory. /// /// Once the stream has been fully purged from memory, any additional frames /// received for that stream will result in a connection level protocol /// error, forcing the connection to terminate. /// /// The default value is 30 seconds. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use std::time::Duration; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .reset_stream_duration(Duration::from_secs(10)) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { self.reset_stream_duration = dur; self } /// Sets the maximum send buffer size per stream. /// /// Once a stream has buffered up to (or over) the maximum, the stream's /// flow control will not "poll" additional capacity. Once bytes for the /// stream have been written to the connection, the send buffer capacity /// will be freed up again. /// /// The default is currently ~400MB, but may change. /// /// # Panics /// /// This function panics if `max` is larger than `u32::MAX`. pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { assert!(max <= std::u32::MAX as usize); self.max_send_buffer_size = max; self } /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. When set, the /// server MUST NOT send a push promise. Setting this value to value to /// false in the initial SETTINGS handshake guarantees that the remote server /// will never send a push promise. /// /// This setting can be changed during the life of a single HTTP/2 /// connection by sending another settings frame updating the value. /// /// Default value: `true`. /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use std::time::Duration; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .enable_push(false) /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn enable_push(&mut self, enabled: bool) -> &mut Self { self.settings.set_enable_push(enabled); self } /// Sets the first stream ID to something other than 1. #[cfg(feature = "unstable")] pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { self.stream_id = stream_id.into(); assert!( self.stream_id.is_client_initiated(), "stream id must be odd" ); self } /// Creates a new configured HTTP/2 client backed by `io`. /// /// It is expected that `io` already be in an appropriate state to commence /// the [HTTP/2 handshake]. The handshake is completed once both the connection /// preface and the initial settings frame is sent by the client. /// /// The handshake future does not wait for the initial settings frame from the /// server. /// /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] /// tuple once the HTTP/2 handshake has been completed. /// /// This function also allows the caller to configure the send payload data /// type. See [Outbound data type] for more details. /// /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader /// [`Connection`]: struct.Connection.html /// [`SendRequest`]: struct.SendRequest.html /// [Outbound data type]: ../index.html#outbound-data-type. /// /// # Examples /// /// Basic usage: /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # use bytes::Bytes; /// # /// # async fn doc(my_io: T) /// -> Result<((SendRequest, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .handshake(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` /// /// Configures the send-payload data type. In this case, the outbound data /// type will be `&'static [u8]`. /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client::*; /// # /// # async fn doc(my_io: T) /// # -> Result<((SendRequest<&'static [u8]>, Connection)), h2::Error> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2 /// // handshake. /// let client_fut = Builder::new() /// .handshake::<_, &'static [u8]>(my_io); /// # client_fut.await /// # } /// # /// # pub fn main() {} /// ``` pub fn handshake( &self, io: T, ) -> impl Future, Connection), crate::Error>> where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static, { Connection::handshake2(io, self.clone()) } } impl Default for Builder { fn default() -> Builder { Builder::new() } } /// Creates a new configured HTTP/2 client with default configuration /// values backed by `io`. /// /// It is expected that `io` already be in an appropriate state to commence /// the [HTTP/2 handshake]. See [Handshake] for more details. /// /// Returns a future which resolves to the [`Connection`] / [`SendRequest`] /// tuple once the HTTP/2 handshake has been completed. The returned /// [`Connection`] instance will be using default configuration values. Use /// [`Builder`] to customize the configuration values used by a [`Connection`] /// instance. /// /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader /// [Handshake]: ../index.html#handshake /// [`Connection`]: struct.Connection.html /// [`SendRequest`]: struct.SendRequest.html /// /// # Examples /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # use h2::client; /// # use h2::client::*; /// # /// # async fn doc(my_io: T) -> Result<(), h2::Error> /// # { /// let (send_request, connection) = client::handshake(my_io).await?; /// // The HTTP/2 handshake has completed, now start polling /// // `connection` and use `send_request` to send requests to the /// // server. /// # Ok(()) /// # } /// # /// # pub fn main() {} /// ``` pub async fn handshake(io: T) -> Result<(SendRequest, Connection), crate::Error> where T: AsyncRead + AsyncWrite + Unpin, { let builder = Builder::new(); builder .handshake(io) .instrument(tracing::trace_span!("client_handshake")) .await } // ===== impl Connection ===== async fn bind_connection(io: &mut T) -> Result<(), crate::Error> where T: AsyncRead + AsyncWrite + Unpin, { tracing::debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; io.write_all(msg).await.map_err(crate::Error::from_io)?; tracing::debug!("client connection bound"); Ok(()) } impl Connection where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static, { async fn handshake2( mut io: T, builder: Builder, ) -> Result<(SendRequest, Connection), crate::Error> { bind_connection(&mut io).await?; // Create the codec let mut codec = Codec::new(io); if let Some(max) = builder.settings.max_frame_size() { codec.set_max_recv_frame_size(max as usize); } if let Some(max) = builder.settings.max_header_list_size() { codec.set_max_recv_header_list_size(max as usize); } // Send initial settings frame codec .buffer(builder.settings.clone().into()) .expect("invalid SETTINGS frame"); let inner = proto::Connection::new( codec, proto::Config { next_stream_id: builder.stream_id, initial_max_send_streams: builder.initial_max_send_streams, max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, settings: builder.settings.clone(), }, ); let send_request = SendRequest { inner: inner.streams().clone(), pending: None, }; let mut connection = Connection { inner }; if let Some(sz) = builder.initial_target_connection_window_size { connection.set_target_window_size(sz); } Ok((send_request, connection)) } /// Sets the target window size for the whole connection. /// /// If `size` is greater than the current value, then a `WINDOW_UPDATE` /// frame will be immediately sent to the remote, increasing the connection /// level window by `size - current_value`. /// /// If `size` is less than the current value, nothing will happen /// immediately. However, as window capacity is released by /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent /// out until the number of "in flight" bytes drops below `size`. /// /// The default value is 65,535. /// /// See [`FlowControl`] documentation for more details. /// /// [`FlowControl`]: ../struct.FlowControl.html /// [library level]: ../index.html#flow-control pub fn set_target_window_size(&mut self, size: u32) { assert!(size <= proto::MAX_WINDOW_SIZE); self.inner.set_target_window_size(size); } /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level /// flow control for received data. /// /// The `SETTINGS` will be sent to the remote, and only applied once the /// remote acknowledges the change. /// /// This can be used to increase or decrease the window size for existing /// streams. /// /// # Errors /// /// Returns an error if a previous call is still pending acknowledgement /// from the remote endpoint. pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { assert!(size <= proto::MAX_WINDOW_SIZE); self.inner.set_initial_window_size(size)?; Ok(()) } /// Takes a `PingPong` instance from the connection. /// /// # Note /// /// This may only be called once. Calling multiple times will return `None`. pub fn ping_pong(&mut self) -> Option { self.inner.take_user_pings().map(PingPong::new) } /// Returns the maximum number of concurrent streams that may be initiated /// by this client. /// /// This limit is configured by the server peer by sending the /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame. /// This method returns the currently acknowledged value received from the /// remote. /// /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 pub fn max_concurrent_send_streams(&self) -> usize { self.inner.max_send_streams() } /// Returns the maximum number of concurrent streams that may be initiated /// by the server on this connection. /// /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS` /// parameter][1] sent in a `SETTINGS` frame that has been /// acknowledged by the remote peer. The value to be sent is configured by /// the [`Builder::max_concurrent_streams`][2] method before handshaking /// with the remote peer. /// /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2 /// [2]: ../struct.Builder.html#method.max_concurrent_streams pub fn max_concurrent_recv_streams(&self) -> usize { self.inner.max_recv_streams() } } impl Future for Connection where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static, { type Output = Result<(), crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.inner.maybe_close_connection_if_no_streams(); self.inner.poll(cx).map_err(Into::into) } } impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite, T: fmt::Debug, B: fmt::Debug + Buf, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt::Debug::fmt(&self.inner, fmt) } } // ===== impl ResponseFuture ===== impl Future for ResponseFuture { type Output = Result, crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts(); let body = RecvStream::new(FlowControl::new(self.inner.clone())); Poll::Ready(Ok(Response::from_parts(parts, body))) } } impl ResponseFuture { /// Returns the stream ID of the response stream. /// /// # Panics /// /// If the lock on the stream store has been poisoned. pub fn stream_id(&self) -> crate::StreamId { crate::StreamId::from_internal(self.inner.stream_id()) } /// Returns a stream of PushPromises /// /// # Panics /// /// If this method has been called before /// or the stream was itself was pushed pub fn push_promises(&mut self) -> PushPromises { if self.push_promise_consumed { panic!("Reference to push promises stream taken!"); } self.push_promise_consumed = true; PushPromises { inner: self.inner.clone(), } } } // ===== impl PushPromises ===== impl PushPromises { /// Get the next `PushPromise`. pub async fn push_promise(&mut self) -> Option> { futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await } #[doc(hidden)] pub fn poll_push_promise( &mut self, cx: &mut Context<'_>, ) -> Poll>> { match self.inner.poll_pushed(cx) { Poll::Ready(Some(Ok((request, response)))) => { let response = PushedResponseFuture { inner: ResponseFuture { inner: response, push_promise_consumed: false, }, }; Poll::Ready(Some(Ok(PushPromise { request, response }))) } Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } #[cfg(feature = "stream")] impl futures_core::Stream for PushPromises { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_push_promise(cx) } } // ===== impl PushPromise ===== impl PushPromise { /// Returns a reference to the push promise's request headers. pub fn request(&self) -> &Request<()> { &self.request } /// Returns a mutable reference to the push promise's request headers. pub fn request_mut(&mut self) -> &mut Request<()> { &mut self.request } /// Consumes `self`, returning the push promise's request headers and /// response future. pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) { (self.request, self.response) } } // ===== impl PushedResponseFuture ===== impl Future for PushedResponseFuture { type Output = Result, crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new(&mut self.inner).poll(cx) } } impl PushedResponseFuture { /// Returns the stream ID of the response stream. /// /// # Panics /// /// If the lock on the stream store has been poisoned. pub fn stream_id(&self) -> crate::StreamId { self.inner.stream_id() } } // ===== impl Peer ===== impl Peer { pub fn convert_send_message( id: StreamId, request: Request<()>, protocol: Option, end_of_stream: bool, ) -> Result { use http::request::Parts; let ( Parts { method, uri, headers, version, .. }, _, ) = request.into_parts(); let is_connect = method == Method::CONNECT; // Build the set pseudo header set. All requests will include `method` // and `path`. let mut pseudo = Pseudo::request(method, uri, protocol); if pseudo.scheme.is_none() { // If the scheme is not set, then there are a two options. // // 1) Authority is not set. In this case, a request was issued with // a relative URI. This is permitted **only** when forwarding // HTTP 1.x requests. If the HTTP version is set to 2.0, then // this is an error. // // 2) Authority is set, then the HTTP method *must* be CONNECT. // // It is not possible to have a scheme but not an authority set (the // `http` crate does not allow it). // if pseudo.authority.is_none() { if version == Version::HTTP_2 { return Err(UserError::MissingUriSchemeAndAuthority.into()); } else { // This is acceptable as per the above comment. However, // HTTP/2 requires that a scheme is set. Since we are // forwarding an HTTP 1.1 request, the scheme is set to // "http". pseudo.set_scheme(uri::Scheme::HTTP); } } else if !is_connect { // TODO: Error } } // Create the HEADERS frame let mut frame = Headers::new(id, pseudo, headers); if end_of_stream { frame.set_end_stream() } Ok(frame) } } impl proto::Peer for Peer { type Poll = Response<()>; const NAME: &'static str = "Client"; fn r#dyn() -> proto::DynPeer { proto::DynPeer::Client } fn is_server() -> bool { false } fn convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result { let mut b = Response::builder(); b = b.version(Version::HTTP_2); if let Some(status) = pseudo.status { b = b.status(status); } let mut response = match b.body(()) { Ok(response) => response, Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR)); } }; *response.headers_mut() = fields; Ok(response) } }