diff options
Diffstat (limited to 'third_party/rust/hyper/src/client')
-rw-r--r-- | third_party/rust/hyper/src/client/client.rs | 1495 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/conn.rs | 1113 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/connect/dns.rs | 425 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/connect/http.rs | 1007 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/connect/mod.rs | 412 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/dispatch.rs | 436 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/mod.rs | 68 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/pool.rs | 1044 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/service.rs | 89 | ||||
-rw-r--r-- | third_party/rust/hyper/src/client/tests.rs | 286 |
10 files changed, 6375 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/client/client.rs b/third_party/rust/hyper/src/client/client.rs new file mode 100644 index 0000000000..4425e25899 --- /dev/null +++ b/third_party/rust/hyper/src/client/client.rs @@ -0,0 +1,1495 @@ +use std::error::Error as StdError; +use std::fmt; +use std::mem; +use std::time::Duration; + +use futures_channel::oneshot; +use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _}; +use http::header::{HeaderValue, HOST}; +use http::uri::{Port, Scheme}; +use http::{Method, Request, Response, Uri, Version}; +use tracing::{debug, trace, warn}; + +use super::conn; +use super::connect::{self, sealed::Connect, Alpn, Connected, Connection}; +use super::pool::{ + self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation, +}; +#[cfg(feature = "tcp")] +use super::HttpConnector; +use crate::body::{Body, HttpBody}; +use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll}; +use crate::rt::Executor; + +/// A Client to make outgoing HTTP requests. +/// +/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The +/// underlying connection pool will be reused. +#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] +pub struct Client<C, B = Body> { + config: Config, + conn_builder: conn::Builder, + connector: C, + pool: Pool<PoolClient<B>>, +} + +#[derive(Clone, Copy, Debug)] +struct Config { + retry_canceled_requests: bool, + set_host: bool, + ver: Ver, +} + +/// A `Future` that will resolve to an HTTP Response. +/// +/// This is returned by `Client::request` (and `Client::get`). +#[must_use = "futures do nothing unless polled"] +pub struct ResponseFuture { + inner: SyncWrapper<Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>>, +} + +// ===== impl Client ===== + +#[cfg(feature = "tcp")] +impl Client<HttpConnector, Body> { + /// Create a new Client with the default [config](Builder). + /// + /// # Note + /// + /// The default connector does **not** handle TLS. Speaking to `https` + /// destinations will require [configuring a connector that implements + /// TLS](https://hyper.rs/guides/client/configuration). + #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] + #[inline] + pub fn new() -> Client<HttpConnector, Body> { + Builder::default().build_http() + } +} + +#[cfg(feature = "tcp")] +impl Default for Client<HttpConnector, Body> { + fn default() -> Client<HttpConnector, Body> { + Client::new() + } +} + +impl Client<(), Body> { + /// Create a builder to configure a new `Client`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use std::time::Duration; + /// use hyper::Client; + /// + /// let client = Client::builder() + /// .pool_idle_timeout(Duration::from_secs(30)) + /// .http2_only(true) + /// .build_http(); + /// # let infer: Client<_, hyper::Body> = client; + /// # drop(infer); + /// # } + /// # fn main() {} + /// ``` + #[inline] + pub fn builder() -> Builder { + Builder::default() + } +} + +impl<C, B> Client<C, B> +where + C: Connect + Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + /// Send a `GET` request to the supplied `Uri`. + /// + /// # Note + /// + /// This requires that the `HttpBody` type have a `Default` implementation. + /// It *should* return an "empty" version of itself, such that + /// `HttpBody::is_end_stream` is `true`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use hyper::{Client, Uri}; + /// + /// let client = Client::new(); + /// + /// let future = client.get(Uri::from_static("http://httpbin.org/ip")); + /// # } + /// # fn main() {} + /// ``` + pub fn get(&self, uri: Uri) -> ResponseFuture + where + B: Default, + { + let body = B::default(); + if !body.is_end_stream() { + warn!("default HttpBody used for get() does not return true for is_end_stream"); + } + + let mut req = Request::new(body); + *req.uri_mut() = uri; + self.request(req) + } + + /// Send a constructed `Request` using this `Client`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use hyper::{Body, Method, Client, Request}; + /// + /// let client = Client::new(); + /// + /// let req = Request::builder() + /// .method(Method::POST) + /// .uri("http://httpbin.org/post") + /// .body(Body::from("Hallo!")) + /// .expect("request builder"); + /// + /// let future = client.request(req); + /// # } + /// # fn main() {} + /// ``` + pub fn request(&self, mut req: Request<B>) -> ResponseFuture { + let is_http_connect = req.method() == Method::CONNECT; + match req.version() { + Version::HTTP_11 => (), + Version::HTTP_10 => { + if is_http_connect { + warn!("CONNECT is not allowed for HTTP/1.0"); + return ResponseFuture::new(future::err( + crate::Error::new_user_unsupported_request_method(), + )); + } + } + Version::HTTP_2 => (), + // completely unsupported HTTP version (like HTTP/0.9)! + other => return ResponseFuture::error_version(other), + }; + + let pool_key = match extract_domain(req.uri_mut(), is_http_connect) { + Ok(s) => s, + Err(err) => { + return ResponseFuture::new(future::err(err)); + } + }; + + ResponseFuture::new(self.clone().retryably_send_request(req, pool_key)) + } + + async fn retryably_send_request( + self, + mut req: Request<B>, + pool_key: PoolKey, + ) -> crate::Result<Response<Body>> { + let uri = req.uri().clone(); + + loop { + req = match self.send_request(req, pool_key.clone()).await { + Ok(resp) => return Ok(resp), + Err(ClientError::Normal(err)) => return Err(err), + Err(ClientError::Canceled { + connection_reused, + mut req, + reason, + }) => { + if !self.config.retry_canceled_requests || !connection_reused { + // if client disabled, don't retry + // a fresh connection means we definitely can't retry + return Err(reason); + } + + trace!( + "unstarted request canceled, trying again (reason={:?})", + reason + ); + *req.uri_mut() = uri.clone(); + req + } + } + } + } + + async fn send_request( + &self, + mut req: Request<B>, + pool_key: PoolKey, + ) -> Result<Response<Body>, ClientError<B>> { + let mut pooled = match self.connection_for(pool_key).await { + Ok(pooled) => pooled, + Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)), + Err(ClientConnectError::H2CheckoutIsClosed(reason)) => { + return Err(ClientError::Canceled { + connection_reused: true, + req, + reason, + }) + } + }; + + if pooled.is_http1() { + if req.version() == Version::HTTP_2 { + warn!("Connection is HTTP/1, but request requires HTTP/2"); + return Err(ClientError::Normal( + crate::Error::new_user_unsupported_version(), + )); + } + + if self.config.set_host { + let uri = req.uri().clone(); + req.headers_mut().entry(HOST).or_insert_with(|| { + let hostname = uri.host().expect("authority implies host"); + if let Some(port) = get_non_default_port(&uri) { + let s = format!("{}:{}", hostname, port); + HeaderValue::from_str(&s) + } else { + HeaderValue::from_str(hostname) + } + .expect("uri host is valid header value") + }); + } + + // CONNECT always sends authority-form, so check it first... + if req.method() == Method::CONNECT { + authority_form(req.uri_mut()); + } else if pooled.conn_info.is_proxied { + absolute_form(req.uri_mut()); + } else { + origin_form(req.uri_mut()); + } + } else if req.method() == Method::CONNECT { + authority_form(req.uri_mut()); + } + + let fut = pooled + .send_request_retryable(req) + .map_err(ClientError::map_with_reused(pooled.is_reused())); + + // If the Connector included 'extra' info, add to Response... + let extra_info = pooled.conn_info.extra.clone(); + let fut = fut.map_ok(move |mut res| { + if let Some(extra) = extra_info { + extra.set(res.extensions_mut()); + } + res + }); + + // As of futures@0.1.21, there is a race condition in the mpsc + // channel, such that sending when the receiver is closing can + // result in the message being stuck inside the queue. It won't + // ever notify until the Sender side is dropped. + // + // To counteract this, we must check if our senders 'want' channel + // has been closed after having tried to send. If so, error out... + if pooled.is_closed() { + return fut.await; + } + + let mut res = fut.await?; + + // If pooled is HTTP/2, we can toss this reference immediately. + // + // when pooled is dropped, it will try to insert back into the + // pool. To delay that, spawn a future that completes once the + // sender is ready again. + // + // This *should* only be once the related `Connection` has polled + // for a new request to start. + // + // It won't be ready if there is a body to stream. + if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { + drop(pooled); + } else if !res.body().is_end_stream() { + let (delayed_tx, delayed_rx) = oneshot::channel(); + res.body_mut().delayed_eof(delayed_rx); + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + drop(delayed_tx); + }); + + self.conn_builder.exec.execute(on_idle); + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); + + self.conn_builder.exec.execute(on_idle); + } + + Ok(res) + } + + async fn connection_for( + &self, + pool_key: PoolKey, + ) -> Result<Pooled<PoolClient<B>>, ClientConnectError> { + // This actually races 2 different futures to try to get a ready + // connection the fastest, and to reduce connection churn. + // + // - If the pool has an idle connection waiting, that's used + // immediately. + // - Otherwise, the Connector is asked to start connecting to + // the destination Uri. + // - Meanwhile, the pool Checkout is watching to see if any other + // request finishes and tries to insert an idle connection. + // - If a new connection is started, but the Checkout wins after + // (an idle connection became available first), the started + // connection future is spawned into the runtime to complete, + // and then be inserted into the pool as an idle connection. + let checkout = self.pool.checkout(pool_key.clone()); + let connect = self.connect_to(pool_key); + let is_ver_h2 = self.config.ver == Ver::Http2; + + // The order of the `select` is depended on below... + + match future::select(checkout, connect).await { + // Checkout won, connect future may have been started or not. + // + // If it has, let it finish and insert back into the pool, + // so as to not waste the socket... + Either::Left((Ok(checked_out), connecting)) => { + // This depends on the `select` above having the correct + // order, such that if the checkout future were ready + // immediately, the connect future will never have been + // started. + // + // If it *wasn't* ready yet, then the connect future will + // have been started... + if connecting.started() { + let bg = connecting + .map_err(|err| { + trace!("background connect error: {}", err); + }) + .map(|_pooled| { + // dropping here should just place it in + // the Pool for us... + }); + // An execute error here isn't important, we're just trying + // to prevent a waste of a socket... + self.conn_builder.exec.execute(bg); + } + Ok(checked_out) + } + // Connect won, checkout can just be dropped. + Either::Right((Ok(connected), _checkout)) => Ok(connected), + // Either checkout or connect could get canceled: + // + // 1. Connect is canceled if this is HTTP/2 and there is + // an outstanding HTTP/2 connecting task. + // 2. Checkout is canceled if the pool cannot deliver an + // idle connection reliably. + // + // In both cases, we should just wait for the other future. + Either::Left((Err(err), connecting)) => { + if err.is_canceled() { + connecting.await.map_err(ClientConnectError::Normal) + } else { + Err(ClientConnectError::Normal(err)) + } + } + Either::Right((Err(err), checkout)) => { + if err.is_canceled() { + checkout.await.map_err(move |err| { + if is_ver_h2 + && err.is_canceled() + && err.find_source::<CheckoutIsClosedError>().is_some() + { + ClientConnectError::H2CheckoutIsClosed(err) + } else { + ClientConnectError::Normal(err) + } + }) + } else { + Err(ClientConnectError::Normal(err)) + } + } + } + } + + fn connect_to( + &self, + pool_key: PoolKey, + ) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin { + let executor = self.conn_builder.exec.clone(); + let pool = self.pool.clone(); + #[cfg(not(feature = "http2"))] + let conn_builder = self.conn_builder.clone(); + #[cfg(feature = "http2")] + let mut conn_builder = self.conn_builder.clone(); + let ver = self.config.ver; + let is_ver_h2 = ver == Ver::Http2; + let connector = self.connector.clone(); + let dst = domain_as_uri(pool_key.clone()); + hyper_lazy(move || { + // Try to take a "connecting lock". + // + // If the pool_key is for HTTP/2, and there is already a + // connection being established, then this can't take a + // second lock. The "connect_to" future is Canceled. + let connecting = match pool.connecting(&pool_key, ver) { + Some(lock) => lock, + None => { + let canceled = + crate::Error::new_canceled().with("HTTP/2 connection in progress"); + return Either::Right(future::err(canceled)); + } + }; + Either::Left( + connector + .connect(connect::sealed::Internal, dst) + .map_err(crate::Error::new_connect) + .and_then(move |io| { + let connected = io.connected(); + // If ALPN is h2 and we aren't http2_only already, + // then we need to convert our pool checkout into + // a single HTTP2 one. + let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { + match connecting.alpn_h2(&pool) { + Some(lock) => { + trace!("ALPN negotiated h2, updating pool"); + lock + } + None => { + // Another connection has already upgraded, + // the pool checkout should finish up for us. + let canceled = crate::Error::new_canceled() + .with("ALPN upgraded to HTTP/2"); + return Either::Right(future::err(canceled)); + } + } + } else { + connecting + }; + + #[cfg_attr(not(feature = "http2"), allow(unused))] + let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; + #[cfg(feature = "http2")] + { + conn_builder.http2_only(is_h2); + } + + Either::Left(Box::pin(async move { + let (tx, conn) = conn_builder.handshake(io).await?; + + trace!("handshake complete, spawning background dispatcher task"); + executor.execute( + conn.map_err(|e| debug!("client connection error: {}", e)) + .map(|_| ()), + ); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + let tx = tx.when_ready().await?; + + let tx = { + #[cfg(feature = "http2")] + { + if is_h2 { + PoolTx::Http2(tx.into_http2()) + } else { + PoolTx::Http1(tx) + } + } + #[cfg(not(feature = "http2"))] + PoolTx::Http1(tx) + }; + + Ok(pool.pooled( + connecting, + PoolClient { + conn_info: connected, + tx, + }, + )) + })) + }), + ) + }) + } +} + +impl<C, B> tower_service::Service<Request<B>> for Client<C, B> +where + C: Connect + Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Response = Response<Body>; + type Error = crate::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<B>) -> Self::Future { + self.request(req) + } +} + +impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B> +where + C: Connect + Clone + Send + Sync + 'static, + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Response = Response<Body>; + type Error = crate::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request<B>) -> Self::Future { + self.request(req) + } +} + +impl<C: Clone, B> Clone for Client<C, B> { + fn clone(&self) -> Client<C, B> { + Client { + config: self.config.clone(), + conn_builder: self.conn_builder.clone(), + connector: self.connector.clone(), + pool: self.pool.clone(), + } + } +} + +impl<C, B> fmt::Debug for Client<C, B> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Client").finish() + } +} + +// ===== impl ResponseFuture ===== + +impl ResponseFuture { + fn new<F>(value: F) -> Self + where + F: Future<Output = crate::Result<Response<Body>>> + Send + 'static, + { + Self { + inner: SyncWrapper::new(Box::pin(value)) + } + } + + fn error_version(ver: Version) -> Self { + warn!("Request has unsupported version \"{:?}\"", ver); + ResponseFuture::new(Box::pin(future::err( + crate::Error::new_user_unsupported_version(), + ))) + } +} + +impl fmt::Debug for ResponseFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Future<Response>") + } +} + +impl Future for ResponseFuture { + type Output = crate::Result<Response<Body>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.inner.get_mut().as_mut().poll(cx) + } +} + +// ===== impl PoolClient ===== + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +struct PoolClient<B> { + conn_info: Connected, + tx: PoolTx<B>, +} + +enum PoolTx<B> { + Http1(conn::SendRequest<B>), + #[cfg(feature = "http2")] + Http2(conn::Http2SendRequest<B>), +} + +impl<B> PoolClient<B> { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + match self.tx { + PoolTx::Http1(ref mut tx) => tx.poll_ready(cx), + #[cfg(feature = "http2")] + PoolTx::Http2(_) => Poll::Ready(Ok(())), + } + } + + fn is_http1(&self) -> bool { + !self.is_http2() + } + + fn is_http2(&self) -> bool { + match self.tx { + PoolTx::Http1(_) => false, + #[cfg(feature = "http2")] + PoolTx::Http2(_) => true, + } + } + + fn is_ready(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_ready(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_ready(), + } + } + + fn is_closed(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_closed(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_closed(), + } + } +} + +impl<B: HttpBody + 'static> PoolClient<B> { + fn send_request_retryable( + &mut self, + req: Request<B>, + ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + where + B: Send, + { + match self.tx { + #[cfg(not(feature = "http2"))] + PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req), + #[cfg(feature = "http2")] + PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)), + #[cfg(feature = "http2")] + PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)), + } + } +} + +impl<B> Poolable for PoolClient<B> +where + B: Send + 'static, +{ + fn is_open(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_ready(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_ready(), + } + } + + fn reserve(self) -> Reservation<Self> { + match self.tx { + PoolTx::Http1(tx) => Reservation::Unique(PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http1(tx), + }), + #[cfg(feature = "http2")] + PoolTx::Http2(tx) => { + let b = PoolClient { + conn_info: self.conn_info.clone(), + tx: PoolTx::Http2(tx.clone()), + }; + let a = PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http2(tx), + }; + Reservation::Shared(a, b) + } + } + } + + fn can_share(&self) -> bool { + self.is_http2() + } +} + +// ===== impl ClientError ===== + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +enum ClientError<B> { + Normal(crate::Error), + Canceled { + connection_reused: bool, + req: Request<B>, + reason: crate::Error, + }, +} + +impl<B> ClientError<B> { + fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self { + move |(err, orig_req)| { + if let Some(req) = orig_req { + ClientError::Canceled { + connection_reused: conn_reused, + reason: err, + req, + } + } else { + ClientError::Normal(err) + } + } + } +} + +enum ClientConnectError { + Normal(crate::Error), + H2CheckoutIsClosed(crate::Error), +} + +/// A marker to identify what version a pooled connection is. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub(super) enum Ver { + Auto, + Http2, +} + +fn origin_form(uri: &mut Uri) { + let path = match uri.path_and_query() { + Some(path) if path.as_str() != "/" => { + let mut parts = ::http::uri::Parts::default(); + parts.path_and_query = Some(path.clone()); + Uri::from_parts(parts).expect("path is valid uri") + } + _none_or_just_slash => { + debug_assert!(Uri::default() == "/"); + Uri::default() + } + }; + *uri = path +} + +fn absolute_form(uri: &mut Uri) { + debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme"); + debug_assert!( + uri.authority().is_some(), + "absolute_form needs an authority" + ); + // If the URI is to HTTPS, and the connector claimed to be a proxy, + // then it *should* have tunneled, and so we don't want to send + // absolute-form in that case. + if uri.scheme() == Some(&Scheme::HTTPS) { + origin_form(uri); + } +} + +fn authority_form(uri: &mut Uri) { + if let Some(path) = uri.path_and_query() { + // `https://hyper.rs` would parse with `/` path, don't + // annoy people about that... + if path != "/" { + warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path); + } + } + *uri = match uri.authority() { + Some(auth) => { + let mut parts = ::http::uri::Parts::default(); + parts.authority = Some(auth.clone()); + Uri::from_parts(parts).expect("authority is valid") + } + None => { + unreachable!("authority_form with relative uri"); + } + }; +} + +fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<PoolKey> { + let uri_clone = uri.clone(); + match (uri_clone.scheme(), uri_clone.authority()) { + (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())), + (None, Some(auth)) if is_http_connect => { + let scheme = match auth.port_u16() { + Some(443) => { + set_scheme(uri, Scheme::HTTPS); + Scheme::HTTPS + } + _ => { + set_scheme(uri, Scheme::HTTP); + Scheme::HTTP + } + }; + Ok((scheme, auth.clone())) + } + _ => { + debug!("Client requires absolute-form URIs, received: {:?}", uri); + Err(crate::Error::new_user_absolute_uri_required()) + } + } +} + +fn domain_as_uri((scheme, auth): PoolKey) -> Uri { + http::uri::Builder::new() + .scheme(scheme) + .authority(auth) + .path_and_query("/") + .build() + .expect("domain is valid Uri") +} + +fn set_scheme(uri: &mut Uri, scheme: Scheme) { + debug_assert!( + uri.scheme().is_none(), + "set_scheme expects no existing scheme" + ); + let old = mem::replace(uri, Uri::default()); + let mut parts: ::http::uri::Parts = old.into(); + parts.scheme = Some(scheme); + parts.path_and_query = Some("/".parse().expect("slash is a valid path")); + *uri = Uri::from_parts(parts).expect("scheme is valid"); +} + +fn get_non_default_port(uri: &Uri) -> Option<Port<&str>> { + match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) { + (Some(443), true) => None, + (Some(80), false) => None, + _ => uri.port(), + } +} + +fn is_schema_secure(uri: &Uri) -> bool { + uri.scheme_str() + .map(|scheme_str| matches!(scheme_str, "wss" | "https")) + .unwrap_or_default() +} + +/// A builder to configure a new [`Client`](Client). +/// +/// # Example +/// +/// ``` +/// # #[cfg(feature = "runtime")] +/// # fn run () { +/// use std::time::Duration; +/// use hyper::Client; +/// +/// let client = Client::builder() +/// .pool_idle_timeout(Duration::from_secs(30)) +/// .http2_only(true) +/// .build_http(); +/// # let infer: Client<_, hyper::Body> = client; +/// # drop(infer); +/// # } +/// # fn main() {} +/// ``` +#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] +#[derive(Clone)] +pub struct Builder { + client_config: Config, + conn_builder: conn::Builder, + pool_config: pool::Config, +} + +impl Default for Builder { + fn default() -> Self { + Self { + client_config: Config { + retry_canceled_requests: true, + set_host: true, + ver: Ver::Auto, + }, + conn_builder: conn::Builder::new(), + pool_config: pool::Config { + idle_timeout: Some(Duration::from_secs(90)), + max_idle_per_host: std::usize::MAX, + }, + } + } +} + +impl Builder { + #[doc(hidden)] + #[deprecated( + note = "name is confusing, to disable the connection pool, call pool_max_idle_per_host(0)" + )] + pub fn keep_alive(&mut self, val: bool) -> &mut Self { + if !val { + // disable + self.pool_max_idle_per_host(0) + } else if self.pool_config.max_idle_per_host == 0 { + // enable + self.pool_max_idle_per_host(std::usize::MAX) + } else { + // already enabled + self + } + } + + #[doc(hidden)] + #[deprecated(note = "renamed to `pool_idle_timeout`")] + pub fn keep_alive_timeout<D>(&mut self, val: D) -> &mut Self + where + D: Into<Option<Duration>>, + { + self.pool_idle_timeout(val) + } + + /// Set an optional timeout for idle sockets being kept-alive. + /// + /// Pass `None` to disable timeout. + /// + /// Default is 90 seconds. + pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self + where + D: Into<Option<Duration>>, + { + self.pool_config.idle_timeout = val.into(); + self + } + + #[doc(hidden)] + #[deprecated(note = "renamed to `pool_max_idle_per_host`")] + pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { + self.pool_config.max_idle_per_host = max_idle; + self + } + + /// Sets the maximum idle connection per host allowed in the pool. + /// + /// Default is `usize::MAX` (no limit). + pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { + self.pool_config.max_idle_per_host = max_idle; + self + } + + // HTTP/1 options + + /// Sets the exact size of the read buffer to *always* use. + /// + /// Note that setting this option unsets the `http1_max_buf_size` option. + /// + /// Default is an adaptive read buffer. + pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self { + self.conn_builder.http1_read_buf_exact_size(Some(sz)); + self + } + + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + /// + /// Note that setting this option unsets the `http1_read_exact_buf_size` option. + /// + /// # Panics + /// + /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. + #[cfg(feature = "http1")] + #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] + pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { + self.conn_builder.http1_max_buf_size(max); + self + } + + /// Set whether HTTP/1 connections will accept spaces between header names + /// and the colon that follow them in responses. + /// + /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when + /// parsing. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > No whitespace is allowed between the header field-name and colon. In + /// > the past, differences in the handling of such whitespace have led to + /// > security vulnerabilities in request routing and response handling. A + /// > server MUST reject any received request message that contains + /// > whitespace between a header field-name and colon with a response code + /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a + /// > response message before forwarding the message downstream. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self { + self.conn_builder + .http1_allow_spaces_after_header_name_in_responses(val); + self + } + + /// Set whether HTTP/1 connections will accept obsolete line folding for + /// header values. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > A server that receives an obs-fold in a request message that is not + /// > within a message/http container MUST either reject the message by + /// > sending a 400 (Bad Request), preferably with a representation + /// > explaining that obsolete line folding is unacceptable, or replace + /// > each received obs-fold with one or more SP octets prior to + /// > interpreting the field value or forwarding the message downstream. + /// + /// > A proxy or gateway that receives an obs-fold in a response message + /// > that is not within a message/http container MUST either discard the + /// > message and replace it with a 502 (Bad Gateway) response, preferably + /// > with a representation explaining that unacceptable line folding was + /// > received, or replace each received obs-fold with one or more SP + /// > octets prior to interpreting the field value or forwarding the + /// > message downstream. + /// + /// > A user agent that receives an obs-fold in a response message that is + /// > not within a message/http container MUST replace each received + /// > obs-fold with one or more SP octets prior to interpreting the field + /// > value. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self { + self.conn_builder + .http1_allow_obsolete_multiline_headers_in_responses(val); + self + } + + /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses. + /// + /// This mimicks the behaviour of major browsers. You probably don't want this. + /// You should only want this if you are implementing a proxy whose main + /// purpose is to sit in front of browsers whose users access arbitrary content + /// which may be malformed, and they expect everything that works without + /// the proxy to keep working with the proxy. + /// + /// This option will prevent Hyper's client from returning an error encountered + /// when parsing a header, except if the error was caused by the character NUL + /// (ASCII code 0), as Chrome specifically always reject those. + /// + /// The ignorable errors are: + /// * empty header names; + /// * characters that are not allowed in header names, except for `\0` and `\r`; + /// * when `allow_spaces_after_header_name_in_responses` is not enabled, + /// spaces and tabs between the header name and the colon; + /// * missing colon between header name and colon; + /// * characters that are not allowed in header values except for `\0` and `\r`. + /// + /// If an ignorable error is encountered, the parser tries to find the next + /// line in the input to resume parsing the rest of the headers. An error + /// will be emitted nonetheless if it finds `\0` or a lone `\r` while + /// looking for the next line. + pub fn http1_ignore_invalid_headers_in_responses( + &mut self, + val: bool, + ) -> &mut Builder { + self.conn_builder + .http1_ignore_invalid_headers_in_responses(val); + self + } + + /// Set whether HTTP/1 connections should try to use vectored writes, + /// or always flatten into a single buffer. + /// + /// Note that setting this to false may mean more copies of body data, + /// but may also improve performance when an IO transport doesn't + /// support vectored writes well, such as most TLS implementations. + /// + /// Setting this to true will force hyper to use queued strategy + /// which may eliminate unnecessary cloning on some TLS backends + /// + /// Default is `auto`. In this mode hyper will try to guess which + /// mode to use + pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { + self.conn_builder.http1_writev(enabled); + self + } + + /// Set whether HTTP/1 connections will write header names as title case at + /// the socket level. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self { + self.conn_builder.http1_title_case_headers(val); + self + } + + /// Set whether to support preserving original header cases. + /// + /// Currently, this will record the original cases received, and store them + /// in a private extension on the `Response`. It will also look for and use + /// such an extension in any provided `Request`. + /// + /// Since the relevant extension is still private, there is no way to + /// interact with the original cases. The only effect this can have now is + /// to forward the cases in a proxy-like fashion. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self { + self.conn_builder.http1_preserve_header_case(val); + self + } + + /// Set whether HTTP/0.9 responses should be tolerated. + /// + /// Default is false. + pub fn http09_responses(&mut self, val: bool) -> &mut Self { + self.conn_builder.http09_responses(val); + self + } + + /// Set whether the connection **must** use HTTP/2. + /// + /// The destination must either allow HTTP2 Prior Knowledge, or the + /// `Connect` should be configured to do use ALPN to upgrade to `h2` + /// as part of the connection process. This will not make the `Client` + /// utilize ALPN by itself. + /// + /// Note that setting this to true prevents HTTP/1 from being allowed. + /// + /// Default is false. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_only(&mut self, val: bool) -> &mut Self { + self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto }; + self + } + + /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 + /// stream-level flow control. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + /// + /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { + self.conn_builder + .http2_initial_stream_window_size(sz.into()); + self + } + + /// Sets the max connection-level flow control for HTTP2 + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_connection_window_size( + &mut self, + sz: impl Into<Option<u32>>, + ) -> &mut Self { + self.conn_builder + .http2_initial_connection_window_size(sz.into()); + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.conn_builder.http2_adaptive_window(enabled); + self + } + + /// Sets the maximum frame size to use for HTTP2. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { + self.conn_builder.http2_max_frame_size(sz); + self + } + + /// Sets an interval for HTTP2 Ping frames should be sent to keep a + /// connection alive. + /// + /// Pass `None` to disable HTTP2 keep-alive. + /// + /// Default is currently disabled. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_interval( + &mut self, + interval: impl Into<Option<Duration>>, + ) -> &mut Self { + self.conn_builder.http2_keep_alive_interval(interval); + self + } + + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. + /// + /// Default is 20 seconds. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.conn_builder.http2_keep_alive_timeout(timeout); + self + } + + /// Sets whether HTTP2 keep-alive should apply while the connection is idle. + /// + /// If disabled, keep-alive pings are only sent while there are open + /// request/responses streams. If enabled, pings are also sent when no + /// streams are active. Does nothing if `http2_keep_alive_interval` is + /// disabled. + /// + /// Default is `false`. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { + self.conn_builder.http2_keep_alive_while_idle(enabled); + self + } + + /// Sets the maximum number of HTTP2 concurrent locally reset streams. + /// + /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more + /// details. + /// + /// The default value is determined by the `h2` crate. + /// + /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.conn_builder.http2_max_concurrent_reset_streams(max); + self + } + + /// Set the maximum write buffer size for each HTTP/2 stream. + /// + /// Default is currently 1MB, but may change. + /// + /// # Panics + /// + /// The value must be no larger than `u32::MAX`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { + self.conn_builder.http2_max_send_buf_size(max); + self + } + + /// Set whether to retry requests that get disrupted before ever starting + /// to write. + /// + /// This means a request that is queued, and gets given an idle, reused + /// connection, and then encounters an error immediately as the idle + /// connection was found to be unusable. + /// + /// When this is set to `false`, the related `ResponseFuture` would instead + /// resolve to an `Error::Cancel`. + /// + /// Default is `true`. + #[inline] + pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self { + self.client_config.retry_canceled_requests = val; + self + } + + /// Set whether to automatically add the `Host` header to requests. + /// + /// If true, and a request does not include a `Host` header, one will be + /// added automatically, derived from the authority of the `Uri`. + /// + /// Default is `true`. + #[inline] + pub fn set_host(&mut self, val: bool) -> &mut Self { + self.client_config.set_host = val; + self + } + + /// Provide an executor to execute background `Connection` tasks. + pub fn executor<E>(&mut self, exec: E) -> &mut Self + where + E: Executor<BoxSendFuture> + Send + Sync + 'static, + { + self.conn_builder.executor(exec); + self + } + + /// Builder a client with this configuration and the default `HttpConnector`. + #[cfg(feature = "tcp")] + pub fn build_http<B>(&self) -> Client<HttpConnector, B> + where + B: HttpBody + Send, + B::Data: Send, + { + let mut connector = HttpConnector::new(); + if self.pool_config.is_enabled() { + connector.set_keepalive(self.pool_config.idle_timeout); + } + self.build(connector) + } + + /// Combine the configuration of this builder with a connector to create a `Client`. + pub fn build<C, B>(&self, connector: C) -> Client<C, B> + where + C: Connect + Clone, + B: HttpBody + Send, + B::Data: Send, + { + Client { + config: self.client_config, + conn_builder: self.conn_builder.clone(), + connector, + pool: Pool::new(self.pool_config, &self.conn_builder.exec), + } + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Builder") + .field("client_config", &self.client_config) + .field("conn_builder", &self.conn_builder) + .field("pool_config", &self.pool_config) + .finish() + } +} + +#[cfg(test)] +mod unit_tests { + use super::*; + + #[test] + fn response_future_is_sync() { + fn assert_sync<T: Sync>() {} + assert_sync::<ResponseFuture>(); + } + + #[test] + fn set_relative_uri_with_implicit_path() { + let mut uri = "http://hyper.rs".parse().unwrap(); + origin_form(&mut uri); + assert_eq!(uri.to_string(), "/"); + } + + #[test] + fn test_origin_form() { + let mut uri = "http://hyper.rs/guides".parse().unwrap(); + origin_form(&mut uri); + assert_eq!(uri.to_string(), "/guides"); + + let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap(); + origin_form(&mut uri); + assert_eq!(uri.to_string(), "/guides?foo=bar"); + } + + #[test] + fn test_absolute_form() { + let mut uri = "http://hyper.rs/guides".parse().unwrap(); + absolute_form(&mut uri); + assert_eq!(uri.to_string(), "http://hyper.rs/guides"); + + let mut uri = "https://hyper.rs/guides".parse().unwrap(); + absolute_form(&mut uri); + assert_eq!(uri.to_string(), "/guides"); + } + + #[test] + fn test_authority_form() { + let _ = pretty_env_logger::try_init(); + + let mut uri = "http://hyper.rs".parse().unwrap(); + authority_form(&mut uri); + assert_eq!(uri.to_string(), "hyper.rs"); + + let mut uri = "hyper.rs".parse().unwrap(); + authority_form(&mut uri); + assert_eq!(uri.to_string(), "hyper.rs"); + } + + #[test] + fn test_extract_domain_connect_no_port() { + let mut uri = "hyper.rs".parse().unwrap(); + let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain"); + assert_eq!(scheme, *"http"); + assert_eq!(host, "hyper.rs"); + } + + #[test] + fn test_is_secure() { + assert_eq!( + is_schema_secure(&"http://hyper.rs".parse::<Uri>().unwrap()), + false + ); + assert_eq!(is_schema_secure(&"hyper.rs".parse::<Uri>().unwrap()), false); + assert_eq!( + is_schema_secure(&"wss://hyper.rs".parse::<Uri>().unwrap()), + true + ); + assert_eq!( + is_schema_secure(&"ws://hyper.rs".parse::<Uri>().unwrap()), + false + ); + } + + #[test] + fn test_get_non_default_port() { + assert!(get_non_default_port(&"http://hyper.rs".parse::<Uri>().unwrap()).is_none()); + assert!(get_non_default_port(&"http://hyper.rs:80".parse::<Uri>().unwrap()).is_none()); + assert!(get_non_default_port(&"https://hyper.rs:443".parse::<Uri>().unwrap()).is_none()); + assert!(get_non_default_port(&"hyper.rs:80".parse::<Uri>().unwrap()).is_none()); + + assert_eq!( + get_non_default_port(&"http://hyper.rs:123".parse::<Uri>().unwrap()) + .unwrap() + .as_u16(), + 123 + ); + assert_eq!( + get_non_default_port(&"https://hyper.rs:80".parse::<Uri>().unwrap()) + .unwrap() + .as_u16(), + 80 + ); + assert_eq!( + get_non_default_port(&"hyper.rs:123".parse::<Uri>().unwrap()) + .unwrap() + .as_u16(), + 123 + ); + } +} diff --git a/third_party/rust/hyper/src/client/conn.rs b/third_party/rust/hyper/src/client/conn.rs new file mode 100644 index 0000000000..3eb12b4204 --- /dev/null +++ b/third_party/rust/hyper/src/client/conn.rs @@ -0,0 +1,1113 @@ +//! Lower-level client connection API. +//! +//! The types in this module are to provide a lower-level API based around a +//! single connection. Connecting to a host, pooling connections, and the like +//! are not handled at this level. This module provides the building blocks to +//! customize those things externally. +//! +//! If don't have need to manage connections yourself, consider using the +//! higher-level [Client](super) API. +//! +//! ## Example +//! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream +//! ```no_run +//! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))] +//! # mod rt { +//! use tower::ServiceExt; +//! use http::{Request, StatusCode}; +//! use hyper::{client::conn, Body}; +//! use tokio::net::TcpStream; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let target_stream = TcpStream::connect("example.com:80").await?; +//! +//! let (mut request_sender, connection) = conn::handshake(target_stream).await?; +//! +//! // spawn a task to poll the connection and drive the HTTP state +//! tokio::spawn(async move { +//! if let Err(e) = connection.await { +//! eprintln!("Error in connection: {}", e); +//! } +//! }); +//! +//! let request = Request::builder() +//! // We need to manually add the host header because SendRequest does not +//! .header("Host", "example.com") +//! .method("GET") +//! .body(Body::from(""))?; +//! let response = request_sender.send_request(request).await?; +//! assert!(response.status() == StatusCode::OK); +//! +//! // To send via the same connection again, it may not work as it may not be ready, +//! // so we have to wait until the request_sender becomes ready. +//! request_sender.ready().await?; +//! let request = Request::builder() +//! .header("Host", "example.com") +//! .method("GET") +//! .body(Body::from(""))?; +//! let response = request_sender.send_request(request).await?; +//! assert!(response.status() == StatusCode::OK); +//! Ok(()) +//! } +//! +//! # } +//! ``` + +use std::error::Error as StdError; +use std::fmt; +#[cfg(not(all(feature = "http1", feature = "http2")))] +use std::marker::PhantomData; +use std::sync::Arc; +#[cfg(all(feature = "runtime", feature = "http2"))] +use std::time::Duration; + +use bytes::Bytes; +use futures_util::future::{self, Either, FutureExt as _}; +use httparse::ParserConfig; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower_service::Service; +use tracing::{debug, trace}; + +use super::dispatch; +use crate::body::HttpBody; +#[cfg(not(all(feature = "http1", feature = "http2")))] +use crate::common::Never; +use crate::common::{ + exec::{BoxSendFuture, Exec}, + task, Future, Pin, Poll, +}; +use crate::proto; +use crate::rt::Executor; +#[cfg(feature = "http1")] +use crate::upgrade::Upgraded; +use crate::{Body, Request, Response}; + +#[cfg(feature = "http1")] +type Http1Dispatcher<T, B> = + proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>; + +#[cfg(not(feature = "http1"))] +type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>); + +#[cfg(feature = "http2")] +type Http2ClientTask<B> = proto::h2::ClientTask<B>; + +#[cfg(not(feature = "http2"))] +type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>); + +pin_project! { + #[project = ProtoClientProj] + enum ProtoClient<T, B> + where + B: HttpBody, + { + H1 { + #[pin] + h1: Http1Dispatcher<T, B>, + }, + H2 { + #[pin] + h2: Http2ClientTask<B>, + }, + } +} + +/// Returns a handshake future over some IO. +/// +/// This is a shortcut for `Builder::new().handshake(io)`. +/// See [`client::conn`](crate::client::conn) for more. +pub async fn handshake<T>( + io: T, +) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)> +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + Builder::new().handshake(io).await +} + +/// The sender side of an established connection. +pub struct SendRequest<B> { + dispatch: dispatch::Sender<Request<B>, Response<Body>>, +} + +/// A future that processes all HTTP state for the IO object. +/// +/// In most cases, this should just be spawned into an executor, so that it +/// can process incoming and outgoing messages, notice hangups, and the like. +#[must_use = "futures do nothing unless polled"] +pub struct Connection<T, B> +where + T: AsyncRead + AsyncWrite + Send + 'static, + B: HttpBody + 'static, +{ + inner: Option<ProtoClient<T, B>>, +} + +/// A builder to configure an HTTP connection. +/// +/// After setting options, the builder is used to create a handshake future. +#[derive(Clone, Debug)] +pub struct Builder { + pub(super) exec: Exec, + h09_responses: bool, + h1_parser_config: ParserConfig, + h1_writev: Option<bool>, + h1_title_case_headers: bool, + h1_preserve_header_case: bool, + #[cfg(feature = "ffi")] + h1_preserve_header_order: bool, + h1_read_buf_exact_size: Option<usize>, + h1_max_buf_size: Option<usize>, + #[cfg(feature = "ffi")] + h1_headers_raw: bool, + #[cfg(feature = "http2")] + h2_builder: proto::h2::client::Config, + version: Proto, +} + +#[derive(Clone, Debug)] +enum Proto { + #[cfg(feature = "http1")] + Http1, + #[cfg(feature = "http2")] + Http2, +} + +/// A future returned by `SendRequest::send_request`. +/// +/// Yields a `Response` if successful. +#[must_use = "futures do nothing unless polled"] +pub struct ResponseFuture { + inner: ResponseFutureState, +} + +enum ResponseFutureState { + Waiting(dispatch::Promise<Response<Body>>), + // Option is to be able to `take()` it in `poll` + Error(Option<crate::Error>), +} + +/// Deconstructed parts of a `Connection`. +/// +/// This allows taking apart a `Connection` at a later time, in order to +/// reclaim the IO object, and additional related pieces. +#[derive(Debug)] +pub struct Parts<T> { + /// The original IO object used in the handshake. + pub io: T, + /// A buffer of bytes that have been read but not processed as HTTP. + /// + /// For instance, if the `Connection` is used for an HTTP upgrade request, + /// it is possible the server sent back the first bytes of the new protocol + /// along with the response upgrade. + /// + /// You will want to check for any existing bytes if you plan to continue + /// communicating on the IO object. + pub read_buf: Bytes, + _inner: (), +} + +// ========== internal client api + +// A `SendRequest` that can be cloned to send HTTP2 requests. +// private for now, probably not a great idea of a type... +#[must_use = "futures do nothing unless polled"] +#[cfg(feature = "http2")] +pub(super) struct Http2SendRequest<B> { + dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>, +} + +// ===== impl SendRequest + +impl<B> SendRequest<B> { + /// Polls to determine whether this sender can be used yet for a request. + /// + /// If the associated connection is closed, this returns an Error. + pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + self.dispatch.poll_ready(cx) + } + + pub(super) async fn when_ready(self) -> crate::Result<Self> { + let mut me = Some(self); + future::poll_fn(move |cx| { + ready!(me.as_mut().unwrap().poll_ready(cx))?; + Poll::Ready(Ok(me.take().unwrap())) + }) + .await + } + + pub(super) fn is_ready(&self) -> bool { + self.dispatch.is_ready() + } + + pub(super) fn is_closed(&self) -> bool { + self.dispatch.is_closed() + } + + #[cfg(feature = "http2")] + pub(super) fn into_http2(self) -> Http2SendRequest<B> { + Http2SendRequest { + dispatch: self.dispatch.unbound(), + } + } +} + +impl<B> SendRequest<B> +where + B: HttpBody + 'static, +{ + /// Sends a `Request` on the associated connection. + /// + /// Returns a future that if successful, yields the `Response`. + /// + /// # Note + /// + /// There are some key differences in what automatic things the `Client` + /// does for you that will not be done here: + /// + /// - `Client` requires absolute-form `Uri`s, since the scheme and + /// authority are needed to connect. They aren't required here. + /// - Since the `Client` requires absolute-form `Uri`s, it can add + /// the `Host` header based on it. You must add a `Host` header yourself + /// before calling this method. + /// - Since absolute-form `Uri`s are not required, if received, they will + /// be serialized as-is. + /// + /// # Example + /// + /// ``` + /// # use http::header::HOST; + /// # use hyper::client::conn::SendRequest; + /// # use hyper::Body; + /// use hyper::Request; + /// + /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> { + /// // build a Request + /// let req = Request::builder() + /// .uri("/foo/bar") + /// .header(HOST, "hyper.rs") + /// .body(Body::empty()) + /// .unwrap(); + /// + /// // send it and await a Response + /// let res = tx.send_request(req).await?; + /// // assert the Response + /// assert!(res.status().is_success()); + /// # Ok(()) + /// # } + /// # fn main() {} + /// ``` + pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture { + let inner = match self.dispatch.send(req) { + Ok(rx) => ResponseFutureState::Waiting(rx), + Err(_req) => { + debug!("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); + ResponseFutureState::Error(Some(err)) + } + }; + + ResponseFuture { inner } + } + + pub(super) fn send_request_retryable( + &mut self, + req: Request<B>, + ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin + where + B: Send, + { + match self.dispatch.try_send(req) { + Ok(rx) => { + Either::Left(rx.then(move |res| { + match res { + Ok(Ok(res)) => future::ok(res), + Ok(Err(err)) => future::err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + } + })) + } + Err(req) => { + debug!("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); + Either::Right(future::err((err, Some(req)))) + } + } + } +} + +impl<B> Service<Request<B>> for SendRequest<B> +where + B: HttpBody + 'static, +{ + type Response = Response<Body>; + type Error = crate::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_ready(cx) + } + + fn call(&mut self, req: Request<B>) -> Self::Future { + self.send_request(req) + } +} + +impl<B> fmt::Debug for SendRequest<B> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendRequest").finish() + } +} + +// ===== impl Http2SendRequest + +#[cfg(feature = "http2")] +impl<B> Http2SendRequest<B> { + pub(super) fn is_ready(&self) -> bool { + self.dispatch.is_ready() + } + + pub(super) fn is_closed(&self) -> bool { + self.dispatch.is_closed() + } +} + +#[cfg(feature = "http2")] +impl<B> Http2SendRequest<B> +where + B: HttpBody + 'static, +{ + pub(super) fn send_request_retryable( + &mut self, + req: Request<B>, + ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + where + B: Send, + { + match self.dispatch.try_send(req) { + Ok(rx) => { + Either::Left(rx.then(move |res| { + match res { + Ok(Ok(res)) => future::ok(res), + Ok(Err(err)) => future::err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + } + })) + } + Err(req) => { + debug!("connection was not ready"); + let err = crate::Error::new_canceled().with("connection was not ready"); + Either::Right(future::err((err, Some(req)))) + } + } + } +} + +#[cfg(feature = "http2")] +impl<B> fmt::Debug for Http2SendRequest<B> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Http2SendRequest").finish() + } +} + +#[cfg(feature = "http2")] +impl<B> Clone for Http2SendRequest<B> { + fn clone(&self) -> Self { + Http2SendRequest { + dispatch: self.dispatch.clone(), + } + } +} + +// ===== impl Connection + +impl<T, B> Connection<T, B> +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: HttpBody + Unpin + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + /// Return the inner IO object, and additional information. + /// + /// Only works for HTTP/1 connections. HTTP/2 connections will panic. + pub fn into_parts(self) -> Parts<T> { + match self.inner.expect("already upgraded") { + #[cfg(feature = "http1")] + ProtoClient::H1 { h1 } => { + let (io, read_buf, _) = h1.into_inner(); + Parts { + io, + read_buf, + _inner: (), + } + } + ProtoClient::H2 { .. } => { + panic!("http2 cannot into_inner"); + } + + #[cfg(not(feature = "http1"))] + ProtoClient::H1 { h1 } => match h1.0 {}, + } + } + + /// Poll the connection for completion, but without calling `shutdown` + /// on the underlying IO. + /// + /// This is useful to allow running a connection while doing an HTTP + /// upgrade. Once the upgrade is completed, the connection would be "done", + /// but it is not desired to actually shutdown the IO object. Instead you + /// would take it back using `into_parts`. + /// + /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) + /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) + /// to work with this function; or use the `without_shutdown` wrapper. + pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + match *self.inner.as_mut().expect("already upgraded") { + #[cfg(feature = "http1")] + ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), + #[cfg(feature = "http2")] + ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), + + #[cfg(not(feature = "http1"))] + ProtoClient::H1 { ref mut h1 } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoClient::H2 { ref mut h2, .. } => match h2.0 {}, + } + } + + /// Prevent shutdown of the underlying IO object at the end of service the request, + /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. + pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> { + let mut conn = Some(self); + future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> { + ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; + Poll::Ready(Ok(conn.take().unwrap().into_parts())) + }) + } + + /// Returns whether the [extended CONNECT protocol][1] is enabled or not. + /// + /// This setting is configured by the server peer by sending the + /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value received from the + /// remote. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 + #[cfg(feature = "http2")] + pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool { + match self.inner.as_ref().unwrap() { + ProtoClient::H1 { .. } => false, + ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(), + } + } +} + +impl<T, B> Future for Connection<T, B> +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? { + proto::Dispatched::Shutdown => Poll::Ready(Ok(())), + #[cfg(feature = "http1")] + proto::Dispatched::Upgrade(pending) => match self.inner.take() { + Some(ProtoClient::H1 { h1 }) => { + let (io, buf, _) = h1.into_inner(); + pending.fulfill(Upgraded::new(io, buf)); + Poll::Ready(Ok(())) + } + _ => { + drop(pending); + unreachable!("Upgrade expects h1"); + } + }, + } + } +} + +impl<T, B> fmt::Debug for Connection<T, B> +where + T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static, + B: HttpBody + 'static, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Connection").finish() + } +} + +// ===== impl Builder + +impl Builder { + /// Creates a new connection builder. + #[inline] + pub fn new() -> Builder { + Builder { + exec: Exec::Default, + h09_responses: false, + h1_writev: None, + h1_read_buf_exact_size: None, + h1_parser_config: Default::default(), + h1_title_case_headers: false, + h1_preserve_header_case: false, + #[cfg(feature = "ffi")] + h1_preserve_header_order: false, + h1_max_buf_size: None, + #[cfg(feature = "ffi")] + h1_headers_raw: false, + #[cfg(feature = "http2")] + h2_builder: Default::default(), + #[cfg(feature = "http1")] + version: Proto::Http1, + #[cfg(not(feature = "http1"))] + version: Proto::Http2, + } + } + + /// Provide an executor to execute background HTTP2 tasks. + pub fn executor<E>(&mut self, exec: E) -> &mut Builder + where + E: Executor<BoxSendFuture> + Send + Sync + 'static, + { + self.exec = Exec::Executor(Arc::new(exec)); + self + } + + /// Set whether HTTP/0.9 responses should be tolerated. + /// + /// Default is false. + pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder { + self.h09_responses = enabled; + self + } + + /// Set whether HTTP/1 connections will accept spaces between header names + /// and the colon that follow them in responses. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > No whitespace is allowed between the header field-name and colon. In + /// > the past, differences in the handling of such whitespace have led to + /// > security vulnerabilities in request routing and response handling. A + /// > server MUST reject any received request message that contains + /// > whitespace between a header field-name and colon with a response code + /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a + /// > response message before forwarding the message downstream. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_spaces_after_header_name_in_responses( + &mut self, + enabled: bool, + ) -> &mut Builder { + self.h1_parser_config + .allow_spaces_after_header_name_in_responses(enabled); + self + } + + /// Set whether HTTP/1 connections will accept obsolete line folding for + /// header values. + /// + /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when + /// parsing. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > A server that receives an obs-fold in a request message that is not + /// > within a message/http container MUST either reject the message by + /// > sending a 400 (Bad Request), preferably with a representation + /// > explaining that obsolete line folding is unacceptable, or replace + /// > each received obs-fold with one or more SP octets prior to + /// > interpreting the field value or forwarding the message downstream. + /// + /// > A proxy or gateway that receives an obs-fold in a response message + /// > that is not within a message/http container MUST either discard the + /// > message and replace it with a 502 (Bad Gateway) response, preferably + /// > with a representation explaining that unacceptable line folding was + /// > received, or replace each received obs-fold with one or more SP + /// > octets prior to interpreting the field value or forwarding the + /// > message downstream. + /// + /// > A user agent that receives an obs-fold in a response message that is + /// > not within a message/http container MUST replace each received + /// > obs-fold with one or more SP octets prior to interpreting the field + /// > value. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_obsolete_multiline_headers_in_responses( + &mut self, + enabled: bool, + ) -> &mut Builder { + self.h1_parser_config + .allow_obsolete_multiline_headers_in_responses(enabled); + self + } + + /// Set whether HTTP/1 connections will silently ignored malformed header lines. + /// + /// If this is enabled and and a header line does not start with a valid header + /// name, or does not include a colon at all, the line will be silently ignored + /// and no error will be reported. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_ignore_invalid_headers_in_responses( + &mut self, + enabled: bool, + ) -> &mut Builder { + self.h1_parser_config + .ignore_invalid_headers_in_responses(enabled); + self + } + + /// Set whether HTTP/1 connections should try to use vectored writes, + /// or always flatten into a single buffer. + /// + /// Note that setting this to false may mean more copies of body data, + /// but may also improve performance when an IO transport doesn't + /// support vectored writes well, such as most TLS implementations. + /// + /// Setting this to true will force hyper to use queued strategy + /// which may eliminate unnecessary cloning on some TLS backends + /// + /// Default is `auto`. In this mode hyper will try to guess which + /// mode to use + pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { + self.h1_writev = Some(enabled); + self + } + + /// Set whether HTTP/1 connections will write header names as title case at + /// the socket level. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder { + self.h1_title_case_headers = enabled; + self + } + + /// Set whether to support preserving original header cases. + /// + /// Currently, this will record the original cases received, and store them + /// in a private extension on the `Response`. It will also look for and use + /// such an extension in any provided `Request`. + /// + /// Since the relevant extension is still private, there is no way to + /// interact with the original cases. The only effect this can have now is + /// to forward the cases in a proxy-like fashion. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder { + self.h1_preserve_header_case = enabled; + self + } + + /// Set whether to support preserving original header order. + /// + /// Currently, this will record the order in which headers are received, and store this + /// ordering in a private extension on the `Response`. It will also look for and use + /// such an extension in any provided `Request`. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + #[cfg(feature = "ffi")] + pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder { + self.h1_preserve_header_order = enabled; + self + } + + /// Sets the exact size of the read buffer to *always* use. + /// + /// Note that setting this option unsets the `http1_max_buf_size` option. + /// + /// Default is an adaptive read buffer. + pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder { + self.h1_read_buf_exact_size = sz; + self.h1_max_buf_size = None; + self + } + + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + /// + /// Note that setting this option unsets the `http1_read_exact_buf_size` option. + /// + /// # Panics + /// + /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. + #[cfg(feature = "http1")] + #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] + pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { + assert!( + max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE, + "the max_buf_size cannot be smaller than the minimum that h1 specifies." + ); + + self.h1_max_buf_size = Some(max); + self.h1_read_buf_exact_size = None; + self + } + + #[cfg(feature = "ffi")] + pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self { + self.h1_headers_raw = enabled; + self + } + + /// Sets whether HTTP2 is required. + /// + /// Default is false. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_only(&mut self, enabled: bool) -> &mut Builder { + if enabled { + self.version = Proto::Http2 + } + self + } + + /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 + /// stream-level flow control. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + /// + /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; + } + self + } + + /// Sets the max connection-level flow control for HTTP2 + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_connection_window_size( + &mut self, + sz: impl Into<Option<u32>>, + ) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; + } + self + } + + /// Sets the maximum frame size to use for HTTP2. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self { + if let Some(sz) = sz.into() { + self.h2_builder.max_frame_size = sz; + } + self + } + + /// Sets an interval for HTTP2 Ping frames should be sent to keep a + /// connection alive. + /// + /// Pass `None` to disable HTTP2 keep-alive. + /// + /// Default is currently disabled. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_interval( + &mut self, + interval: impl Into<Option<Duration>>, + ) -> &mut Self { + self.h2_builder.keep_alive_interval = interval.into(); + self + } + + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. + /// + /// Default is 20 seconds. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.h2_builder.keep_alive_timeout = timeout; + self + } + + /// Sets whether HTTP2 keep-alive should apply while the connection is idle. + /// + /// If disabled, keep-alive pings are only sent while there are open + /// request/responses streams. If enabled, pings are also sent when no + /// streams are active. Does nothing if `http2_keep_alive_interval` is + /// disabled. + /// + /// Default is `false`. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { + self.h2_builder.keep_alive_while_idle = enabled; + self + } + + /// Sets the maximum number of HTTP2 concurrent locally reset streams. + /// + /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more + /// details. + /// + /// The default value is determined by the `h2` crate. + /// + /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.h2_builder.max_concurrent_reset_streams = Some(max); + self + } + + /// Set the maximum write buffer size for each HTTP/2 stream. + /// + /// Default is currently 1MB, but may change. + /// + /// # Panics + /// + /// The value must be no larger than `u32::MAX`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.h2_builder.max_send_buffer_size = max; + self + } + + /// Constructs a connection with the configured options and IO. + /// See [`client::conn`](crate::client::conn) for more. + /// + /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will + /// do nothing. + pub fn handshake<T, B>( + &self, + io: T, + ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>> + where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + B: HttpBody + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + { + let opts = self.clone(); + + async move { + trace!("client handshake {:?}", opts.version); + + let (tx, rx) = dispatch::channel(); + let proto = match opts.version { + #[cfg(feature = "http1")] + Proto::Http1 => { + let mut conn = proto::Conn::new(io); + conn.set_h1_parser_config(opts.h1_parser_config); + if let Some(writev) = opts.h1_writev { + if writev { + conn.set_write_strategy_queue(); + } else { + conn.set_write_strategy_flatten(); + } + } + if opts.h1_title_case_headers { + conn.set_title_case_headers(); + } + if opts.h1_preserve_header_case { + conn.set_preserve_header_case(); + } + #[cfg(feature = "ffi")] + if opts.h1_preserve_header_order { + conn.set_preserve_header_order(); + } + if opts.h09_responses { + conn.set_h09_responses(); + } + + #[cfg(feature = "ffi")] + conn.set_raw_headers(opts.h1_headers_raw); + + if let Some(sz) = opts.h1_read_buf_exact_size { + conn.set_read_buf_exact_size(sz); + } + if let Some(max) = opts.h1_max_buf_size { + conn.set_max_buf_size(max); + } + let cd = proto::h1::dispatch::Client::new(rx); + let dispatch = proto::h1::Dispatcher::new(cd, conn); + ProtoClient::H1 { h1: dispatch } + } + #[cfg(feature = "http2")] + Proto::Http2 => { + let h2 = + proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) + .await?; + ProtoClient::H2 { h2 } + } + }; + + Ok(( + SendRequest { dispatch: tx }, + Connection { inner: Some(proto) }, + )) + } + } +} + +// ===== impl ResponseFuture + +impl Future for ResponseFuture { + type Output = crate::Result<Response<Body>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match self.inner { + ResponseFutureState::Waiting(ref mut rx) => { + Pin::new(rx).poll(cx).map(|res| match res { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_canceled) => panic!("dispatch dropped without returning error"), + }) + } + ResponseFutureState::Error(ref mut err) => { + Poll::Ready(Err(err.take().expect("polled after ready"))) + } + } + } +} + +impl fmt::Debug for ResponseFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ResponseFuture").finish() + } +} + +// ===== impl ProtoClient + +impl<T, B> Future for ProtoClient<T, B> +where + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<proto::Dispatched>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match self.project() { + #[cfg(feature = "http1")] + ProtoClientProj::H1 { h1 } => h1.poll(cx), + #[cfg(feature = "http2")] + ProtoClientProj::H2 { h2, .. } => h2.poll(cx), + + #[cfg(not(feature = "http1"))] + ProtoClientProj::H1 { h1 } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoClientProj::H2 { h2, .. } => match h2.0 {}, + } + } +} + +// assert trait markers + +trait AssertSend: Send {} +trait AssertSendSync: Send + Sync {} + +#[doc(hidden)] +impl<B: Send> AssertSendSync for SendRequest<B> {} + +#[doc(hidden)] +impl<T: Send, B: Send> AssertSend for Connection<T, B> +where + T: AsyncRead + AsyncWrite + Send + 'static, + B: HttpBody + 'static, + B::Data: Send, +{ +} + +#[doc(hidden)] +impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B> +where + T: AsyncRead + AsyncWrite + Send + 'static, + B: HttpBody + 'static, + B::Data: Send + Sync + 'static, +{ +} + +#[doc(hidden)] +impl AssertSendSync for Builder {} + +#[doc(hidden)] +impl AssertSend for ResponseFuture {} diff --git a/third_party/rust/hyper/src/client/connect/dns.rs b/third_party/rust/hyper/src/client/connect/dns.rs new file mode 100644 index 0000000000..e4465078b3 --- /dev/null +++ b/third_party/rust/hyper/src/client/connect/dns.rs @@ -0,0 +1,425 @@ +//! DNS Resolution used by the `HttpConnector`. +//! +//! This module contains: +//! +//! - A [`GaiResolver`](GaiResolver) that is the default resolver for the +//! `HttpConnector`. +//! - The `Name` type used as an argument to custom resolvers. +//! +//! # Resolvers are `Service`s +//! +//! A resolver is just a +//! `Service<Name, Response = impl Iterator<Item = SocketAddr>>`. +//! +//! A simple resolver that ignores the name and always returns a specific +//! address: +//! +//! ```rust,ignore +//! use std::{convert::Infallible, iter, net::SocketAddr}; +//! +//! let resolver = tower::service_fn(|_name| async { +//! Ok::<_, Infallible>(iter::once(SocketAddr::from(([127, 0, 0, 1], 8080)))) +//! }); +//! ``` +use std::error::Error; +use std::future::Future; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; +use std::pin::Pin; +use std::str::FromStr; +use std::task::{self, Poll}; +use std::{fmt, io, vec}; + +use tokio::task::JoinHandle; +use tower_service::Service; +use tracing::debug; + +pub(super) use self::sealed::Resolve; + +/// A domain name to resolve into IP addresses. +#[derive(Clone, Hash, Eq, PartialEq)] +pub struct Name { + host: Box<str>, +} + +/// A resolver using blocking `getaddrinfo` calls in a threadpool. +#[derive(Clone)] +pub struct GaiResolver { + _priv: (), +} + +/// An iterator of IP addresses returned from `getaddrinfo`. +pub struct GaiAddrs { + inner: SocketAddrs, +} + +/// A future to resolve a name returned by `GaiResolver`. +pub struct GaiFuture { + inner: JoinHandle<Result<SocketAddrs, io::Error>>, +} + +impl Name { + pub(super) fn new(host: Box<str>) -> Name { + Name { host } + } + + /// View the hostname as a string slice. + pub fn as_str(&self) -> &str { + &self.host + } +} + +impl fmt::Debug for Name { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.host, f) + } +} + +impl fmt::Display for Name { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.host, f) + } +} + +impl FromStr for Name { + type Err = InvalidNameError; + + fn from_str(host: &str) -> Result<Self, Self::Err> { + // Possibly add validation later + Ok(Name::new(host.into())) + } +} + +/// Error indicating a given string was not a valid domain name. +#[derive(Debug)] +pub struct InvalidNameError(()); + +impl fmt::Display for InvalidNameError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Not a valid domain name") + } +} + +impl Error for InvalidNameError {} + +impl GaiResolver { + /// Construct a new `GaiResolver`. + pub fn new() -> Self { + GaiResolver { _priv: () } + } +} + +impl Service<Name> for GaiResolver { + type Response = GaiAddrs; + type Error = io::Error; + type Future = GaiFuture; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, name: Name) -> Self::Future { + let blocking = tokio::task::spawn_blocking(move || { + debug!("resolving host={:?}", name.host); + (&*name.host, 0) + .to_socket_addrs() + .map(|i| SocketAddrs { iter: i }) + }); + + GaiFuture { inner: blocking } + } +} + +impl fmt::Debug for GaiResolver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("GaiResolver") + } +} + +impl Future for GaiFuture { + type Output = Result<GaiAddrs, io::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.inner).poll(cx).map(|res| match res { + Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }), + Ok(Err(err)) => Err(err), + Err(join_err) => { + if join_err.is_cancelled() { + Err(io::Error::new(io::ErrorKind::Interrupted, join_err)) + } else { + panic!("gai background task failed: {:?}", join_err) + } + } + }) + } +} + +impl fmt::Debug for GaiFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("GaiFuture") + } +} + +impl Drop for GaiFuture { + fn drop(&mut self) { + self.inner.abort(); + } +} + +impl Iterator for GaiAddrs { + type Item = SocketAddr; + + fn next(&mut self) -> Option<Self::Item> { + self.inner.next() + } +} + +impl fmt::Debug for GaiAddrs { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("GaiAddrs") + } +} + +pub(super) struct SocketAddrs { + iter: vec::IntoIter<SocketAddr>, +} + +impl SocketAddrs { + pub(super) fn new(addrs: Vec<SocketAddr>) -> Self { + SocketAddrs { + iter: addrs.into_iter(), + } + } + + pub(super) fn try_parse(host: &str, port: u16) -> Option<SocketAddrs> { + if let Ok(addr) = host.parse::<Ipv4Addr>() { + let addr = SocketAddrV4::new(addr, port); + return Some(SocketAddrs { + iter: vec![SocketAddr::V4(addr)].into_iter(), + }); + } + if let Ok(addr) = host.parse::<Ipv6Addr>() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + return Some(SocketAddrs { + iter: vec![SocketAddr::V6(addr)].into_iter(), + }); + } + None + } + + #[inline] + fn filter(self, predicate: impl FnMut(&SocketAddr) -> bool) -> SocketAddrs { + SocketAddrs::new(self.iter.filter(predicate).collect()) + } + + pub(super) fn split_by_preference( + self, + local_addr_ipv4: Option<Ipv4Addr>, + local_addr_ipv6: Option<Ipv6Addr>, + ) -> (SocketAddrs, SocketAddrs) { + match (local_addr_ipv4, local_addr_ipv6) { + (Some(_), None) => (self.filter(SocketAddr::is_ipv4), SocketAddrs::new(vec![])), + (None, Some(_)) => (self.filter(SocketAddr::is_ipv6), SocketAddrs::new(vec![])), + _ => { + let preferring_v6 = self + .iter + .as_slice() + .first() + .map(SocketAddr::is_ipv6) + .unwrap_or(false); + + let (preferred, fallback) = self + .iter + .partition::<Vec<_>, _>(|addr| addr.is_ipv6() == preferring_v6); + + (SocketAddrs::new(preferred), SocketAddrs::new(fallback)) + } + } + } + + pub(super) fn is_empty(&self) -> bool { + self.iter.as_slice().is_empty() + } + + pub(super) fn len(&self) -> usize { + self.iter.as_slice().len() + } +} + +impl Iterator for SocketAddrs { + type Item = SocketAddr; + #[inline] + fn next(&mut self) -> Option<SocketAddr> { + self.iter.next() + } +} + +/* +/// A resolver using `getaddrinfo` calls via the `tokio_executor::threadpool::blocking` API. +/// +/// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the +/// multi-threaded Tokio runtime. +#[cfg(feature = "runtime")] +#[derive(Clone, Debug)] +pub struct TokioThreadpoolGaiResolver(()); + +/// The future returned by `TokioThreadpoolGaiResolver`. +#[cfg(feature = "runtime")] +#[derive(Debug)] +pub struct TokioThreadpoolGaiFuture { + name: Name, +} + +#[cfg(feature = "runtime")] +impl TokioThreadpoolGaiResolver { + /// Creates a new DNS resolver that will use tokio threadpool's blocking + /// feature. + /// + /// **Requires** its futures to be run on the threadpool runtime. + pub fn new() -> Self { + TokioThreadpoolGaiResolver(()) + } +} + +#[cfg(feature = "runtime")] +impl Service<Name> for TokioThreadpoolGaiResolver { + type Response = GaiAddrs; + type Error = io::Error; + type Future = TokioThreadpoolGaiFuture; + + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, name: Name) -> Self::Future { + TokioThreadpoolGaiFuture { name } + } +} + +#[cfg(feature = "runtime")] +impl Future for TokioThreadpoolGaiFuture { + type Output = Result<GaiAddrs, io::Error>; + + fn poll(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match ready!(tokio_executor::threadpool::blocking(|| ( + self.name.as_str(), + 0 + ) + .to_socket_addrs())) + { + Ok(Ok(iter)) => Poll::Ready(Ok(GaiAddrs { + inner: IpAddrs { iter }, + })), + Ok(Err(e)) => Poll::Ready(Err(e)), + // a BlockingError, meaning not on a tokio_executor::threadpool :( + Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))), + } + } +} +*/ + +mod sealed { + use super::{SocketAddr, Name}; + use crate::common::{task, Future, Poll}; + use tower_service::Service; + + // "Trait alias" for `Service<Name, Response = Addrs>` + pub trait Resolve { + type Addrs: Iterator<Item = SocketAddr>; + type Error: Into<Box<dyn std::error::Error + Send + Sync>>; + type Future: Future<Output = Result<Self::Addrs, Self::Error>>; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>; + fn resolve(&mut self, name: Name) -> Self::Future; + } + + impl<S> Resolve for S + where + S: Service<Name>, + S::Response: Iterator<Item = SocketAddr>, + S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + { + type Addrs = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + Service::poll_ready(self, cx) + } + + fn resolve(&mut self, name: Name) -> Self::Future { + Service::call(self, name) + } + } +} + +pub(super) async fn resolve<R>(resolver: &mut R, name: Name) -> Result<R::Addrs, R::Error> +where + R: Resolve, +{ + futures_util::future::poll_fn(|cx| resolver.poll_ready(cx)).await?; + resolver.resolve(name).await +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn test_ip_addrs_split_by_preference() { + let ip_v4 = Ipv4Addr::new(127, 0, 0, 1); + let ip_v6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); + let v4_addr = (ip_v4, 80).into(); + let v6_addr = (ip_v6, 80).into(); + + let (mut preferred, mut fallback) = SocketAddrs { + iter: vec![v4_addr, v6_addr].into_iter(), + } + .split_by_preference(None, None); + assert!(preferred.next().unwrap().is_ipv4()); + assert!(fallback.next().unwrap().is_ipv6()); + + let (mut preferred, mut fallback) = SocketAddrs { + iter: vec![v6_addr, v4_addr].into_iter(), + } + .split_by_preference(None, None); + assert!(preferred.next().unwrap().is_ipv6()); + assert!(fallback.next().unwrap().is_ipv4()); + + let (mut preferred, mut fallback) = SocketAddrs { + iter: vec![v4_addr, v6_addr].into_iter(), + } + .split_by_preference(Some(ip_v4), Some(ip_v6)); + assert!(preferred.next().unwrap().is_ipv4()); + assert!(fallback.next().unwrap().is_ipv6()); + + let (mut preferred, mut fallback) = SocketAddrs { + iter: vec![v6_addr, v4_addr].into_iter(), + } + .split_by_preference(Some(ip_v4), Some(ip_v6)); + assert!(preferred.next().unwrap().is_ipv6()); + assert!(fallback.next().unwrap().is_ipv4()); + + let (mut preferred, fallback) = SocketAddrs { + iter: vec![v4_addr, v6_addr].into_iter(), + } + .split_by_preference(Some(ip_v4), None); + assert!(preferred.next().unwrap().is_ipv4()); + assert!(fallback.is_empty()); + + let (mut preferred, fallback) = SocketAddrs { + iter: vec![v4_addr, v6_addr].into_iter(), + } + .split_by_preference(None, Some(ip_v6)); + assert!(preferred.next().unwrap().is_ipv6()); + assert!(fallback.is_empty()); + } + + #[test] + fn test_name_from_str() { + const DOMAIN: &str = "test.example.com"; + let name = Name::from_str(DOMAIN).expect("Should be a valid domain"); + assert_eq!(name.as_str(), DOMAIN); + assert_eq!(name.to_string(), DOMAIN); + } +} diff --git a/third_party/rust/hyper/src/client/connect/http.rs b/third_party/rust/hyper/src/client/connect/http.rs new file mode 100644 index 0000000000..afe7b155eb --- /dev/null +++ b/third_party/rust/hyper/src/client/connect/http.rs @@ -0,0 +1,1007 @@ +use std::error::Error as StdError; +use std::fmt; +use std::future::Future; +use std::io; +use std::marker::PhantomData; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Poll}; +use std::time::Duration; + +use futures_util::future::Either; +use http::uri::{Scheme, Uri}; +use pin_project_lite::pin_project; +use tokio::net::{TcpSocket, TcpStream}; +use tokio::time::Sleep; +use tracing::{debug, trace, warn}; + +use super::dns::{self, resolve, GaiResolver, Resolve}; +use super::{Connected, Connection}; +//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver; + +/// A connector for the `http` scheme. +/// +/// Performs DNS resolution in a thread pool, and then connects over TCP. +/// +/// # Note +/// +/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes +/// transport information such as the remote socket address used. +#[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] +#[derive(Clone)] +pub struct HttpConnector<R = GaiResolver> { + config: Arc<Config>, + resolver: R, +} + +/// Extra information about the transport when an HttpConnector is used. +/// +/// # Example +/// +/// ``` +/// # async fn doc() -> hyper::Result<()> { +/// use hyper::Uri; +/// use hyper::client::{Client, connect::HttpInfo}; +/// +/// let client = Client::new(); +/// let uri = Uri::from_static("http://example.com"); +/// +/// let res = client.get(uri).await?; +/// res +/// .extensions() +/// .get::<HttpInfo>() +/// .map(|info| { +/// println!("remote addr = {}", info.remote_addr()); +/// }); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Note +/// +/// If a different connector is used besides [`HttpConnector`](HttpConnector), +/// this value will not exist in the extensions. Consult that specific +/// connector to see what "extra" information it might provide to responses. +#[derive(Clone, Debug)] +pub struct HttpInfo { + remote_addr: SocketAddr, + local_addr: SocketAddr, +} + +#[derive(Clone)] +struct Config { + connect_timeout: Option<Duration>, + enforce_http: bool, + happy_eyeballs_timeout: Option<Duration>, + keep_alive_timeout: Option<Duration>, + local_address_ipv4: Option<Ipv4Addr>, + local_address_ipv6: Option<Ipv6Addr>, + nodelay: bool, + reuse_address: bool, + send_buffer_size: Option<usize>, + recv_buffer_size: Option<usize>, +} + +// ===== impl HttpConnector ===== + +impl HttpConnector { + /// Construct a new HttpConnector. + pub fn new() -> HttpConnector { + HttpConnector::new_with_resolver(GaiResolver::new()) + } +} + +/* +#[cfg(feature = "runtime")] +impl HttpConnector<TokioThreadpoolGaiResolver> { + /// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`. + /// + /// This resolver **requires** the threadpool runtime to be used. + pub fn new_with_tokio_threadpool_resolver() -> Self { + HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new()) + } +} +*/ + +impl<R> HttpConnector<R> { + /// Construct a new HttpConnector. + /// + /// Takes a [`Resolver`](crate::client::connect::dns#resolvers-are-services) to handle DNS lookups. + pub fn new_with_resolver(resolver: R) -> HttpConnector<R> { + HttpConnector { + config: Arc::new(Config { + connect_timeout: None, + enforce_http: true, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), + keep_alive_timeout: None, + local_address_ipv4: None, + local_address_ipv6: None, + nodelay: false, + reuse_address: false, + send_buffer_size: None, + recv_buffer_size: None, + }), + resolver, + } + } + + /// Option to enforce all `Uri`s have the `http` scheme. + /// + /// Enabled by default. + #[inline] + pub fn enforce_http(&mut self, is_enforced: bool) { + self.config_mut().enforce_http = is_enforced; + } + + /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. + /// + /// If `None`, the option will not be set. + /// + /// Default is `None`. + #[inline] + pub fn set_keepalive(&mut self, dur: Option<Duration>) { + self.config_mut().keep_alive_timeout = dur; + } + + /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. + /// + /// Default is `false`. + #[inline] + pub fn set_nodelay(&mut self, nodelay: bool) { + self.config_mut().nodelay = nodelay; + } + + /// Sets the value of the SO_SNDBUF option on the socket. + #[inline] + pub fn set_send_buffer_size(&mut self, size: Option<usize>) { + self.config_mut().send_buffer_size = size; + } + + /// Sets the value of the SO_RCVBUF option on the socket. + #[inline] + pub fn set_recv_buffer_size(&mut self, size: Option<usize>) { + self.config_mut().recv_buffer_size = size; + } + + /// Set that all sockets are bound to the configured address before connection. + /// + /// If `None`, the sockets will not be bound. + /// + /// Default is `None`. + #[inline] + pub fn set_local_address(&mut self, addr: Option<IpAddr>) { + let (v4, v6) = match addr { + Some(IpAddr::V4(a)) => (Some(a), None), + Some(IpAddr::V6(a)) => (None, Some(a)), + _ => (None, None), + }; + + let cfg = self.config_mut(); + + cfg.local_address_ipv4 = v4; + cfg.local_address_ipv6 = v6; + } + + /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's + /// preferences) before connection. + #[inline] + pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) { + let cfg = self.config_mut(); + + cfg.local_address_ipv4 = Some(addr_ipv4); + cfg.local_address_ipv6 = Some(addr_ipv6); + } + + /// Set the connect timeout. + /// + /// If a domain resolves to multiple IP addresses, the timeout will be + /// evenly divided across them. + /// + /// Default is `None`. + #[inline] + pub fn set_connect_timeout(&mut self, dur: Option<Duration>) { + self.config_mut().connect_timeout = dur; + } + + /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. + /// + /// If hostname resolves to both IPv4 and IPv6 addresses and connection + /// cannot be established using preferred address family before timeout + /// elapses, then connector will in parallel attempt connection using other + /// address family. + /// + /// If `None`, parallel connection attempts are disabled. + /// + /// Default is 300 milliseconds. + /// + /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 + #[inline] + pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) { + self.config_mut().happy_eyeballs_timeout = dur; + } + + /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. + /// + /// Default is `false`. + #[inline] + pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { + self.config_mut().reuse_address = reuse_address; + self + } + + // private + + fn config_mut(&mut self) -> &mut Config { + // If the are HttpConnector clones, this will clone the inner + // config. So mutating the config won't ever affect previous + // clones. + Arc::make_mut(&mut self.config) + } +} + +static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http"; +static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing"; +static INVALID_MISSING_HOST: &str = "invalid URL, host is missing"; + +// R: Debug required for now to allow adding it to debug output later... +impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HttpConnector").finish() + } +} + +impl<R> tower_service::Service<Uri> for HttpConnector<R> +where + R: Resolve + Clone + Send + Sync + 'static, + R::Future: Send, +{ + type Response = TcpStream; + type Error = ConnectError; + type Future = HttpConnecting<R>; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?; + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: Uri) -> Self::Future { + let mut self_ = self.clone(); + HttpConnecting { + fut: Box::pin(async move { self_.call_async(dst).await }), + _marker: PhantomData, + } + } +} + +fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> { + trace!( + "Http::connect; scheme={:?}, host={:?}, port={:?}", + dst.scheme(), + dst.host(), + dst.port(), + ); + + if config.enforce_http { + if dst.scheme() != Some(&Scheme::HTTP) { + return Err(ConnectError { + msg: INVALID_NOT_HTTP.into(), + cause: None, + }); + } + } else if dst.scheme().is_none() { + return Err(ConnectError { + msg: INVALID_MISSING_SCHEME.into(), + cause: None, + }); + } + + let host = match dst.host() { + Some(s) => s, + None => { + return Err(ConnectError { + msg: INVALID_MISSING_HOST.into(), + cause: None, + }) + } + }; + let port = match dst.port() { + Some(port) => port.as_u16(), + None => { + if dst.scheme() == Some(&Scheme::HTTPS) { + 443 + } else { + 80 + } + } + }; + + Ok((host, port)) +} + +impl<R> HttpConnector<R> +where + R: Resolve, +{ + async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> { + let config = &self.config; + + let (host, port) = get_host_port(config, &dst)?; + let host = host.trim_start_matches('[').trim_end_matches(']'); + + // If the host is already an IP addr (v4 or v6), + // skip resolving the dns and start connecting right away. + let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) { + addrs + } else { + let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) + .await + .map_err(ConnectError::dns)?; + let addrs = addrs + .map(|mut addr| { + addr.set_port(port); + addr + }) + .collect(); + dns::SocketAddrs::new(addrs) + }; + + let c = ConnectingTcp::new(addrs, config); + + let sock = c.connect().await?; + + if let Err(e) = sock.set_nodelay(config.nodelay) { + warn!("tcp set_nodelay error: {}", e); + } + + Ok(sock) + } +} + +impl Connection for TcpStream { + fn connected(&self) -> Connected { + let connected = Connected::new(); + if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) { + connected.extra(HttpInfo { remote_addr, local_addr }) + } else { + connected + } + } +} + +impl HttpInfo { + /// Get the remote address of the transport used. + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } + + /// Get the local address of the transport used. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } +} + +pin_project! { + // Not publicly exported (so missing_docs doesn't trigger). + // + // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly + // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot + // (and thus we can change the type in the future). + #[must_use = "futures do nothing unless polled"] + #[allow(missing_debug_implementations)] + pub struct HttpConnecting<R> { + #[pin] + fut: BoxConnecting, + _marker: PhantomData<R>, + } +} + +type ConnectResult = Result<TcpStream, ConnectError>; +type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>; + +impl<R: Resolve> Future for HttpConnecting<R> { + type Output = ConnectResult; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.project().fut.poll(cx) + } +} + +// Not publicly exported (so missing_docs doesn't trigger). +pub struct ConnectError { + msg: Box<str>, + cause: Option<Box<dyn StdError + Send + Sync>>, +} + +impl ConnectError { + fn new<S, E>(msg: S, cause: E) -> ConnectError + where + S: Into<Box<str>>, + E: Into<Box<dyn StdError + Send + Sync>>, + { + ConnectError { + msg: msg.into(), + cause: Some(cause.into()), + } + } + + fn dns<E>(cause: E) -> ConnectError + where + E: Into<Box<dyn StdError + Send + Sync>>, + { + ConnectError::new("dns error", cause) + } + + fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError + where + S: Into<Box<str>>, + E: Into<Box<dyn StdError + Send + Sync>>, + { + move |cause| ConnectError::new(msg, cause) + } +} + +impl fmt::Debug for ConnectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(ref cause) = self.cause { + f.debug_tuple("ConnectError") + .field(&self.msg) + .field(cause) + .finish() + } else { + self.msg.fmt(f) + } + } +} + +impl fmt::Display for ConnectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.msg)?; + + if let Some(ref cause) = self.cause { + write!(f, ": {}", cause)?; + } + + Ok(()) + } +} + +impl StdError for ConnectError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.cause.as_ref().map(|e| &**e as _) + } +} + +struct ConnectingTcp<'a> { + preferred: ConnectingTcpRemote, + fallback: Option<ConnectingTcpFallback>, + config: &'a Config, +} + +impl<'a> ConnectingTcp<'a> { + fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { + if let Some(fallback_timeout) = config.happy_eyeballs_timeout { + let (preferred_addrs, fallback_addrs) = remote_addrs + .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); + if fallback_addrs.is_empty() { + return ConnectingTcp { + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), + fallback: None, + config, + }; + } + + ConnectingTcp { + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), + fallback: Some(ConnectingTcpFallback { + delay: tokio::time::sleep(fallback_timeout), + remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout), + }), + config, + } + } else { + ConnectingTcp { + preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), + fallback: None, + config, + } + } + } +} + +struct ConnectingTcpFallback { + delay: Sleep, + remote: ConnectingTcpRemote, +} + +struct ConnectingTcpRemote { + addrs: dns::SocketAddrs, + connect_timeout: Option<Duration>, +} + +impl ConnectingTcpRemote { + fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self { + let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32)); + + Self { + addrs, + connect_timeout, + } + } +} + +impl ConnectingTcpRemote { + async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> { + let mut err = None; + for addr in &mut self.addrs { + debug!("connecting to {}", addr); + match connect(&addr, config, self.connect_timeout)?.await { + Ok(tcp) => { + debug!("connected to {}", addr); + return Ok(tcp); + } + Err(e) => { + trace!("connect error for {}: {:?}", addr, e); + err = Some(e); + } + } + } + + match err { + Some(e) => Err(e), + None => Err(ConnectError::new( + "tcp connect error", + std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"), + )), + } + } +} + +fn bind_local_address( + socket: &socket2::Socket, + dst_addr: &SocketAddr, + local_addr_ipv4: &Option<Ipv4Addr>, + local_addr_ipv6: &Option<Ipv6Addr>, +) -> io::Result<()> { + match (*dst_addr, local_addr_ipv4, local_addr_ipv6) { + (SocketAddr::V4(_), Some(addr), _) => { + socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?; + } + (SocketAddr::V6(_), _, Some(addr)) => { + socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?; + } + _ => { + if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match *dst_addr { + SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(), + SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(), + }; + socket.bind(&any.into())?; + } + } + } + + Ok(()) +} + +fn connect( + addr: &SocketAddr, + config: &Config, + connect_timeout: Option<Duration>, +) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> { + // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the + // keepalive timeout, it would be nice to use that instead of socket2, + // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance... + use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type}; + use std::convert::TryInto; + + let domain = Domain::for_address(*addr); + let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) + .map_err(ConnectError::m("tcp open error"))?; + + // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is + // responsible for ensuring O_NONBLOCK is set. + socket + .set_nonblocking(true) + .map_err(ConnectError::m("tcp set_nonblocking error"))?; + + if let Some(dur) = config.keep_alive_timeout { + let conf = TcpKeepalive::new().with_time(dur); + if let Err(e) = socket.set_tcp_keepalive(&conf) { + warn!("tcp set_keepalive error: {}", e); + } + } + + bind_local_address( + &socket, + addr, + &config.local_address_ipv4, + &config.local_address_ipv6, + ) + .map_err(ConnectError::m("tcp bind local error"))?; + + #[cfg(unix)] + let socket = unsafe { + // Safety: `from_raw_fd` is only safe to call if ownership of the raw + // file descriptor is transferred. Since we call `into_raw_fd` on the + // socket2 socket, it gives up ownership of the fd and will not close + // it, so this is safe. + use std::os::unix::io::{FromRawFd, IntoRawFd}; + TcpSocket::from_raw_fd(socket.into_raw_fd()) + }; + #[cfg(windows)] + let socket = unsafe { + // Safety: `from_raw_socket` is only safe to call if ownership of the raw + // Windows SOCKET is transferred. Since we call `into_raw_socket` on the + // socket2 socket, it gives up ownership of the SOCKET and will not close + // it, so this is safe. + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + TcpSocket::from_raw_socket(socket.into_raw_socket()) + }; + + if config.reuse_address { + if let Err(e) = socket.set_reuseaddr(true) { + warn!("tcp set_reuse_address error: {}", e); + } + } + + if let Some(size) = config.send_buffer_size { + if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) { + warn!("tcp set_buffer_size error: {}", e); + } + } + + if let Some(size) = config.recv_buffer_size { + if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) { + warn!("tcp set_recv_buffer_size error: {}", e); + } + } + + let connect = socket.connect(*addr); + Ok(async move { + match connect_timeout { + Some(dur) => match tokio::time::timeout(dur, connect).await { + Ok(Ok(s)) => Ok(s), + Ok(Err(e)) => Err(e), + Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)), + }, + None => connect.await, + } + .map_err(ConnectError::m("tcp connect error")) + }) +} + +impl ConnectingTcp<'_> { + async fn connect(mut self) -> Result<TcpStream, ConnectError> { + match self.fallback { + None => self.preferred.connect(self.config).await, + Some(mut fallback) => { + let preferred_fut = self.preferred.connect(self.config); + futures_util::pin_mut!(preferred_fut); + + let fallback_fut = fallback.remote.connect(self.config); + futures_util::pin_mut!(fallback_fut); + + let fallback_delay = fallback.delay; + futures_util::pin_mut!(fallback_delay); + + let (result, future) = + match futures_util::future::select(preferred_fut, fallback_delay).await { + Either::Left((result, _fallback_delay)) => { + (result, Either::Right(fallback_fut)) + } + Either::Right(((), preferred_fut)) => { + // Delay is done, start polling both the preferred and the fallback + futures_util::future::select(preferred_fut, fallback_fut) + .await + .factor_first() + } + }; + + if result.is_err() { + // Fallback to the remaining future (could be preferred or fallback) + // if we get an error + future.await + } else { + result + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::io; + + use ::http::Uri; + + use super::super::sealed::{Connect, ConnectSvc}; + use super::{Config, ConnectError, HttpConnector}; + + async fn connect<C>( + connector: C, + dst: Uri, + ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error> + where + C: Connect, + { + connector.connect(super::super::sealed::Internal, dst).await + } + + #[tokio::test] + async fn test_errors_enforce_http() { + let dst = "https://example.domain/foo/bar?baz".parse().unwrap(); + let connector = HttpConnector::new(); + + let err = connect(connector, dst).await.unwrap_err(); + assert_eq!(&*err.msg, super::INVALID_NOT_HTTP); + } + + #[cfg(any(target_os = "linux", target_os = "macos"))] + fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) { + use std::net::{IpAddr, TcpListener}; + + let mut ip_v4 = None; + let mut ip_v6 = None; + + let ips = pnet_datalink::interfaces() + .into_iter() + .flat_map(|i| i.ips.into_iter().map(|n| n.ip())); + + for ip in ips { + match ip { + IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip), + IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip), + _ => (), + } + + if ip_v4.is_some() && ip_v6.is_some() { + break; + } + } + + (ip_v4, ip_v6) + } + + #[tokio::test] + async fn test_errors_missing_scheme() { + let dst = "example.domain".parse().unwrap(); + let mut connector = HttpConnector::new(); + connector.enforce_http(false); + + let err = connect(connector, dst).await.unwrap_err(); + assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME); + } + + // NOTE: pnet crate that we use in this test doesn't compile on Windows + #[cfg(any(target_os = "linux", target_os = "macos"))] + #[tokio::test] + async fn local_address() { + use std::net::{IpAddr, TcpListener}; + let _ = pretty_env_logger::try_init(); + + let (bind_ip_v4, bind_ip_v6) = get_local_ips(); + let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = server4.local_addr().unwrap().port(); + let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap(); + + let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move { + let mut connector = HttpConnector::new(); + + match (bind_ip_v4, bind_ip_v6) { + (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6), + (Some(v4), None) => connector.set_local_address(Some(v4.into())), + (None, Some(v6)) => connector.set_local_address(Some(v6.into())), + _ => unreachable!(), + } + + connect(connector, dst.parse().unwrap()).await.unwrap(); + + let (_, client_addr) = server.accept().unwrap(); + + assert_eq!(client_addr.ip(), expected_ip); + }; + + if let Some(ip) = bind_ip_v4 { + assert_client_ip(format!("http://127.0.0.1:{}", port), server4, ip.into()).await; + } + + if let Some(ip) = bind_ip_v6 { + assert_client_ip(format!("http://[::1]:{}", port), server6, ip.into()).await; + } + } + + #[test] + #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] + fn client_happy_eyeballs() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; + use std::time::{Duration, Instant}; + + use super::dns; + use super::ConnectingTcp; + + let _ = pretty_env_logger::try_init(); + let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server4.local_addr().unwrap(); + let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let local_timeout = Duration::default(); + let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1; + let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1; + let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout) + + Duration::from_millis(250); + + let scenarios = &[ + // Fast primary, without fallback. + (&[local_ipv4_addr()][..], 4, local_timeout, false), + (&[local_ipv6_addr()][..], 6, local_timeout, false), + // Fast primary, with (unused) fallback. + ( + &[local_ipv4_addr(), local_ipv6_addr()][..], + 4, + local_timeout, + false, + ), + ( + &[local_ipv6_addr(), local_ipv4_addr()][..], + 6, + local_timeout, + false, + ), + // Unreachable + fast primary, without fallback. + ( + &[unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, + unreachable_v4_timeout, + false, + ), + ( + &[unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, + unreachable_v6_timeout, + false, + ), + // Unreachable + fast primary, with (unused) fallback. + ( + &[ + unreachable_ipv4_addr(), + local_ipv4_addr(), + local_ipv6_addr(), + ][..], + 4, + unreachable_v4_timeout, + false, + ), + ( + &[ + unreachable_ipv6_addr(), + local_ipv6_addr(), + local_ipv4_addr(), + ][..], + 6, + unreachable_v6_timeout, + true, + ), + // Slow primary, with (used) fallback. + ( + &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..], + 6, + fallback_timeout, + false, + ), + ( + &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..], + 4, + fallback_timeout, + true, + ), + // Slow primary, with (used) unreachable + fast fallback. + ( + &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, + fallback_timeout + unreachable_v6_timeout, + false, + ), + ( + &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, + fallback_timeout + unreachable_v4_timeout, + true, + ), + ]; + + // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network. + // Otherwise, connection to "slow" IPv6 address will error-out immediately. + let ipv6_accessible = measure_connect(slow_ipv6_addr()).0; + + for &(hosts, family, timeout, needs_ipv6_access) in scenarios { + if needs_ipv6_access && !ipv6_accessible { + continue; + } + + let (start, stream) = rt + .block_on(async move { + let addrs = hosts + .iter() + .map(|host| (host.clone(), addr.port()).into()) + .collect(); + let cfg = Config { + local_address_ipv4: None, + local_address_ipv6: None, + connect_timeout: None, + keep_alive_timeout: None, + happy_eyeballs_timeout: Some(fallback_timeout), + nodelay: false, + reuse_address: false, + enforce_http: false, + send_buffer_size: None, + recv_buffer_size: None, + }; + let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg); + let start = Instant::now(); + Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?)) + }) + .unwrap(); + let res = if stream.peer_addr().unwrap().is_ipv4() { + 4 + } else { + 6 + }; + let duration = start.elapsed(); + + // Allow actual duration to be +/- 150ms off. + let min_duration = if timeout >= Duration::from_millis(150) { + timeout - Duration::from_millis(150) + } else { + Duration::default() + }; + let max_duration = timeout + Duration::from_millis(150); + + assert_eq!(res, family); + assert!(duration >= min_duration); + assert!(duration <= max_duration); + } + + fn local_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 1).into() + } + + fn local_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into() + } + + fn unreachable_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 2).into() + } + + fn unreachable_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into() + } + + fn slow_ipv4_addr() -> IpAddr { + // RFC 6890 reserved IPv4 address. + Ipv4Addr::new(198, 18, 0, 25).into() + } + + fn slow_ipv6_addr() -> IpAddr { + // RFC 6890 reserved IPv6 address. + Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into() + } + + fn measure_connect(addr: IpAddr) -> (bool, Duration) { + let start = Instant::now(); + let result = + std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1)); + + let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut; + let duration = start.elapsed(); + (reachable, duration) + } + } +} diff --git a/third_party/rust/hyper/src/client/connect/mod.rs b/third_party/rust/hyper/src/client/connect/mod.rs new file mode 100644 index 0000000000..862a0e65c1 --- /dev/null +++ b/third_party/rust/hyper/src/client/connect/mod.rs @@ -0,0 +1,412 @@ +//! Connectors used by the `Client`. +//! +//! This module contains: +//! +//! - A default [`HttpConnector`][] that does DNS resolution and establishes +//! connections over TCP. +//! - Types to build custom connectors. +//! +//! # Connectors +//! +//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and +//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][], +//! and [`Connection`][]. +//! +//! ## Custom Connectors +//! +//! A simple connector that ignores the `Uri` destination and always returns +//! a TCP connection to the same address could be written like this: +//! +//! ```rust,ignore +//! let connector = tower::service_fn(|_dst| async { +//! tokio::net::TcpStream::connect("127.0.0.1:1337") +//! }) +//! ``` +//! +//! Or, fully written out: +//! +//! ``` +//! # #[cfg(feature = "runtime")] +//! # mod rt { +//! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}}; +//! use hyper::{service::Service, Uri}; +//! use tokio::net::TcpStream; +//! +//! #[derive(Clone)] +//! struct LocalConnector; +//! +//! impl Service<Uri> for LocalConnector { +//! type Response = TcpStream; +//! type Error = std::io::Error; +//! // We can't "name" an `async` generated future. +//! type Future = Pin<Box< +//! dyn Future<Output = Result<Self::Response, Self::Error>> + Send +//! >>; +//! +//! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { +//! // This connector is always ready, but others might not be. +//! Poll::Ready(Ok(())) +//! } +//! +//! fn call(&mut self, _: Uri) -> Self::Future { +//! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337)))) +//! } +//! } +//! # } +//! ``` +//! +//! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a +//! better starting place to extend from. +//! +//! Using either of the above connector examples, it can be used with the +//! `Client` like this: +//! +//! ``` +//! # #[cfg(feature = "runtime")] +//! # fn rt () { +//! # let connector = hyper::client::HttpConnector::new(); +//! // let connector = ... +//! +//! let client = hyper::Client::builder() +//! .build::<_, hyper::Body>(connector); +//! # } +//! ``` +//! +//! +//! [`HttpConnector`]: HttpConnector +//! [`Service`]: crate::service::Service +//! [`Uri`]: ::http::Uri +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Connection`]: Connection +use std::fmt; + +use ::http::Extensions; + +cfg_feature! { + #![feature = "tcp"] + + pub use self::http::{HttpConnector, HttpInfo}; + + pub mod dns; + mod http; +} + +cfg_feature! { + #![any(feature = "http1", feature = "http2")] + + pub use self::sealed::Connect; +} + +/// Describes a type returned by a connector. +pub trait Connection { + /// Return metadata describing the connection. + fn connected(&self) -> Connected; +} + +/// Extra information about the connected transport. +/// +/// This can be used to inform recipients about things like if ALPN +/// was used, or if connected to an HTTP proxy. +#[derive(Debug)] +pub struct Connected { + pub(super) alpn: Alpn, + pub(super) is_proxied: bool, + pub(super) extra: Option<Extra>, +} + +pub(super) struct Extra(Box<dyn ExtraInner>); + +#[derive(Clone, Copy, Debug, PartialEq)] +pub(super) enum Alpn { + H2, + None, +} + +impl Connected { + /// Create new `Connected` type with empty metadata. + pub fn new() -> Connected { + Connected { + alpn: Alpn::None, + is_proxied: false, + extra: None, + } + } + + /// Set whether the connected transport is to an HTTP proxy. + /// + /// This setting will affect if HTTP/1 requests written on the transport + /// will have the request-target in absolute-form or origin-form: + /// + /// - When `proxy(false)`: + /// + /// ```http + /// GET /guide HTTP/1.1 + /// ``` + /// + /// - When `proxy(true)`: + /// + /// ```http + /// GET http://hyper.rs/guide HTTP/1.1 + /// ``` + /// + /// Default is `false`. + pub fn proxy(mut self, is_proxied: bool) -> Connected { + self.is_proxied = is_proxied; + self + } + + /// Determines if the connected transport is to an HTTP proxy. + pub fn is_proxied(&self) -> bool { + self.is_proxied + } + + /// Set extra connection information to be set in the extensions of every `Response`. + pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected { + if let Some(prev) = self.extra { + self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra)))); + } else { + self.extra = Some(Extra(Box::new(ExtraEnvelope(extra)))); + } + self + } + + /// Copies the extra connection information into an `Extensions` map. + pub fn get_extras(&self, extensions: &mut Extensions) { + if let Some(extra) = &self.extra { + extra.set(extensions); + } + } + + /// Set that the connected transport negotiated HTTP/2 as its next protocol. + pub fn negotiated_h2(mut self) -> Connected { + self.alpn = Alpn::H2; + self + } + + /// Determines if the connected transport negotiated HTTP/2 as its next protocol. + pub fn is_negotiated_h2(&self) -> bool { + self.alpn == Alpn::H2 + } + + // Don't public expose that `Connected` is `Clone`, unsure if we want to + // keep that contract... + #[cfg(feature = "http2")] + pub(super) fn clone(&self) -> Connected { + Connected { + alpn: self.alpn.clone(), + is_proxied: self.is_proxied, + extra: self.extra.clone(), + } + } +} + +// ===== impl Extra ===== + +impl Extra { + pub(super) fn set(&self, res: &mut Extensions) { + self.0.set(res); + } +} + +impl Clone for Extra { + fn clone(&self) -> Extra { + Extra(self.0.clone_box()) + } +} + +impl fmt::Debug for Extra { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Extra").finish() + } +} + +trait ExtraInner: Send + Sync { + fn clone_box(&self) -> Box<dyn ExtraInner>; + fn set(&self, res: &mut Extensions); +} + +// This indirection allows the `Connected` to have a type-erased "extra" value, +// while that type still knows its inner extra type. This allows the correct +// TypeId to be used when inserting into `res.extensions_mut()`. +#[derive(Clone)] +struct ExtraEnvelope<T>(T); + +impl<T> ExtraInner for ExtraEnvelope<T> +where + T: Clone + Send + Sync + 'static, +{ + fn clone_box(&self) -> Box<dyn ExtraInner> { + Box::new(self.clone()) + } + + fn set(&self, res: &mut Extensions) { + res.insert(self.0.clone()); + } +} + +struct ExtraChain<T>(Box<dyn ExtraInner>, T); + +impl<T: Clone> Clone for ExtraChain<T> { + fn clone(&self) -> Self { + ExtraChain(self.0.clone_box(), self.1.clone()) + } +} + +impl<T> ExtraInner for ExtraChain<T> +where + T: Clone + Send + Sync + 'static, +{ + fn clone_box(&self) -> Box<dyn ExtraInner> { + Box::new(self.clone()) + } + + fn set(&self, res: &mut Extensions) { + self.0.set(res); + res.insert(self.1.clone()); + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +pub(super) mod sealed { + use std::error::Error as StdError; + + use ::http::Uri; + use tokio::io::{AsyncRead, AsyncWrite}; + + use super::Connection; + use crate::common::{Future, Unpin}; + + /// Connect to a destination, returning an IO transport. + /// + /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the + /// ready connection. + /// + /// # Trait Alias + /// + /// This is really just an *alias* for the `tower::Service` trait, with + /// additional bounds set for convenience *inside* hyper. You don't actually + /// implement this trait, but `tower::Service<Uri>` instead. + // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot + // fit the `Connect` bounds because of the blanket impl for `Service`. + pub trait Connect: Sealed + Sized { + #[doc(hidden)] + type _Svc: ConnectSvc; + #[doc(hidden)] + fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future; + } + + pub trait ConnectSvc { + type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static; + type Error: Into<Box<dyn StdError + Send + Sync>>; + type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static; + + fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future; + } + + impl<S, T> Connect for S + where + S: tower_service::Service<Uri, Response = T> + Send + 'static, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + { + type _Svc = S; + + fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> { + crate::service::oneshot(self, dst) + } + } + + impl<S, T> ConnectSvc for S + where + S: tower_service::Service<Uri, Response = T> + Send + 'static, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + { + type Connection = T; + type Error = S::Error; + type Future = crate::service::Oneshot<S, Uri>; + + fn connect(self, _: Internal, dst: Uri) -> Self::Future { + crate::service::oneshot(self, dst) + } + } + + impl<S, T> Sealed for S + where + S: tower_service::Service<Uri, Response = T> + Send, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + S::Future: Unpin + Send, + T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + { + } + + pub trait Sealed {} + #[allow(missing_debug_implementations)] + pub struct Internal; +} + +#[cfg(test)] +mod tests { + use super::Connected; + + #[derive(Clone, Debug, PartialEq)] + struct Ex1(usize); + + #[derive(Clone, Debug, PartialEq)] + struct Ex2(&'static str); + + #[derive(Clone, Debug, PartialEq)] + struct Ex3(&'static str); + + #[test] + fn test_connected_extra() { + let c1 = Connected::new().extra(Ex1(41)); + + let mut ex = ::http::Extensions::new(); + + assert_eq!(ex.get::<Ex1>(), None); + + c1.extra.as_ref().expect("c1 extra").set(&mut ex); + + assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41))); + } + + #[test] + fn test_connected_extra_chain() { + // If a user composes connectors and at each stage, there's "extra" + // info to attach, it shouldn't override the previous extras. + + let c1 = Connected::new() + .extra(Ex1(45)) + .extra(Ex2("zoom")) + .extra(Ex3("pew pew")); + + let mut ex1 = ::http::Extensions::new(); + + assert_eq!(ex1.get::<Ex1>(), None); + assert_eq!(ex1.get::<Ex2>(), None); + assert_eq!(ex1.get::<Ex3>(), None); + + c1.extra.as_ref().expect("c1 extra").set(&mut ex1); + + assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45))); + assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom"))); + assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew"))); + + // Just like extensions, inserting the same type overrides previous type. + let c2 = Connected::new() + .extra(Ex1(33)) + .extra(Ex2("hiccup")) + .extra(Ex1(99)); + + let mut ex2 = ::http::Extensions::new(); + + c2.extra.as_ref().expect("c2 extra").set(&mut ex2); + + assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99))); + assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup"))); + } +} diff --git a/third_party/rust/hyper/src/client/dispatch.rs b/third_party/rust/hyper/src/client/dispatch.rs new file mode 100644 index 0000000000..0d70dbccea --- /dev/null +++ b/third_party/rust/hyper/src/client/dispatch.rs @@ -0,0 +1,436 @@ +#[cfg(feature = "http2")] +use std::future::Future; + +use futures_util::FutureExt; +use tokio::sync::{mpsc, oneshot}; + +#[cfg(feature = "http2")] +use crate::common::Pin; +use crate::common::{task, Poll}; + +pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>; +pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; + +pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { + let (tx, rx) = mpsc::unbounded_channel(); + let (giver, taker) = want::new(); + let tx = Sender { + buffered_once: false, + giver, + inner: tx, + }; + let rx = Receiver { inner: rx, taker }; + (tx, rx) +} + +/// A bounded sender of requests and callbacks for when responses are ready. +/// +/// While the inner sender is unbounded, the Giver is used to determine +/// if the Receiver is ready for another request. +pub(crate) struct Sender<T, U> { + /// One message is always allowed, even if the Receiver hasn't asked + /// for it yet. This boolean keeps track of whether we've sent one + /// without notice. + buffered_once: bool, + /// The Giver helps watch that the the Receiver side has been polled + /// when the queue is empty. This helps us know when a request and + /// response have been fully processed, and a connection is ready + /// for more. + giver: want::Giver, + /// Actually bounded by the Giver, plus `buffered_once`. + inner: mpsc::UnboundedSender<Envelope<T, U>>, +} + +/// An unbounded version. +/// +/// Cannot poll the Giver, but can still use it to determine if the Receiver +/// has been dropped. However, this version can be cloned. +#[cfg(feature = "http2")] +pub(crate) struct UnboundedSender<T, U> { + /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked. + giver: want::SharedGiver, + inner: mpsc::UnboundedSender<Envelope<T, U>>, +} + +impl<T, U> Sender<T, U> { + pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { + self.giver + .poll_want(cx) + .map_err(|_| crate::Error::new_closed()) + } + + pub(crate) fn is_ready(&self) -> bool { + self.giver.is_wanting() + } + + pub(crate) fn is_closed(&self) -> bool { + self.giver.is_canceled() + } + + fn can_send(&mut self) -> bool { + if self.giver.give() || !self.buffered_once { + // If the receiver is ready *now*, then of course we can send. + // + // If the receiver isn't ready yet, but we don't have anything + // in the channel yet, then allow one message. + self.buffered_once = true; + true + } else { + false + } + } + + pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { + if !self.can_send() { + return Err(val); + } + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } + + pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { + if !self.can_send() { + return Err(val); + } + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } + + #[cfg(feature = "http2")] + pub(crate) fn unbound(self) -> UnboundedSender<T, U> { + UnboundedSender { + giver: self.giver.shared(), + inner: self.inner, + } + } +} + +#[cfg(feature = "http2")] +impl<T, U> UnboundedSender<T, U> { + pub(crate) fn is_ready(&self) -> bool { + !self.giver.is_canceled() + } + + pub(crate) fn is_closed(&self) -> bool { + self.giver.is_canceled() + } + + pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { + let (tx, rx) = oneshot::channel(); + self.inner + .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) + .map(move |_| rx) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) + } +} + +#[cfg(feature = "http2")] +impl<T, U> Clone for UnboundedSender<T, U> { + fn clone(&self) -> Self { + UnboundedSender { + giver: self.giver.clone(), + inner: self.inner.clone(), + } + } +} + +pub(crate) struct Receiver<T, U> { + inner: mpsc::UnboundedReceiver<Envelope<T, U>>, + taker: want::Taker, +} + +impl<T, U> Receiver<T, U> { + pub(crate) fn poll_recv( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<Option<(T, Callback<T, U>)>> { + match self.inner.poll_recv(cx) { + Poll::Ready(item) => { + Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) + } + Poll::Pending => { + self.taker.want(); + Poll::Pending + } + } + } + + #[cfg(feature = "http1")] + pub(crate) fn close(&mut self) { + self.taker.cancel(); + self.inner.close(); + } + + #[cfg(feature = "http1")] + pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { + match self.inner.recv().now_or_never() { + Some(Some(mut env)) => env.0.take(), + _ => None, + } + } +} + +impl<T, U> Drop for Receiver<T, U> { + fn drop(&mut self) { + // Notify the giver about the closure first, before dropping + // the mpsc::Receiver. + self.taker.cancel(); + } +} + +struct Envelope<T, U>(Option<(T, Callback<T, U>)>); + +impl<T, U> Drop for Envelope<T, U> { + fn drop(&mut self) { + if let Some((val, cb)) = self.0.take() { + cb.send(Err(( + crate::Error::new_canceled().with("connection closed"), + Some(val), + ))); + } + } +} + +pub(crate) enum Callback<T, U> { + Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>), + NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), +} + +impl<T, U> Drop for Callback<T, U> { + fn drop(&mut self) { + // FIXME(nox): What errors do we want here? + let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() { + "user code panicked" + } else { + "runtime dropped the dispatch task" + }); + + match self { + Callback::Retry(tx) => { + if let Some(tx) = tx.take() { + let _ = tx.send(Err((error, None))); + } + } + Callback::NoRetry(tx) => { + if let Some(tx) = tx.take() { + let _ = tx.send(Err(error)); + } + } + } + } +} + +impl<T, U> Callback<T, U> { + #[cfg(feature = "http2")] + pub(crate) fn is_canceled(&self) -> bool { + match *self { + Callback::Retry(Some(ref tx)) => tx.is_closed(), + Callback::NoRetry(Some(ref tx)) => tx.is_closed(), + _ => unreachable!(), + } + } + + pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { + match *self { + Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), + Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), + _ => unreachable!(), + } + } + + pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) { + match self { + Callback::Retry(ref mut tx) => { + let _ = tx.take().unwrap().send(val); + } + Callback::NoRetry(ref mut tx) => { + let _ = tx.take().unwrap().send(val.map_err(|e| e.0)); + } + } + } + + #[cfg(feature = "http2")] + pub(crate) async fn send_when( + self, + mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin, + ) { + use futures_util::future; + use tracing::trace; + + let mut cb = Some(self); + + // "select" on this callback being canceled, and the future completing + future::poll_fn(move |cx| { + match Pin::new(&mut when).poll(cx) { + Poll::Ready(Ok(res)) => { + cb.take().expect("polled after complete").send(Ok(res)); + Poll::Ready(()) + } + Poll::Pending => { + // check if the callback is canceled + ready!(cb.as_mut().unwrap().poll_canceled(cx)); + trace!("send_when canceled"); + Poll::Ready(()) + } + Poll::Ready(Err(err)) => { + cb.take().expect("polled after complete").send(Err(err)); + Poll::Ready(()) + } + } + }) + .await + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "nightly")] + extern crate test; + + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use super::{channel, Callback, Receiver}; + + #[derive(Debug)] + struct Custom(i32); + + impl<T, U> Future for Receiver<T, U> { + type Output = Option<(T, Callback<T, U>)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.poll_recv(cx) + } + } + + /// Helper to check if the future is ready after polling once. + struct PollOnce<'a, F>(&'a mut F); + + impl<F, T> Future for PollOnce<'_, F> + where + F: Future<Output = T> + Unpin, + { + type Output = Option<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(())), + Poll::Pending => Poll::Ready(None), + } + } + } + + #[tokio::test] + async fn drop_receiver_sends_cancel_errors() { + let _ = pretty_env_logger::try_init(); + + let (mut tx, mut rx) = channel::<Custom, ()>(); + + // must poll once for try_send to succeed + assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); + + let promise = tx.try_send(Custom(43)).unwrap(); + drop(rx); + + let fulfilled = promise.await; + let err = fulfilled + .expect("fulfilled") + .expect_err("promise should error"); + match (err.0.kind(), err.1) { + (&crate::error::Kind::Canceled, Some(_)) => (), + e => panic!("expected Error::Cancel(_), found {:?}", e), + } + } + + #[tokio::test] + async fn sender_checks_for_want_on_send() { + let (mut tx, mut rx) = channel::<Custom, ()>(); + + // one is allowed to buffer, second is rejected + let _ = tx.try_send(Custom(1)).expect("1 buffered"); + tx.try_send(Custom(2)).expect_err("2 not ready"); + + assert!(PollOnce(&mut rx).await.is_some(), "rx once"); + + // Even though 1 has been popped, only 1 could be buffered for the + // lifetime of the channel. + tx.try_send(Custom(2)).expect_err("2 still not ready"); + + assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); + + let _ = tx.try_send(Custom(2)).expect("2 ready"); + } + + #[cfg(feature = "http2")] + #[test] + fn unbounded_sender_doesnt_bound_on_want() { + let (tx, rx) = channel::<Custom, ()>(); + let mut tx = tx.unbound(); + + let _ = tx.try_send(Custom(1)).unwrap(); + let _ = tx.try_send(Custom(2)).unwrap(); + let _ = tx.try_send(Custom(3)).unwrap(); + + drop(rx); + + let _ = tx.try_send(Custom(4)).unwrap_err(); + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_throughput(b: &mut test::Bencher) { + use crate::{Body, Request, Response}; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let (mut tx, mut rx) = channel::<Request<Body>, Response<Body>>(); + + b.iter(move || { + let _ = tx.send(Request::default()).unwrap(); + rt.block_on(async { + loop { + let poll_once = PollOnce(&mut rx); + let opt = poll_once.await; + if opt.is_none() { + break; + } + } + }); + }) + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_not_ready(b: &mut test::Bencher) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let (_tx, mut rx) = channel::<i32, ()>(); + b.iter(move || { + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_none()); + }); + }) + } + + #[cfg(feature = "nightly")] + #[bench] + fn giver_queue_cancel(b: &mut test::Bencher) { + let (_tx, mut rx) = channel::<i32, ()>(); + + b.iter(move || { + rx.taker.cancel(); + }) + } +} diff --git a/third_party/rust/hyper/src/client/mod.rs b/third_party/rust/hyper/src/client/mod.rs new file mode 100644 index 0000000000..734bda8819 --- /dev/null +++ b/third_party/rust/hyper/src/client/mod.rs @@ -0,0 +1,68 @@ +//! HTTP Client +//! +//! There are two levels of APIs provided for construct HTTP clients: +//! +//! - The higher-level [`Client`](Client) type. +//! - The lower-level [`conn`](conn) module. +//! +//! # Client +//! +//! The [`Client`](Client) is the main way to send HTTP requests to a server. +//! The default `Client` provides these things on top of the lower-level API: +//! +//! - A default **connector**, able to resolve hostnames and connect to +//! destinations over plain-text TCP. +//! - A **pool** of existing connections, allowing better performance when +//! making multiple requests to the same hostname. +//! - Automatic setting of the `Host` header, based on the request `Uri`. +//! - Automatic request **retries** when a pooled connection is closed by the +//! server before any bytes have been written. +//! +//! Many of these features can configured, by making use of +//! [`Client::builder`](Client::builder). +//! +//! ## Example +//! +//! For a small example program simply fetching a URL, take a look at the +//! [full client example](https://github.com/hyperium/hyper/blob/master/examples/client.rs). +//! +//! ``` +//! # #[cfg(all(feature = "tcp", feature = "client", any(feature = "http1", feature = "http2")))] +//! # async fn fetch_httpbin() -> hyper::Result<()> { +//! use hyper::{body::HttpBody as _, Client, Uri}; +//! +//! let client = Client::new(); +//! +//! // Make a GET /ip to 'http://httpbin.org' +//! let res = client.get(Uri::from_static("http://httpbin.org/ip")).await?; +//! +//! // And then, if the request gets a response... +//! println!("status: {}", res.status()); +//! +//! // Concatenate the body stream into a single buffer... +//! let buf = hyper::body::to_bytes(res).await?; +//! +//! println!("body: {:?}", buf); +//! # Ok(()) +//! # } +//! # fn main () {} +//! ``` + +#[cfg(feature = "tcp")] +pub use self::connect::HttpConnector; + +pub mod connect; +#[cfg(all(test, feature = "runtime"))] +mod tests; + +cfg_feature! { + #![any(feature = "http1", feature = "http2")] + + pub use self::client::{Builder, Client, ResponseFuture}; + + mod client; + pub mod conn; + pub(super) mod dispatch; + mod pool; + pub mod service; +} diff --git a/third_party/rust/hyper/src/client/pool.rs b/third_party/rust/hyper/src/client/pool.rs new file mode 100644 index 0000000000..b9772d688d --- /dev/null +++ b/third_party/rust/hyper/src/client/pool.rs @@ -0,0 +1,1044 @@ +use std::collections::{HashMap, HashSet, VecDeque}; +use std::error::Error as StdError; +use std::fmt; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex, Weak}; + +#[cfg(not(feature = "runtime"))] +use std::time::{Duration, Instant}; + +use futures_channel::oneshot; +#[cfg(feature = "runtime")] +use tokio::time::{Duration, Instant, Interval}; +use tracing::{debug, trace}; + +use super::client::Ver; +use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(super) struct Pool<T> { + // If the pool is disabled, this is None. + inner: Option<Arc<Mutex<PoolInner<T>>>>, +} + +// Before using a pooled connection, make sure the sender is not dead. +// +// This is a trait to allow the `client::pool::tests` to work for `i32`. +// +// See https://github.com/hyperium/hyper/issues/1429 +pub(super) trait Poolable: Unpin + Send + Sized + 'static { + fn is_open(&self) -> bool; + /// Reserve this connection. + /// + /// Allows for HTTP/2 to return a shared reservation. + fn reserve(self) -> Reservation<Self>; + fn can_share(&self) -> bool; +} + +/// When checking out a pooled connection, it might be that the connection +/// only supports a single reservation, or it might be usable for many. +/// +/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be +/// used for multiple requests. +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(super) enum Reservation<T> { + /// This connection could be used multiple times, the first one will be + /// reinserted into the `idle` pool, and the second will be given to + /// the `Checkout`. + #[cfg(feature = "http2")] + Shared(T, T), + /// This connection requires unique access. It will be returned after + /// use is complete. + Unique(T), +} + +/// Simple type alias in case the key type needs to be adjusted. +pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>; + +struct PoolInner<T> { + // A flag that a connection is being established, and the connection + // should be shared. This prevents making multiple HTTP/2 connections + // to the same host. + connecting: HashSet<Key>, + // These are internal Conns sitting in the event loop in the KeepAlive + // state, waiting to receive a new Request to send on the socket. + idle: HashMap<Key, Vec<Idle<T>>>, + max_idle_per_host: usize, + // These are outstanding Checkouts that are waiting for a socket to be + // able to send a Request one. This is used when "racing" for a new + // connection. + // + // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait + // for the Pool to receive an idle Conn. When a Conn becomes idle, + // this list is checked for any parked Checkouts, and tries to notify + // them that the Conn could be used instead of waiting for a brand new + // connection. + waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, + // A oneshot channel is used to allow the interval to be notified when + // the Pool completely drops. That way, the interval can cancel immediately. + #[cfg(feature = "runtime")] + idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>, + #[cfg(feature = "runtime")] + exec: Exec, + timeout: Option<Duration>, +} + +// This is because `Weak::new()` *allocates* space for `T`, even if it +// doesn't need it! +struct WeakOpt<T>(Option<Weak<T>>); + +#[derive(Clone, Copy, Debug)] +pub(super) struct Config { + pub(super) idle_timeout: Option<Duration>, + pub(super) max_idle_per_host: usize, +} + +impl Config { + pub(super) fn is_enabled(&self) -> bool { + self.max_idle_per_host > 0 + } +} + +impl<T> Pool<T> { + pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> { + let inner = if config.is_enabled() { + Some(Arc::new(Mutex::new(PoolInner { + connecting: HashSet::new(), + idle: HashMap::new(), + #[cfg(feature = "runtime")] + idle_interval_ref: None, + max_idle_per_host: config.max_idle_per_host, + waiters: HashMap::new(), + #[cfg(feature = "runtime")] + exec: __exec.clone(), + timeout: config.idle_timeout, + }))) + } else { + None + }; + + Pool { inner } + } + + fn is_enabled(&self) -> bool { + self.inner.is_some() + } + + #[cfg(test)] + pub(super) fn no_timer(&self) { + // Prevent an actual interval from being created for this pool... + #[cfg(feature = "runtime")] + { + let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); + assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); + let (tx, _) = oneshot::channel(); + inner.idle_interval_ref = Some(tx); + } + } +} + +impl<T: Poolable> Pool<T> { + /// Returns a `Checkout` which is a future that resolves if an idle + /// connection becomes available. + pub(super) fn checkout(&self, key: Key) -> Checkout<T> { + Checkout { + key, + pool: self.clone(), + waiter: None, + } + } + + /// Ensure that there is only ever 1 connecting task for HTTP/2 + /// connections. This does nothing for HTTP/1. + pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> { + if ver == Ver::Http2 { + if let Some(ref enabled) = self.inner { + let mut inner = enabled.lock().unwrap(); + return if inner.connecting.insert(key.clone()) { + let connecting = Connecting { + key: key.clone(), + pool: WeakOpt::downgrade(enabled), + }; + Some(connecting) + } else { + trace!("HTTP/2 connecting already in progress for {:?}", key); + None + }; + } + } + + // else + Some(Connecting { + key: key.clone(), + // in HTTP/1's case, there is never a lock, so we don't + // need to do anything in Drop. + pool: WeakOpt::none(), + }) + } + + #[cfg(test)] + fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> { + self.inner.as_ref().expect("enabled").lock().expect("lock") + } + + /* Used in client/tests.rs... + #[cfg(feature = "runtime")] + #[cfg(test)] + pub(super) fn h1_key(&self, s: &str) -> Key { + Arc::new(s.to_string()) + } + + #[cfg(feature = "runtime")] + #[cfg(test)] + pub(super) fn idle_count(&self, key: &Key) -> usize { + self + .locked() + .idle + .get(key) + .map(|list| list.len()) + .unwrap_or(0) + } + */ + + pub(super) fn pooled( + &self, + #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>, + value: T, + ) -> Pooled<T> { + let (value, pool_ref) = if let Some(ref enabled) = self.inner { + match value.reserve() { + #[cfg(feature = "http2")] + Reservation::Shared(to_insert, to_return) => { + let mut inner = enabled.lock().unwrap(); + inner.put(connecting.key.clone(), to_insert, enabled); + // Do this here instead of Drop for Connecting because we + // already have a lock, no need to lock the mutex twice. + inner.connected(&connecting.key); + // prevent the Drop of Connecting from repeating inner.connected() + connecting.pool = WeakOpt::none(); + + // Shared reservations don't need a reference to the pool, + // since the pool always keeps a copy. + (to_return, WeakOpt::none()) + } + Reservation::Unique(value) => { + // Unique reservations must take a reference to the pool + // since they hope to reinsert once the reservation is + // completed + (value, WeakOpt::downgrade(enabled)) + } + } + } else { + // If pool is not enabled, skip all the things... + + // The Connecting should have had no pool ref + debug_assert!(connecting.pool.upgrade().is_none()); + + (value, WeakOpt::none()) + }; + Pooled { + key: connecting.key.clone(), + is_reused: false, + pool: pool_ref, + value: Some(value), + } + } + + fn reuse(&self, key: &Key, value: T) -> Pooled<T> { + debug!("reuse idle connection for {:?}", key); + // TODO: unhack this + // In Pool::pooled(), which is used for inserting brand new connections, + // there's some code that adjusts the pool reference taken depending + // on if the Reservation can be shared or is unique. By the time + // reuse() is called, the reservation has already been made, and + // we just have the final value, without knowledge of if this is + // unique or shared. So, the hack is to just assume Ver::Http2 means + // shared... :( + let mut pool_ref = WeakOpt::none(); + if !value.can_share() { + if let Some(ref enabled) = self.inner { + pool_ref = WeakOpt::downgrade(enabled); + } + } + + Pooled { + is_reused: true, + key: key.clone(), + pool: pool_ref, + value: Some(value), + } + } +} + +/// Pop off this list, looking for a usable connection that hasn't expired. +struct IdlePopper<'a, T> { + key: &'a Key, + list: &'a mut Vec<Idle<T>>, +} + +impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { + fn pop(self, expiration: &Expiration) -> Option<Idle<T>> { + while let Some(entry) = self.list.pop() { + // If the connection has been closed, or is older than our idle + // timeout, simply drop it and keep looking... + if !entry.value.is_open() { + trace!("removing closed connection for {:?}", self.key); + continue; + } + // TODO: Actually, since the `idle` list is pushed to the end always, + // that would imply that if *this* entry is expired, then anything + // "earlier" in the list would *have* to be expired also... Right? + // + // In that case, we could just break out of the loop and drop the + // whole list... + if expiration.expires(entry.idle_at) { + trace!("removing expired connection for {:?}", self.key); + continue; + } + + let value = match entry.value.reserve() { + #[cfg(feature = "http2")] + Reservation::Shared(to_reinsert, to_checkout) => { + self.list.push(Idle { + idle_at: Instant::now(), + value: to_reinsert, + }); + to_checkout + } + Reservation::Unique(unique) => unique, + }; + + return Some(Idle { + idle_at: entry.idle_at, + value, + }); + } + + None + } +} + +impl<T: Poolable> PoolInner<T> { + fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) { + if value.can_share() && self.idle.contains_key(&key) { + trace!("put; existing idle HTTP/2 connection for {:?}", key); + return; + } + trace!("put; add idle connection for {:?}", key); + let mut remove_waiters = false; + let mut value = Some(value); + if let Some(waiters) = self.waiters.get_mut(&key) { + while let Some(tx) = waiters.pop_front() { + if !tx.is_canceled() { + let reserved = value.take().expect("value already sent"); + let reserved = match reserved.reserve() { + #[cfg(feature = "http2")] + Reservation::Shared(to_keep, to_send) => { + value = Some(to_keep); + to_send + } + Reservation::Unique(uniq) => uniq, + }; + match tx.send(reserved) { + Ok(()) => { + if value.is_none() { + break; + } else { + continue; + } + } + Err(e) => { + value = Some(e); + } + } + } + + trace!("put; removing canceled waiter for {:?}", key); + } + remove_waiters = waiters.is_empty(); + } + if remove_waiters { + self.waiters.remove(&key); + } + + match value { + Some(value) => { + // borrow-check scope... + { + let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); + if self.max_idle_per_host <= idle_list.len() { + trace!("max idle per host for {:?}, dropping connection", key); + return; + } + + debug!("pooling idle connection for {:?}", key); + idle_list.push(Idle { + value, + idle_at: Instant::now(), + }); + } + + #[cfg(feature = "runtime")] + { + self.spawn_idle_interval(__pool_ref); + } + } + None => trace!("put; found waiter for {:?}", key), + } + } + + /// A `Connecting` task is complete. Not necessarily successfully, + /// but the lock is going away, so clean up. + fn connected(&mut self, key: &Key) { + let existed = self.connecting.remove(key); + debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); + // cancel any waiters. if there are any, it's because + // this Connecting task didn't complete successfully. + // those waiters would never receive a connection. + self.waiters.remove(key); + } + + #[cfg(feature = "runtime")] + fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) { + let (dur, rx) = { + if self.idle_interval_ref.is_some() { + return; + } + + if let Some(dur) = self.timeout { + let (tx, rx) = oneshot::channel(); + self.idle_interval_ref = Some(tx); + (dur, rx) + } else { + return; + } + }; + + let interval = IdleTask { + interval: tokio::time::interval(dur), + pool: WeakOpt::downgrade(pool_ref), + pool_drop_notifier: rx, + }; + + self.exec.execute(interval); + } +} + +impl<T> PoolInner<T> { + /// Any `FutureResponse`s that were created will have made a `Checkout`, + /// and possibly inserted into the pool that it is waiting for an idle + /// connection. If a user ever dropped that future, we need to clean out + /// those parked senders. + fn clean_waiters(&mut self, key: &Key) { + let mut remove_waiters = false; + if let Some(waiters) = self.waiters.get_mut(key) { + waiters.retain(|tx| !tx.is_canceled()); + remove_waiters = waiters.is_empty(); + } + if remove_waiters { + self.waiters.remove(key); + } + } +} + +#[cfg(feature = "runtime")] +impl<T: Poolable> PoolInner<T> { + /// This should *only* be called by the IdleTask + fn clear_expired(&mut self) { + let dur = self.timeout.expect("interval assumes timeout"); + + let now = Instant::now(); + //self.last_idle_check_at = now; + + self.idle.retain(|key, values| { + values.retain(|entry| { + if !entry.value.is_open() { + trace!("idle interval evicting closed for {:?}", key); + return false; + } + + // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470. + if now.saturating_duration_since(entry.idle_at) > dur { + trace!("idle interval evicting expired for {:?}", key); + return false; + } + + // Otherwise, keep this value... + true + }); + + // returning false evicts this key/val + !values.is_empty() + }); + } +} + +impl<T> Clone for Pool<T> { + fn clone(&self) -> Pool<T> { + Pool { + inner: self.inner.clone(), + } + } +} + +/// A wrapped poolable value that tries to reinsert to the Pool on Drop. +// Note: The bounds `T: Poolable` is needed for the Drop impl. +pub(super) struct Pooled<T: Poolable> { + value: Option<T>, + is_reused: bool, + key: Key, + pool: WeakOpt<Mutex<PoolInner<T>>>, +} + +impl<T: Poolable> Pooled<T> { + pub(super) fn is_reused(&self) -> bool { + self.is_reused + } + + pub(super) fn is_pool_enabled(&self) -> bool { + self.pool.0.is_some() + } + + fn as_ref(&self) -> &T { + self.value.as_ref().expect("not dropped") + } + + fn as_mut(&mut self) -> &mut T { + self.value.as_mut().expect("not dropped") + } +} + +impl<T: Poolable> Deref for Pooled<T> { + type Target = T; + fn deref(&self) -> &T { + self.as_ref() + } +} + +impl<T: Poolable> DerefMut for Pooled<T> { + fn deref_mut(&mut self) -> &mut T { + self.as_mut() + } +} + +impl<T: Poolable> Drop for Pooled<T> { + fn drop(&mut self) { + if let Some(value) = self.value.take() { + if !value.is_open() { + // If we *already* know the connection is done here, + // it shouldn't be re-inserted back into the pool. + return; + } + + if let Some(pool) = self.pool.upgrade() { + if let Ok(mut inner) = pool.lock() { + inner.put(self.key.clone(), value, &pool); + } + } else if !value.can_share() { + trace!("pool dropped, dropping pooled ({:?})", self.key); + } + // Ver::Http2 is already in the Pool (or dead), so we wouldn't + // have an actual reference to the Pool. + } + } +} + +impl<T: Poolable> fmt::Debug for Pooled<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Pooled").field("key", &self.key).finish() + } +} + +struct Idle<T> { + idle_at: Instant, + value: T, +} + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(super) struct Checkout<T> { + key: Key, + pool: Pool<T>, + waiter: Option<oneshot::Receiver<T>>, +} + +#[derive(Debug)] +pub(super) struct CheckoutIsClosedError; + +impl StdError for CheckoutIsClosedError {} + +impl fmt::Display for CheckoutIsClosedError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("checked out connection was closed") + } +} + +impl<T: Poolable> Checkout<T> { + fn poll_waiter( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll<Option<crate::Result<Pooled<T>>>> { + if let Some(mut rx) = self.waiter.take() { + match Pin::new(&mut rx).poll(cx) { + Poll::Ready(Ok(value)) => { + if value.is_open() { + Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) + } else { + Poll::Ready(Some(Err( + crate::Error::new_canceled().with(CheckoutIsClosedError) + ))) + } + } + Poll::Pending => { + self.waiter = Some(rx); + Poll::Pending + } + Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( + crate::Error::new_canceled().with("request has been canceled") + ))), + } + } else { + Poll::Ready(None) + } + } + + fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> { + let entry = { + let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); + let expiration = Expiration::new(inner.timeout); + let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { + trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); + // A block to end the mutable borrow on list, + // so the map below can check is_empty() + { + let popper = IdlePopper { + key: &self.key, + list, + }; + popper.pop(&expiration) + } + .map(|e| (e, list.is_empty())) + }); + + let (entry, empty) = if let Some((e, empty)) = maybe_entry { + (Some(e), empty) + } else { + // No entry found means nuke the list for sure. + (None, true) + }; + if empty { + //TODO: This could be done with the HashMap::entry API instead. + inner.idle.remove(&self.key); + } + + if entry.is_none() && self.waiter.is_none() { + let (tx, mut rx) = oneshot::channel(); + trace!("checkout waiting for idle connection: {:?}", self.key); + inner + .waiters + .entry(self.key.clone()) + .or_insert_with(VecDeque::new) + .push_back(tx); + + // register the waker with this oneshot + assert!(Pin::new(&mut rx).poll(cx).is_pending()); + self.waiter = Some(rx); + } + + entry + }; + + entry.map(|e| self.pool.reuse(&self.key, e.value)) + } +} + +impl<T: Poolable> Future for Checkout<T> { + type Output = crate::Result<Pooled<T>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + if let Some(pooled) = ready!(self.poll_waiter(cx)?) { + return Poll::Ready(Ok(pooled)); + } + + if let Some(pooled) = self.checkout(cx) { + Poll::Ready(Ok(pooled)) + } else if !self.pool.is_enabled() { + Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled"))) + } else { + // There's a new waiter, already registered in self.checkout() + debug_assert!(self.waiter.is_some()); + Poll::Pending + } + } +} + +impl<T> Drop for Checkout<T> { + fn drop(&mut self) { + if self.waiter.take().is_some() { + trace!("checkout dropped for {:?}", self.key); + if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) { + inner.clean_waiters(&self.key); + } + } + } +} + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(super) struct Connecting<T: Poolable> { + key: Key, + pool: WeakOpt<Mutex<PoolInner<T>>>, +} + +impl<T: Poolable> Connecting<T> { + pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> { + debug_assert!( + self.pool.0.is_none(), + "Connecting::alpn_h2 but already Http2" + ); + + pool.connecting(&self.key, Ver::Http2) + } +} + +impl<T: Poolable> Drop for Connecting<T> { + fn drop(&mut self) { + if let Some(pool) = self.pool.upgrade() { + // No need to panic on drop, that could abort! + if let Ok(mut inner) = pool.lock() { + inner.connected(&self.key); + } + } + } +} + +struct Expiration(Option<Duration>); + +impl Expiration { + fn new(dur: Option<Duration>) -> Expiration { + Expiration(dur) + } + + fn expires(&self, instant: Instant) -> bool { + match self.0 { + // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470. + Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout, + None => false, + } + } +} + +#[cfg(feature = "runtime")] +pin_project_lite::pin_project! { + struct IdleTask<T> { + #[pin] + interval: Interval, + pool: WeakOpt<Mutex<PoolInner<T>>>, + // This allows the IdleTask to be notified as soon as the entire + // Pool is fully dropped, and shutdown. This channel is never sent on, + // but Err(Canceled) will be received when the Pool is dropped. + #[pin] + pool_drop_notifier: oneshot::Receiver<crate::common::Never>, + } +} + +#[cfg(feature = "runtime")] +impl<T: Poolable + 'static> Future for IdleTask<T> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + loop { + match this.pool_drop_notifier.as_mut().poll(cx) { + Poll::Ready(Ok(n)) => match n {}, + Poll::Pending => (), + Poll::Ready(Err(_canceled)) => { + trace!("pool closed, canceling idle interval"); + return Poll::Ready(()); + } + } + + ready!(this.interval.as_mut().poll_tick(cx)); + + if let Some(inner) = this.pool.upgrade() { + if let Ok(mut inner) = inner.lock() { + trace!("idle interval checking for expired"); + inner.clear_expired(); + continue; + } + } + return Poll::Ready(()); + } + } +} + +impl<T> WeakOpt<T> { + fn none() -> Self { + WeakOpt(None) + } + + fn downgrade(arc: &Arc<T>) -> Self { + WeakOpt(Some(Arc::downgrade(arc))) + } + + fn upgrade(&self) -> Option<Arc<T>> { + self.0.as_ref().and_then(Weak::upgrade) + } +} + +#[cfg(test)] +mod tests { + use std::task::Poll; + use std::time::Duration; + + use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; + use crate::common::{exec::Exec, task, Future, Pin}; + + /// Test unique reservations. + #[derive(Debug, PartialEq, Eq)] + struct Uniq<T>(T); + + impl<T: Send + 'static + Unpin> Poolable for Uniq<T> { + fn is_open(&self) -> bool { + true + } + + fn reserve(self) -> Reservation<Self> { + Reservation::Unique(self) + } + + fn can_share(&self) -> bool { + false + } + } + + fn c<T: Poolable>(key: Key) -> Connecting<T> { + Connecting { + key, + pool: WeakOpt::none(), + } + } + + fn host_key(s: &str) -> Key { + (http::uri::Scheme::HTTP, s.parse().expect("host key")) + } + + fn pool_no_timer<T>() -> Pool<T> { + pool_max_idle_no_timer(::std::usize::MAX) + } + + fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> { + let pool = Pool::new( + super::Config { + idle_timeout: Some(Duration::from_millis(100)), + max_idle_per_host: max_idle, + }, + &Exec::Default, + ); + pool.no_timer(); + pool + } + + #[tokio::test] + async fn test_pool_checkout_smoke() { + let pool = pool_no_timer(); + let key = host_key("foo"); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); + + drop(pooled); + + match pool.checkout(key).await { + Ok(pooled) => assert_eq!(*pooled, Uniq(41)), + Err(_) => panic!("not ready"), + }; + } + + /// Helper to check if the future is ready after polling once. + struct PollOnce<'a, F>(&'a mut F); + + impl<F, T, U> Future for PollOnce<'_, F> + where + F: Future<Output = Result<T, U>> + Unpin, + { + type Output = Option<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Some(())), + Poll::Ready(Err(_)) => Poll::Ready(Some(())), + Poll::Pending => Poll::Ready(None), + } + } + } + + #[tokio::test] + async fn test_pool_checkout_returns_none_if_expired() { + let pool = pool_no_timer(); + let key = host_key("foo"); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); + + drop(pooled); + tokio::time::sleep(pool.locked().timeout.unwrap()).await; + let mut checkout = pool.checkout(key); + let poll_once = PollOnce(&mut checkout); + let is_not_ready = poll_once.await.is_none(); + assert!(is_not_ready); + } + + #[cfg(feature = "runtime")] + #[tokio::test] + async fn test_pool_checkout_removes_expired() { + let pool = pool_no_timer(); + let key = host_key("foo"); + + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); + + assert_eq!( + pool.locked().idle.get(&key).map(|entries| entries.len()), + Some(3) + ); + tokio::time::sleep(pool.locked().timeout.unwrap()).await; + + let mut checkout = pool.checkout(key.clone()); + let poll_once = PollOnce(&mut checkout); + // checkout.await should clean out the expired + poll_once.await; + assert!(pool.locked().idle.get(&key).is_none()); + } + + #[test] + fn test_pool_max_idle_per_host() { + let pool = pool_max_idle_no_timer(2); + let key = host_key("foo"); + + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); + + // pooled and dropped 3, max_idle should only allow 2 + assert_eq!( + pool.locked().idle.get(&key).map(|entries| entries.len()), + Some(2) + ); + } + + #[cfg(feature = "runtime")] + #[tokio::test] + async fn test_pool_timer_removes_expired() { + let _ = pretty_env_logger::try_init(); + tokio::time::pause(); + + let pool = Pool::new( + super::Config { + idle_timeout: Some(Duration::from_millis(10)), + max_idle_per_host: std::usize::MAX, + }, + &Exec::Default, + ); + + let key = host_key("foo"); + + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); + + assert_eq!( + pool.locked().idle.get(&key).map(|entries| entries.len()), + Some(3) + ); + + // Let the timer tick passed the expiration... + tokio::time::advance(Duration::from_millis(30)).await; + // Yield so the Interval can reap... + tokio::task::yield_now().await; + + assert!(pool.locked().idle.get(&key).is_none()); + } + + #[tokio::test] + async fn test_pool_checkout_task_unparked() { + use futures_util::future::join; + use futures_util::FutureExt; + + let pool = pool_no_timer(); + let key = host_key("foo"); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); + + let checkout = join(pool.checkout(key), async { + // the checkout future will park first, + // and then this lazy future will be polled, which will insert + // the pooled back into the pool + // + // this test makes sure that doing so will unpark the checkout + drop(pooled); + }) + .map(|(entry, _)| entry); + + assert_eq!(*checkout.await.unwrap(), Uniq(41)); + } + + #[tokio::test] + async fn test_pool_checkout_drop_cleans_up_waiters() { + let pool = pool_no_timer::<Uniq<i32>>(); + let key = host_key("foo"); + + let mut checkout1 = pool.checkout(key.clone()); + let mut checkout2 = pool.checkout(key.clone()); + + let poll_once1 = PollOnce(&mut checkout1); + let poll_once2 = PollOnce(&mut checkout2); + + // first poll needed to get into Pool's parked + poll_once1.await; + assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); + poll_once2.await; + assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); + + // on drop, clean up Pool + drop(checkout1); + assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); + + drop(checkout2); + assert!(pool.locked().waiters.get(&key).is_none()); + } + + #[derive(Debug)] + struct CanClose { + #[allow(unused)] + val: i32, + closed: bool, + } + + impl Poolable for CanClose { + fn is_open(&self) -> bool { + !self.closed + } + + fn reserve(self) -> Reservation<Self> { + Reservation::Unique(self) + } + + fn can_share(&self) -> bool { + false + } + } + + #[test] + fn pooled_drop_if_closed_doesnt_reinsert() { + let pool = pool_no_timer(); + let key = host_key("foo"); + pool.pooled( + c(key.clone()), + CanClose { + val: 57, + closed: true, + }, + ); + + assert!(!pool.locked().idle.contains_key(&key)); + } +} diff --git a/third_party/rust/hyper/src/client/service.rs b/third_party/rust/hyper/src/client/service.rs new file mode 100644 index 0000000000..406f61edc9 --- /dev/null +++ b/third_party/rust/hyper/src/client/service.rs @@ -0,0 +1,89 @@ +//! Utilities used to interact with the Tower ecosystem. +//! +//! This module provides `Connect` which hook-ins into the Tower ecosystem. + +use std::error::Error as StdError; +use std::future::Future; +use std::marker::PhantomData; + +use tracing::debug; + +use super::conn::{Builder, SendRequest}; +use crate::{ + body::HttpBody, + common::{task, Pin, Poll}, + service::{MakeConnection, Service}, +}; + +/// Creates a connection via `SendRequest`. +/// +/// This accepts a `hyper::client::conn::Builder` and provides +/// a `MakeService` implementation to create connections from some +/// target `T`. +#[derive(Debug)] +pub struct Connect<C, B, T> { + inner: C, + builder: Builder, + _pd: PhantomData<fn(T, B)>, +} + +impl<C, B, T> Connect<C, B, T> { + /// Create a new `Connect` with some inner connector `C` and a connection + /// builder. + pub fn new(inner: C, builder: Builder) -> Self { + Self { + inner, + builder, + _pd: PhantomData, + } + } +} + +impl<C, B, T> Service<T> for Connect<C, B, T> +where + C: MakeConnection<T>, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into<Box<dyn StdError + Send + Sync>> + Send, + B: HttpBody + Unpin + Send + 'static, + B::Data: Send + Unpin, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Response = SendRequest<B>; + type Error = crate::Error; + type Future = + Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner + .poll_ready(cx) + .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into())) + } + + fn call(&mut self, req: T) -> Self::Future { + let builder = self.builder.clone(); + let io = self.inner.make_connection(req); + + let fut = async move { + match io.await { + Ok(io) => match builder.handshake(io).await { + Ok((sr, conn)) => { + builder.exec.execute(async move { + if let Err(e) = conn.await { + debug!("connection error: {:?}", e); + } + }); + Ok(sr) + } + Err(e) => Err(e), + }, + Err(e) => { + let err = crate::Error::new(crate::error::Kind::Connect).with(e.into()); + Err(err) + } + } + }; + + Box::pin(fut) + } +} diff --git a/third_party/rust/hyper/src/client/tests.rs b/third_party/rust/hyper/src/client/tests.rs new file mode 100644 index 0000000000..0a281a637d --- /dev/null +++ b/third_party/rust/hyper/src/client/tests.rs @@ -0,0 +1,286 @@ +use std::io; + +use futures_util::future; +use tokio::net::TcpStream; + +use super::Client; + +#[tokio::test] +async fn client_connect_uri_argument() { + let connector = tower::service_fn(|dst: http::Uri| { + assert_eq!(dst.scheme(), Some(&http::uri::Scheme::HTTP)); + assert_eq!(dst.host(), Some("example.local")); + assert_eq!(dst.port(), None); + assert_eq!(dst.path(), "/", "path should be removed"); + + future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me")) + }); + + let client = Client::builder().build::<_, crate::Body>(connector); + let _ = client + .get("http://example.local/and/a/path".parse().unwrap()) + .await + .expect_err("response should fail"); +} + +/* +// FIXME: re-implement tests with `async/await` +#[test] +fn retryable_request() { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + let sock1 = connector.mock("http://mock.local"); + let sock2 = connector.mock("http://mock.local"); + + let client = Client::builder() + .build::<_, crate::Body>(connector); + + client.pool.no_timer(); + + { + + let req = Request::builder() + .uri("http://mock.local/a") + .body(Default::default()) + .unwrap(); + let res1 = client.request(req); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e)); + rt.block_on(res1.join(srv1)).expect("res1"); + } + drop(sock1); + + let req = Request::builder() + .uri("http://mock.local/b") + .body(Default::default()) + .unwrap(); + let res2 = client.request(req) + .map(|res| { + assert_eq!(res.status().as_u16(), 222); + }); + let srv2 = poll_fn(|| { + try_ready!(sock2.read(&mut [0u8; 512])); + try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e)); + + rt.block_on(res2.join(srv2)).expect("res2"); +} + +#[test] +fn conn_reset_after_write() { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + let sock1 = connector.mock("http://mock.local"); + + let client = Client::builder() + .build::<_, crate::Body>(connector); + + client.pool.no_timer(); + + { + let req = Request::builder() + .uri("http://mock.local/a") + .body(Default::default()) + .unwrap(); + let res1 = client.request(req); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e)); + rt.block_on(res1.join(srv1)).expect("res1"); + } + + let req = Request::builder() + .uri("http://mock.local/a") + .body(Default::default()) + .unwrap(); + let res2 = client.request(req); + let mut sock1 = Some(sock1); + let srv2 = poll_fn(|| { + // We purposefully keep the socket open until the client + // has written the second request, and THEN disconnect. + // + // Not because we expect servers to be jerks, but to trigger + // state where we write on an assumedly good connection, and + // only reset the close AFTER we wrote bytes. + try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512])); + sock1.take(); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e)); + let err = rt.block_on(res2.join(srv2)).expect_err("res2"); + assert!(err.is_incomplete_message(), "{:?}", err); +} + +#[test] +fn checkout_win_allows_connect_future_to_be_pooled() { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + + let (tx, rx) = oneshot::channel::<()>(); + let sock1 = connector.mock("http://mock.local"); + let sock2 = connector.mock_fut("http://mock.local", rx); + + let client = Client::builder() + .build::<_, crate::Body>(connector); + + client.pool.no_timer(); + + let uri = "http://mock.local/a".parse::<crate::Uri>().expect("uri parse"); + + // First request just sets us up to have a connection able to be put + // back in the pool. *However*, it doesn't insert immediately. The + // body has 1 pending byte, and we will only drain in request 2, once + // the connect future has been started. + let mut body = { + let res1 = client.get(uri.clone()) + .map(|res| res.into_body().concat2()); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + // Chunked is used so as to force 2 body reads. + try_ready!(sock1.write(b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 1\r\nx\r\n\ + 0\r\n\r\n\ + ")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e)); + + rt.block_on(res1.join(srv1)).expect("res1").0 + }; + + + // The second request triggers the only mocked connect future, but then + // the drained body allows the first socket to go back to the pool, + // "winning" the checkout race. + { + let res2 = client.get(uri.clone()); + let drain = poll_fn(move || { + body.poll() + }); + let srv2 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e)); + + rt.block_on(res2.join(drain).join(srv2)).expect("res2"); + } + + // "Release" the mocked connect future, and let the runtime spin once so + // it's all setup... + { + let mut tx = Some(tx); + let client = &client; + let key = client.pool.h1_key("http://mock.local"); + let mut tick_cnt = 0; + let fut = poll_fn(move || { + tx.take(); + + if client.pool.idle_count(&key) == 0 { + tick_cnt += 1; + assert!(tick_cnt < 10, "ticked too many times waiting for idle"); + trace!("no idle yet; tick count: {}", tick_cnt); + ::futures::task::current().notify(); + Ok(Async::NotReady) + } else { + Ok::<_, ()>(Async::Ready(())) + } + }); + rt.block_on(fut).unwrap(); + } + + // Third request just tests out that the "loser" connection was pooled. If + // it isn't, this will panic since the MockConnector doesn't have any more + // mocks to give out. + { + let res3 = client.get(uri); + let srv3 = poll_fn(|| { + try_ready!(sock2.read(&mut [0u8; 512])); + try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv3 poll_fn error: {}", e)); + + rt.block_on(res3.join(srv3)).expect("res3"); + } +} + +#[cfg(feature = "nightly")] +#[bench] +fn bench_http1_get_0b(b: &mut test::Bencher) { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + + let client = Client::builder() + .build::<_, crate::Body>(connector.clone()); + + client.pool.no_timer(); + + let uri = Uri::from_static("http://mock.local/a"); + + b.iter(move || { + let sock1 = connector.mock("http://mock.local"); + let res1 = client + .get(uri.clone()) + .and_then(|res| { + res.into_body().for_each(|_| Ok(())) + }); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e)); + rt.block_on(res1.join(srv1)).expect("res1"); + }); +} + +#[cfg(feature = "nightly")] +#[bench] +fn bench_http1_get_10b(b: &mut test::Bencher) { + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().expect("new rt"); + let mut connector = MockConnector::new(); + + + let client = Client::builder() + .build::<_, crate::Body>(connector.clone()); + + client.pool.no_timer(); + + let uri = Uri::from_static("http://mock.local/a"); + + b.iter(move || { + let sock1 = connector.mock("http://mock.local"); + let res1 = client + .get(uri.clone()) + .and_then(|res| { + res.into_body().for_each(|_| Ok(())) + }); + let srv1 = poll_fn(|| { + try_ready!(sock1.read(&mut [0u8; 512])); + try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n0123456789")); + Ok(Async::Ready(())) + }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e)); + rt.block_on(res1.join(srv1)).expect("res1"); + }); +} +*/ |