summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/client
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/client')
-rw-r--r--third_party/rust/hyper/src/client/client.rs1495
-rw-r--r--third_party/rust/hyper/src/client/conn.rs1113
-rw-r--r--third_party/rust/hyper/src/client/connect/dns.rs425
-rw-r--r--third_party/rust/hyper/src/client/connect/http.rs1007
-rw-r--r--third_party/rust/hyper/src/client/connect/mod.rs412
-rw-r--r--third_party/rust/hyper/src/client/dispatch.rs436
-rw-r--r--third_party/rust/hyper/src/client/mod.rs68
-rw-r--r--third_party/rust/hyper/src/client/pool.rs1044
-rw-r--r--third_party/rust/hyper/src/client/service.rs89
-rw-r--r--third_party/rust/hyper/src/client/tests.rs286
10 files changed, 6375 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/client/client.rs b/third_party/rust/hyper/src/client/client.rs
new file mode 100644
index 0000000000..4425e25899
--- /dev/null
+++ b/third_party/rust/hyper/src/client/client.rs
@@ -0,0 +1,1495 @@
+use std::error::Error as StdError;
+use std::fmt;
+use std::mem;
+use std::time::Duration;
+
+use futures_channel::oneshot;
+use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
+use http::header::{HeaderValue, HOST};
+use http::uri::{Port, Scheme};
+use http::{Method, Request, Response, Uri, Version};
+use tracing::{debug, trace, warn};
+
+use super::conn;
+use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
+use super::pool::{
+ self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
+};
+#[cfg(feature = "tcp")]
+use super::HttpConnector;
+use crate::body::{Body, HttpBody};
+use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
+use crate::rt::Executor;
+
+/// A Client to make outgoing HTTP requests.
+///
+/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
+/// underlying connection pool will be reused.
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+pub struct Client<C, B = Body> {
+ config: Config,
+ conn_builder: conn::Builder,
+ connector: C,
+ pool: Pool<PoolClient<B>>,
+}
+
+#[derive(Clone, Copy, Debug)]
+struct Config {
+ retry_canceled_requests: bool,
+ set_host: bool,
+ ver: Ver,
+}
+
+/// A `Future` that will resolve to an HTTP Response.
+///
+/// This is returned by `Client::request` (and `Client::get`).
+#[must_use = "futures do nothing unless polled"]
+pub struct ResponseFuture {
+ inner: SyncWrapper<Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>>,
+}
+
+// ===== impl Client =====
+
+#[cfg(feature = "tcp")]
+impl Client<HttpConnector, Body> {
+ /// Create a new Client with the default [config](Builder).
+ ///
+ /// # Note
+ ///
+ /// The default connector does **not** handle TLS. Speaking to `https`
+ /// destinations will require [configuring a connector that implements
+ /// TLS](https://hyper.rs/guides/client/configuration).
+ #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))]
+ #[inline]
+ pub fn new() -> Client<HttpConnector, Body> {
+ Builder::default().build_http()
+ }
+}
+
+#[cfg(feature = "tcp")]
+impl Default for Client<HttpConnector, Body> {
+ fn default() -> Client<HttpConnector, Body> {
+ Client::new()
+ }
+}
+
+impl Client<(), Body> {
+ /// Create a builder to configure a new `Client`.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[cfg(feature = "runtime")]
+ /// # fn run () {
+ /// use std::time::Duration;
+ /// use hyper::Client;
+ ///
+ /// let client = Client::builder()
+ /// .pool_idle_timeout(Duration::from_secs(30))
+ /// .http2_only(true)
+ /// .build_http();
+ /// # let infer: Client<_, hyper::Body> = client;
+ /// # drop(infer);
+ /// # }
+ /// # fn main() {}
+ /// ```
+ #[inline]
+ pub fn builder() -> Builder {
+ Builder::default()
+ }
+}
+
+impl<C, B> Client<C, B>
+where
+ C: Connect + Clone + Send + Sync + 'static,
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ /// Send a `GET` request to the supplied `Uri`.
+ ///
+ /// # Note
+ ///
+ /// This requires that the `HttpBody` type have a `Default` implementation.
+ /// It *should* return an "empty" version of itself, such that
+ /// `HttpBody::is_end_stream` is `true`.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[cfg(feature = "runtime")]
+ /// # fn run () {
+ /// use hyper::{Client, Uri};
+ ///
+ /// let client = Client::new();
+ ///
+ /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
+ /// # }
+ /// # fn main() {}
+ /// ```
+ pub fn get(&self, uri: Uri) -> ResponseFuture
+ where
+ B: Default,
+ {
+ let body = B::default();
+ if !body.is_end_stream() {
+ warn!("default HttpBody used for get() does not return true for is_end_stream");
+ }
+
+ let mut req = Request::new(body);
+ *req.uri_mut() = uri;
+ self.request(req)
+ }
+
+ /// Send a constructed `Request` using this `Client`.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[cfg(feature = "runtime")]
+ /// # fn run () {
+ /// use hyper::{Body, Method, Client, Request};
+ ///
+ /// let client = Client::new();
+ ///
+ /// let req = Request::builder()
+ /// .method(Method::POST)
+ /// .uri("http://httpbin.org/post")
+ /// .body(Body::from("Hallo!"))
+ /// .expect("request builder");
+ ///
+ /// let future = client.request(req);
+ /// # }
+ /// # fn main() {}
+ /// ```
+ pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
+ let is_http_connect = req.method() == Method::CONNECT;
+ match req.version() {
+ Version::HTTP_11 => (),
+ Version::HTTP_10 => {
+ if is_http_connect {
+ warn!("CONNECT is not allowed for HTTP/1.0");
+ return ResponseFuture::new(future::err(
+ crate::Error::new_user_unsupported_request_method(),
+ ));
+ }
+ }
+ Version::HTTP_2 => (),
+ // completely unsupported HTTP version (like HTTP/0.9)!
+ other => return ResponseFuture::error_version(other),
+ };
+
+ let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
+ Ok(s) => s,
+ Err(err) => {
+ return ResponseFuture::new(future::err(err));
+ }
+ };
+
+ ResponseFuture::new(self.clone().retryably_send_request(req, pool_key))
+ }
+
+ async fn retryably_send_request(
+ self,
+ mut req: Request<B>,
+ pool_key: PoolKey,
+ ) -> crate::Result<Response<Body>> {
+ let uri = req.uri().clone();
+
+ loop {
+ req = match self.send_request(req, pool_key.clone()).await {
+ Ok(resp) => return Ok(resp),
+ Err(ClientError::Normal(err)) => return Err(err),
+ Err(ClientError::Canceled {
+ connection_reused,
+ mut req,
+ reason,
+ }) => {
+ if !self.config.retry_canceled_requests || !connection_reused {
+ // if client disabled, don't retry
+ // a fresh connection means we definitely can't retry
+ return Err(reason);
+ }
+
+ trace!(
+ "unstarted request canceled, trying again (reason={:?})",
+ reason
+ );
+ *req.uri_mut() = uri.clone();
+ req
+ }
+ }
+ }
+ }
+
+ async fn send_request(
+ &self,
+ mut req: Request<B>,
+ pool_key: PoolKey,
+ ) -> Result<Response<Body>, ClientError<B>> {
+ let mut pooled = match self.connection_for(pool_key).await {
+ Ok(pooled) => pooled,
+ Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
+ Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
+ return Err(ClientError::Canceled {
+ connection_reused: true,
+ req,
+ reason,
+ })
+ }
+ };
+
+ if pooled.is_http1() {
+ if req.version() == Version::HTTP_2 {
+ warn!("Connection is HTTP/1, but request requires HTTP/2");
+ return Err(ClientError::Normal(
+ crate::Error::new_user_unsupported_version(),
+ ));
+ }
+
+ if self.config.set_host {
+ let uri = req.uri().clone();
+ req.headers_mut().entry(HOST).or_insert_with(|| {
+ let hostname = uri.host().expect("authority implies host");
+ if let Some(port) = get_non_default_port(&uri) {
+ let s = format!("{}:{}", hostname, port);
+ HeaderValue::from_str(&s)
+ } else {
+ HeaderValue::from_str(hostname)
+ }
+ .expect("uri host is valid header value")
+ });
+ }
+
+ // CONNECT always sends authority-form, so check it first...
+ if req.method() == Method::CONNECT {
+ authority_form(req.uri_mut());
+ } else if pooled.conn_info.is_proxied {
+ absolute_form(req.uri_mut());
+ } else {
+ origin_form(req.uri_mut());
+ }
+ } else if req.method() == Method::CONNECT {
+ authority_form(req.uri_mut());
+ }
+
+ let fut = pooled
+ .send_request_retryable(req)
+ .map_err(ClientError::map_with_reused(pooled.is_reused()));
+
+ // If the Connector included 'extra' info, add to Response...
+ let extra_info = pooled.conn_info.extra.clone();
+ let fut = fut.map_ok(move |mut res| {
+ if let Some(extra) = extra_info {
+ extra.set(res.extensions_mut());
+ }
+ res
+ });
+
+ // As of futures@0.1.21, there is a race condition in the mpsc
+ // channel, such that sending when the receiver is closing can
+ // result in the message being stuck inside the queue. It won't
+ // ever notify until the Sender side is dropped.
+ //
+ // To counteract this, we must check if our senders 'want' channel
+ // has been closed after having tried to send. If so, error out...
+ if pooled.is_closed() {
+ return fut.await;
+ }
+
+ let mut res = fut.await?;
+
+ // If pooled is HTTP/2, we can toss this reference immediately.
+ //
+ // when pooled is dropped, it will try to insert back into the
+ // pool. To delay that, spawn a future that completes once the
+ // sender is ready again.
+ //
+ // This *should* only be once the related `Connection` has polled
+ // for a new request to start.
+ //
+ // It won't be ready if there is a body to stream.
+ if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
+ drop(pooled);
+ } else if !res.body().is_end_stream() {
+ let (delayed_tx, delayed_rx) = oneshot::channel();
+ res.body_mut().delayed_eof(delayed_rx);
+ let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
+ // At this point, `pooled` is dropped, and had a chance
+ // to insert into the pool (if conn was idle)
+ drop(delayed_tx);
+ });
+
+ self.conn_builder.exec.execute(on_idle);
+ } else {
+ // There's no body to delay, but the connection isn't
+ // ready yet. Only re-insert when it's ready
+ let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
+
+ self.conn_builder.exec.execute(on_idle);
+ }
+
+ Ok(res)
+ }
+
+ async fn connection_for(
+ &self,
+ pool_key: PoolKey,
+ ) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
+ // This actually races 2 different futures to try to get a ready
+ // connection the fastest, and to reduce connection churn.
+ //
+ // - If the pool has an idle connection waiting, that's used
+ // immediately.
+ // - Otherwise, the Connector is asked to start connecting to
+ // the destination Uri.
+ // - Meanwhile, the pool Checkout is watching to see if any other
+ // request finishes and tries to insert an idle connection.
+ // - If a new connection is started, but the Checkout wins after
+ // (an idle connection became available first), the started
+ // connection future is spawned into the runtime to complete,
+ // and then be inserted into the pool as an idle connection.
+ let checkout = self.pool.checkout(pool_key.clone());
+ let connect = self.connect_to(pool_key);
+ let is_ver_h2 = self.config.ver == Ver::Http2;
+
+ // The order of the `select` is depended on below...
+
+ match future::select(checkout, connect).await {
+ // Checkout won, connect future may have been started or not.
+ //
+ // If it has, let it finish and insert back into the pool,
+ // so as to not waste the socket...
+ Either::Left((Ok(checked_out), connecting)) => {
+ // This depends on the `select` above having the correct
+ // order, such that if the checkout future were ready
+ // immediately, the connect future will never have been
+ // started.
+ //
+ // If it *wasn't* ready yet, then the connect future will
+ // have been started...
+ if connecting.started() {
+ let bg = connecting
+ .map_err(|err| {
+ trace!("background connect error: {}", err);
+ })
+ .map(|_pooled| {
+ // dropping here should just place it in
+ // the Pool for us...
+ });
+ // An execute error here isn't important, we're just trying
+ // to prevent a waste of a socket...
+ self.conn_builder.exec.execute(bg);
+ }
+ Ok(checked_out)
+ }
+ // Connect won, checkout can just be dropped.
+ Either::Right((Ok(connected), _checkout)) => Ok(connected),
+ // Either checkout or connect could get canceled:
+ //
+ // 1. Connect is canceled if this is HTTP/2 and there is
+ // an outstanding HTTP/2 connecting task.
+ // 2. Checkout is canceled if the pool cannot deliver an
+ // idle connection reliably.
+ //
+ // In both cases, we should just wait for the other future.
+ Either::Left((Err(err), connecting)) => {
+ if err.is_canceled() {
+ connecting.await.map_err(ClientConnectError::Normal)
+ } else {
+ Err(ClientConnectError::Normal(err))
+ }
+ }
+ Either::Right((Err(err), checkout)) => {
+ if err.is_canceled() {
+ checkout.await.map_err(move |err| {
+ if is_ver_h2
+ && err.is_canceled()
+ && err.find_source::<CheckoutIsClosedError>().is_some()
+ {
+ ClientConnectError::H2CheckoutIsClosed(err)
+ } else {
+ ClientConnectError::Normal(err)
+ }
+ })
+ } else {
+ Err(ClientConnectError::Normal(err))
+ }
+ }
+ }
+ }
+
+ fn connect_to(
+ &self,
+ pool_key: PoolKey,
+ ) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
+ let executor = self.conn_builder.exec.clone();
+ let pool = self.pool.clone();
+ #[cfg(not(feature = "http2"))]
+ let conn_builder = self.conn_builder.clone();
+ #[cfg(feature = "http2")]
+ let mut conn_builder = self.conn_builder.clone();
+ let ver = self.config.ver;
+ let is_ver_h2 = ver == Ver::Http2;
+ let connector = self.connector.clone();
+ let dst = domain_as_uri(pool_key.clone());
+ hyper_lazy(move || {
+ // Try to take a "connecting lock".
+ //
+ // If the pool_key is for HTTP/2, and there is already a
+ // connection being established, then this can't take a
+ // second lock. The "connect_to" future is Canceled.
+ let connecting = match pool.connecting(&pool_key, ver) {
+ Some(lock) => lock,
+ None => {
+ let canceled =
+ crate::Error::new_canceled().with("HTTP/2 connection in progress");
+ return Either::Right(future::err(canceled));
+ }
+ };
+ Either::Left(
+ connector
+ .connect(connect::sealed::Internal, dst)
+ .map_err(crate::Error::new_connect)
+ .and_then(move |io| {
+ let connected = io.connected();
+ // If ALPN is h2 and we aren't http2_only already,
+ // then we need to convert our pool checkout into
+ // a single HTTP2 one.
+ let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
+ match connecting.alpn_h2(&pool) {
+ Some(lock) => {
+ trace!("ALPN negotiated h2, updating pool");
+ lock
+ }
+ None => {
+ // Another connection has already upgraded,
+ // the pool checkout should finish up for us.
+ let canceled = crate::Error::new_canceled()
+ .with("ALPN upgraded to HTTP/2");
+ return Either::Right(future::err(canceled));
+ }
+ }
+ } else {
+ connecting
+ };
+
+ #[cfg_attr(not(feature = "http2"), allow(unused))]
+ let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
+ #[cfg(feature = "http2")]
+ {
+ conn_builder.http2_only(is_h2);
+ }
+
+ Either::Left(Box::pin(async move {
+ let (tx, conn) = conn_builder.handshake(io).await?;
+
+ trace!("handshake complete, spawning background dispatcher task");
+ executor.execute(
+ conn.map_err(|e| debug!("client connection error: {}", e))
+ .map(|_| ()),
+ );
+
+ // Wait for 'conn' to ready up before we
+ // declare this tx as usable
+ let tx = tx.when_ready().await?;
+
+ let tx = {
+ #[cfg(feature = "http2")]
+ {
+ if is_h2 {
+ PoolTx::Http2(tx.into_http2())
+ } else {
+ PoolTx::Http1(tx)
+ }
+ }
+ #[cfg(not(feature = "http2"))]
+ PoolTx::Http1(tx)
+ };
+
+ Ok(pool.pooled(
+ connecting,
+ PoolClient {
+ conn_info: connected,
+ tx,
+ },
+ ))
+ }))
+ }),
+ )
+ })
+ }
+}
+
+impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
+where
+ C: Connect + Clone + Send + Sync + 'static,
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Response = Response<Body>;
+ type Error = crate::Error;
+ type Future = ResponseFuture;
+
+ fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<B>) -> Self::Future {
+ self.request(req)
+ }
+}
+
+impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
+where
+ C: Connect + Clone + Send + Sync + 'static,
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Response = Response<Body>;
+ type Error = crate::Error;
+ type Future = ResponseFuture;
+
+ fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<B>) -> Self::Future {
+ self.request(req)
+ }
+}
+
+impl<C: Clone, B> Clone for Client<C, B> {
+ fn clone(&self) -> Client<C, B> {
+ Client {
+ config: self.config.clone(),
+ conn_builder: self.conn_builder.clone(),
+ connector: self.connector.clone(),
+ pool: self.pool.clone(),
+ }
+ }
+}
+
+impl<C, B> fmt::Debug for Client<C, B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Client").finish()
+ }
+}
+
+// ===== impl ResponseFuture =====
+
+impl ResponseFuture {
+ fn new<F>(value: F) -> Self
+ where
+ F: Future<Output = crate::Result<Response<Body>>> + Send + 'static,
+ {
+ Self {
+ inner: SyncWrapper::new(Box::pin(value))
+ }
+ }
+
+ fn error_version(ver: Version) -> Self {
+ warn!("Request has unsupported version \"{:?}\"", ver);
+ ResponseFuture::new(Box::pin(future::err(
+ crate::Error::new_user_unsupported_version(),
+ )))
+ }
+}
+
+impl fmt::Debug for ResponseFuture {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Future<Response>")
+ }
+}
+
+impl Future for ResponseFuture {
+ type Output = crate::Result<Response<Body>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.inner.get_mut().as_mut().poll(cx)
+ }
+}
+
+// ===== impl PoolClient =====
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+struct PoolClient<B> {
+ conn_info: Connected,
+ tx: PoolTx<B>,
+}
+
+enum PoolTx<B> {
+ Http1(conn::SendRequest<B>),
+ #[cfg(feature = "http2")]
+ Http2(conn::Http2SendRequest<B>),
+}
+
+impl<B> PoolClient<B> {
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ match self.tx {
+ PoolTx::Http1(ref mut tx) => tx.poll_ready(cx),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(_) => Poll::Ready(Ok(())),
+ }
+ }
+
+ fn is_http1(&self) -> bool {
+ !self.is_http2()
+ }
+
+ fn is_http2(&self) -> bool {
+ match self.tx {
+ PoolTx::Http1(_) => false,
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(_) => true,
+ }
+ }
+
+ fn is_ready(&self) -> bool {
+ match self.tx {
+ PoolTx::Http1(ref tx) => tx.is_ready(),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(ref tx) => tx.is_ready(),
+ }
+ }
+
+ fn is_closed(&self) -> bool {
+ match self.tx {
+ PoolTx::Http1(ref tx) => tx.is_closed(),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(ref tx) => tx.is_closed(),
+ }
+ }
+}
+
+impl<B: HttpBody + 'static> PoolClient<B> {
+ fn send_request_retryable(
+ &mut self,
+ req: Request<B>,
+ ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
+ where
+ B: Send,
+ {
+ match self.tx {
+ #[cfg(not(feature = "http2"))]
+ PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
+ #[cfg(feature = "http2")]
+ PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
+ }
+ }
+}
+
+impl<B> Poolable for PoolClient<B>
+where
+ B: Send + 'static,
+{
+ fn is_open(&self) -> bool {
+ match self.tx {
+ PoolTx::Http1(ref tx) => tx.is_ready(),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(ref tx) => tx.is_ready(),
+ }
+ }
+
+ fn reserve(self) -> Reservation<Self> {
+ match self.tx {
+ PoolTx::Http1(tx) => Reservation::Unique(PoolClient {
+ conn_info: self.conn_info,
+ tx: PoolTx::Http1(tx),
+ }),
+ #[cfg(feature = "http2")]
+ PoolTx::Http2(tx) => {
+ let b = PoolClient {
+ conn_info: self.conn_info.clone(),
+ tx: PoolTx::Http2(tx.clone()),
+ };
+ let a = PoolClient {
+ conn_info: self.conn_info,
+ tx: PoolTx::Http2(tx),
+ };
+ Reservation::Shared(a, b)
+ }
+ }
+ }
+
+ fn can_share(&self) -> bool {
+ self.is_http2()
+ }
+}
+
+// ===== impl ClientError =====
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+enum ClientError<B> {
+ Normal(crate::Error),
+ Canceled {
+ connection_reused: bool,
+ req: Request<B>,
+ reason: crate::Error,
+ },
+}
+
+impl<B> ClientError<B> {
+ fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
+ move |(err, orig_req)| {
+ if let Some(req) = orig_req {
+ ClientError::Canceled {
+ connection_reused: conn_reused,
+ reason: err,
+ req,
+ }
+ } else {
+ ClientError::Normal(err)
+ }
+ }
+ }
+}
+
+enum ClientConnectError {
+ Normal(crate::Error),
+ H2CheckoutIsClosed(crate::Error),
+}
+
+/// A marker to identify what version a pooled connection is.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub(super) enum Ver {
+ Auto,
+ Http2,
+}
+
+fn origin_form(uri: &mut Uri) {
+ let path = match uri.path_and_query() {
+ Some(path) if path.as_str() != "/" => {
+ let mut parts = ::http::uri::Parts::default();
+ parts.path_and_query = Some(path.clone());
+ Uri::from_parts(parts).expect("path is valid uri")
+ }
+ _none_or_just_slash => {
+ debug_assert!(Uri::default() == "/");
+ Uri::default()
+ }
+ };
+ *uri = path
+}
+
+fn absolute_form(uri: &mut Uri) {
+ debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
+ debug_assert!(
+ uri.authority().is_some(),
+ "absolute_form needs an authority"
+ );
+ // If the URI is to HTTPS, and the connector claimed to be a proxy,
+ // then it *should* have tunneled, and so we don't want to send
+ // absolute-form in that case.
+ if uri.scheme() == Some(&Scheme::HTTPS) {
+ origin_form(uri);
+ }
+}
+
+fn authority_form(uri: &mut Uri) {
+ if let Some(path) = uri.path_and_query() {
+ // `https://hyper.rs` would parse with `/` path, don't
+ // annoy people about that...
+ if path != "/" {
+ warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
+ }
+ }
+ *uri = match uri.authority() {
+ Some(auth) => {
+ let mut parts = ::http::uri::Parts::default();
+ parts.authority = Some(auth.clone());
+ Uri::from_parts(parts).expect("authority is valid")
+ }
+ None => {
+ unreachable!("authority_form with relative uri");
+ }
+ };
+}
+
+fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> crate::Result<PoolKey> {
+ let uri_clone = uri.clone();
+ match (uri_clone.scheme(), uri_clone.authority()) {
+ (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
+ (None, Some(auth)) if is_http_connect => {
+ let scheme = match auth.port_u16() {
+ Some(443) => {
+ set_scheme(uri, Scheme::HTTPS);
+ Scheme::HTTPS
+ }
+ _ => {
+ set_scheme(uri, Scheme::HTTP);
+ Scheme::HTTP
+ }
+ };
+ Ok((scheme, auth.clone()))
+ }
+ _ => {
+ debug!("Client requires absolute-form URIs, received: {:?}", uri);
+ Err(crate::Error::new_user_absolute_uri_required())
+ }
+ }
+}
+
+fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
+ http::uri::Builder::new()
+ .scheme(scheme)
+ .authority(auth)
+ .path_and_query("/")
+ .build()
+ .expect("domain is valid Uri")
+}
+
+fn set_scheme(uri: &mut Uri, scheme: Scheme) {
+ debug_assert!(
+ uri.scheme().is_none(),
+ "set_scheme expects no existing scheme"
+ );
+ let old = mem::replace(uri, Uri::default());
+ let mut parts: ::http::uri::Parts = old.into();
+ parts.scheme = Some(scheme);
+ parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
+ *uri = Uri::from_parts(parts).expect("scheme is valid");
+}
+
+fn get_non_default_port(uri: &Uri) -> Option<Port<&str>> {
+ match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
+ (Some(443), true) => None,
+ (Some(80), false) => None,
+ _ => uri.port(),
+ }
+}
+
+fn is_schema_secure(uri: &Uri) -> bool {
+ uri.scheme_str()
+ .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
+ .unwrap_or_default()
+}
+
+/// A builder to configure a new [`Client`](Client).
+///
+/// # Example
+///
+/// ```
+/// # #[cfg(feature = "runtime")]
+/// # fn run () {
+/// use std::time::Duration;
+/// use hyper::Client;
+///
+/// let client = Client::builder()
+/// .pool_idle_timeout(Duration::from_secs(30))
+/// .http2_only(true)
+/// .build_http();
+/// # let infer: Client<_, hyper::Body> = client;
+/// # drop(infer);
+/// # }
+/// # fn main() {}
+/// ```
+#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
+#[derive(Clone)]
+pub struct Builder {
+ client_config: Config,
+ conn_builder: conn::Builder,
+ pool_config: pool::Config,
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Self {
+ client_config: Config {
+ retry_canceled_requests: true,
+ set_host: true,
+ ver: Ver::Auto,
+ },
+ conn_builder: conn::Builder::new(),
+ pool_config: pool::Config {
+ idle_timeout: Some(Duration::from_secs(90)),
+ max_idle_per_host: std::usize::MAX,
+ },
+ }
+ }
+}
+
+impl Builder {
+ #[doc(hidden)]
+ #[deprecated(
+ note = "name is confusing, to disable the connection pool, call pool_max_idle_per_host(0)"
+ )]
+ pub fn keep_alive(&mut self, val: bool) -> &mut Self {
+ if !val {
+ // disable
+ self.pool_max_idle_per_host(0)
+ } else if self.pool_config.max_idle_per_host == 0 {
+ // enable
+ self.pool_max_idle_per_host(std::usize::MAX)
+ } else {
+ // already enabled
+ self
+ }
+ }
+
+ #[doc(hidden)]
+ #[deprecated(note = "renamed to `pool_idle_timeout`")]
+ pub fn keep_alive_timeout<D>(&mut self, val: D) -> &mut Self
+ where
+ D: Into<Option<Duration>>,
+ {
+ self.pool_idle_timeout(val)
+ }
+
+ /// Set an optional timeout for idle sockets being kept-alive.
+ ///
+ /// Pass `None` to disable timeout.
+ ///
+ /// Default is 90 seconds.
+ pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
+ where
+ D: Into<Option<Duration>>,
+ {
+ self.pool_config.idle_timeout = val.into();
+ self
+ }
+
+ #[doc(hidden)]
+ #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
+ pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
+ self.pool_config.max_idle_per_host = max_idle;
+ self
+ }
+
+ /// Sets the maximum idle connection per host allowed in the pool.
+ ///
+ /// Default is `usize::MAX` (no limit).
+ pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
+ self.pool_config.max_idle_per_host = max_idle;
+ self
+ }
+
+ // HTTP/1 options
+
+ /// Sets the exact size of the read buffer to *always* use.
+ ///
+ /// Note that setting this option unsets the `http1_max_buf_size` option.
+ ///
+ /// Default is an adaptive read buffer.
+ pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
+ self.conn_builder.http1_read_buf_exact_size(Some(sz));
+ self
+ }
+
+ /// Set the maximum buffer size for the connection.
+ ///
+ /// Default is ~400kb.
+ ///
+ /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
+ ///
+ /// # Panics
+ ///
+ /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
+ self.conn_builder.http1_max_buf_size(max);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will accept spaces between header names
+ /// and the colon that follow them in responses.
+ ///
+ /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
+ /// parsing.
+ ///
+ /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
+ /// to say about it:
+ ///
+ /// > No whitespace is allowed between the header field-name and colon. In
+ /// > the past, differences in the handling of such whitespace have led to
+ /// > security vulnerabilities in request routing and response handling. A
+ /// > server MUST reject any received request message that contains
+ /// > whitespace between a header field-name and colon with a response code
+ /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
+ /// > response message before forwarding the message downstream.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ ///
+ /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
+ pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
+ self.conn_builder
+ .http1_allow_spaces_after_header_name_in_responses(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will accept obsolete line folding for
+ /// header values.
+ ///
+ /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
+ /// to say about it:
+ ///
+ /// > A server that receives an obs-fold in a request message that is not
+ /// > within a message/http container MUST either reject the message by
+ /// > sending a 400 (Bad Request), preferably with a representation
+ /// > explaining that obsolete line folding is unacceptable, or replace
+ /// > each received obs-fold with one or more SP octets prior to
+ /// > interpreting the field value or forwarding the message downstream.
+ ///
+ /// > A proxy or gateway that receives an obs-fold in a response message
+ /// > that is not within a message/http container MUST either discard the
+ /// > message and replace it with a 502 (Bad Gateway) response, preferably
+ /// > with a representation explaining that unacceptable line folding was
+ /// > received, or replace each received obs-fold with one or more SP
+ /// > octets prior to interpreting the field value or forwarding the
+ /// > message downstream.
+ ///
+ /// > A user agent that receives an obs-fold in a response message that is
+ /// > not within a message/http container MUST replace each received
+ /// > obs-fold with one or more SP octets prior to interpreting the field
+ /// > value.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ ///
+ /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
+ pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
+ self.conn_builder
+ .http1_allow_obsolete_multiline_headers_in_responses(val);
+ self
+ }
+
+ /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
+ ///
+ /// This mimicks the behaviour of major browsers. You probably don't want this.
+ /// You should only want this if you are implementing a proxy whose main
+ /// purpose is to sit in front of browsers whose users access arbitrary content
+ /// which may be malformed, and they expect everything that works without
+ /// the proxy to keep working with the proxy.
+ ///
+ /// This option will prevent Hyper's client from returning an error encountered
+ /// when parsing a header, except if the error was caused by the character NUL
+ /// (ASCII code 0), as Chrome specifically always reject those.
+ ///
+ /// The ignorable errors are:
+ /// * empty header names;
+ /// * characters that are not allowed in header names, except for `\0` and `\r`;
+ /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
+ /// spaces and tabs between the header name and the colon;
+ /// * missing colon between header name and colon;
+ /// * characters that are not allowed in header values except for `\0` and `\r`.
+ ///
+ /// If an ignorable error is encountered, the parser tries to find the next
+ /// line in the input to resume parsing the rest of the headers. An error
+ /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
+ /// looking for the next line.
+ pub fn http1_ignore_invalid_headers_in_responses(
+ &mut self,
+ val: bool,
+ ) -> &mut Builder {
+ self.conn_builder
+ .http1_ignore_invalid_headers_in_responses(val);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Setting this to true will force hyper to use queued strategy
+ /// which may eliminate unnecessary cloning on some TLS backends
+ ///
+ /// Default is `auto`. In this mode hyper will try to guess which
+ /// mode to use
+ pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
+ self.conn_builder.http1_writev(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will write header names as title case at
+ /// the socket level.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
+ self.conn_builder.http1_title_case_headers(val);
+ self
+ }
+
+ /// Set whether to support preserving original header cases.
+ ///
+ /// Currently, this will record the original cases received, and store them
+ /// in a private extension on the `Response`. It will also look for and use
+ /// such an extension in any provided `Request`.
+ ///
+ /// Since the relevant extension is still private, there is no way to
+ /// interact with the original cases. The only effect this can have now is
+ /// to forward the cases in a proxy-like fashion.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
+ self.conn_builder.http1_preserve_header_case(val);
+ self
+ }
+
+ /// Set whether HTTP/0.9 responses should be tolerated.
+ ///
+ /// Default is false.
+ pub fn http09_responses(&mut self, val: bool) -> &mut Self {
+ self.conn_builder.http09_responses(val);
+ self
+ }
+
+ /// Set whether the connection **must** use HTTP/2.
+ ///
+ /// The destination must either allow HTTP2 Prior Knowledge, or the
+ /// `Connect` should be configured to do use ALPN to upgrade to `h2`
+ /// as part of the connection process. This will not make the `Client`
+ /// utilize ALPN by itself.
+ ///
+ /// Note that setting this to true prevents HTTP/1 from being allowed.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_only(&mut self, val: bool) -> &mut Self {
+ self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ self.conn_builder
+ .http2_initial_stream_window_size(sz.into());
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_connection_window_size(
+ &mut self,
+ sz: impl Into<Option<u32>>,
+ ) -> &mut Self {
+ self.conn_builder
+ .http2_initial_connection_window_size(sz.into());
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
+ self.conn_builder.http2_adaptive_window(enabled);
+ self
+ }
+
+ /// Sets the maximum frame size to use for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ self.conn_builder.http2_max_frame_size(sz);
+ self
+ }
+
+ /// Sets an interval for HTTP2 Ping frames should be sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_interval(
+ &mut self,
+ interval: impl Into<Option<Duration>>,
+ ) -> &mut Self {
+ self.conn_builder.http2_keep_alive_interval(interval);
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
+ self.conn_builder.http2_keep_alive_timeout(timeout);
+ self
+ }
+
+ /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
+ ///
+ /// If disabled, keep-alive pings are only sent while there are open
+ /// request/responses streams. If enabled, pings are also sent when no
+ /// streams are active. Does nothing if `http2_keep_alive_interval` is
+ /// disabled.
+ ///
+ /// Default is `false`.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
+ self.conn_builder.http2_keep_alive_while_idle(enabled);
+ self
+ }
+
+ /// Sets the maximum number of HTTP2 concurrent locally reset streams.
+ ///
+ /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
+ /// details.
+ ///
+ /// The default value is determined by the `h2` crate.
+ ///
+ /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
+ self.conn_builder.http2_max_concurrent_reset_streams(max);
+ self
+ }
+
+ /// Set the maximum write buffer size for each HTTP/2 stream.
+ ///
+ /// Default is currently 1MB, but may change.
+ ///
+ /// # Panics
+ ///
+ /// The value must be no larger than `u32::MAX`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
+ self.conn_builder.http2_max_send_buf_size(max);
+ self
+ }
+
+ /// Set whether to retry requests that get disrupted before ever starting
+ /// to write.
+ ///
+ /// This means a request that is queued, and gets given an idle, reused
+ /// connection, and then encounters an error immediately as the idle
+ /// connection was found to be unusable.
+ ///
+ /// When this is set to `false`, the related `ResponseFuture` would instead
+ /// resolve to an `Error::Cancel`.
+ ///
+ /// Default is `true`.
+ #[inline]
+ pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
+ self.client_config.retry_canceled_requests = val;
+ self
+ }
+
+ /// Set whether to automatically add the `Host` header to requests.
+ ///
+ /// If true, and a request does not include a `Host` header, one will be
+ /// added automatically, derived from the authority of the `Uri`.
+ ///
+ /// Default is `true`.
+ #[inline]
+ pub fn set_host(&mut self, val: bool) -> &mut Self {
+ self.client_config.set_host = val;
+ self
+ }
+
+ /// Provide an executor to execute background `Connection` tasks.
+ pub fn executor<E>(&mut self, exec: E) -> &mut Self
+ where
+ E: Executor<BoxSendFuture> + Send + Sync + 'static,
+ {
+ self.conn_builder.executor(exec);
+ self
+ }
+
+ /// Builder a client with this configuration and the default `HttpConnector`.
+ #[cfg(feature = "tcp")]
+ pub fn build_http<B>(&self) -> Client<HttpConnector, B>
+ where
+ B: HttpBody + Send,
+ B::Data: Send,
+ {
+ let mut connector = HttpConnector::new();
+ if self.pool_config.is_enabled() {
+ connector.set_keepalive(self.pool_config.idle_timeout);
+ }
+ self.build(connector)
+ }
+
+ /// Combine the configuration of this builder with a connector to create a `Client`.
+ pub fn build<C, B>(&self, connector: C) -> Client<C, B>
+ where
+ C: Connect + Clone,
+ B: HttpBody + Send,
+ B::Data: Send,
+ {
+ Client {
+ config: self.client_config,
+ conn_builder: self.conn_builder.clone(),
+ connector,
+ pool: Pool::new(self.pool_config, &self.conn_builder.exec),
+ }
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Builder")
+ .field("client_config", &self.client_config)
+ .field("conn_builder", &self.conn_builder)
+ .field("pool_config", &self.pool_config)
+ .finish()
+ }
+}
+
+#[cfg(test)]
+mod unit_tests {
+ use super::*;
+
+ #[test]
+ fn response_future_is_sync() {
+ fn assert_sync<T: Sync>() {}
+ assert_sync::<ResponseFuture>();
+ }
+
+ #[test]
+ fn set_relative_uri_with_implicit_path() {
+ let mut uri = "http://hyper.rs".parse().unwrap();
+ origin_form(&mut uri);
+ assert_eq!(uri.to_string(), "/");
+ }
+
+ #[test]
+ fn test_origin_form() {
+ let mut uri = "http://hyper.rs/guides".parse().unwrap();
+ origin_form(&mut uri);
+ assert_eq!(uri.to_string(), "/guides");
+
+ let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap();
+ origin_form(&mut uri);
+ assert_eq!(uri.to_string(), "/guides?foo=bar");
+ }
+
+ #[test]
+ fn test_absolute_form() {
+ let mut uri = "http://hyper.rs/guides".parse().unwrap();
+ absolute_form(&mut uri);
+ assert_eq!(uri.to_string(), "http://hyper.rs/guides");
+
+ let mut uri = "https://hyper.rs/guides".parse().unwrap();
+ absolute_form(&mut uri);
+ assert_eq!(uri.to_string(), "/guides");
+ }
+
+ #[test]
+ fn test_authority_form() {
+ let _ = pretty_env_logger::try_init();
+
+ let mut uri = "http://hyper.rs".parse().unwrap();
+ authority_form(&mut uri);
+ assert_eq!(uri.to_string(), "hyper.rs");
+
+ let mut uri = "hyper.rs".parse().unwrap();
+ authority_form(&mut uri);
+ assert_eq!(uri.to_string(), "hyper.rs");
+ }
+
+ #[test]
+ fn test_extract_domain_connect_no_port() {
+ let mut uri = "hyper.rs".parse().unwrap();
+ let (scheme, host) = extract_domain(&mut uri, true).expect("extract domain");
+ assert_eq!(scheme, *"http");
+ assert_eq!(host, "hyper.rs");
+ }
+
+ #[test]
+ fn test_is_secure() {
+ assert_eq!(
+ is_schema_secure(&"http://hyper.rs".parse::<Uri>().unwrap()),
+ false
+ );
+ assert_eq!(is_schema_secure(&"hyper.rs".parse::<Uri>().unwrap()), false);
+ assert_eq!(
+ is_schema_secure(&"wss://hyper.rs".parse::<Uri>().unwrap()),
+ true
+ );
+ assert_eq!(
+ is_schema_secure(&"ws://hyper.rs".parse::<Uri>().unwrap()),
+ false
+ );
+ }
+
+ #[test]
+ fn test_get_non_default_port() {
+ assert!(get_non_default_port(&"http://hyper.rs".parse::<Uri>().unwrap()).is_none());
+ assert!(get_non_default_port(&"http://hyper.rs:80".parse::<Uri>().unwrap()).is_none());
+ assert!(get_non_default_port(&"https://hyper.rs:443".parse::<Uri>().unwrap()).is_none());
+ assert!(get_non_default_port(&"hyper.rs:80".parse::<Uri>().unwrap()).is_none());
+
+ assert_eq!(
+ get_non_default_port(&"http://hyper.rs:123".parse::<Uri>().unwrap())
+ .unwrap()
+ .as_u16(),
+ 123
+ );
+ assert_eq!(
+ get_non_default_port(&"https://hyper.rs:80".parse::<Uri>().unwrap())
+ .unwrap()
+ .as_u16(),
+ 80
+ );
+ assert_eq!(
+ get_non_default_port(&"hyper.rs:123".parse::<Uri>().unwrap())
+ .unwrap()
+ .as_u16(),
+ 123
+ );
+ }
+}
diff --git a/third_party/rust/hyper/src/client/conn.rs b/third_party/rust/hyper/src/client/conn.rs
new file mode 100644
index 0000000000..3eb12b4204
--- /dev/null
+++ b/third_party/rust/hyper/src/client/conn.rs
@@ -0,0 +1,1113 @@
+//! Lower-level client connection API.
+//!
+//! The types in this module are to provide a lower-level API based around a
+//! single connection. Connecting to a host, pooling connections, and the like
+//! are not handled at this level. This module provides the building blocks to
+//! customize those things externally.
+//!
+//! If don't have need to manage connections yourself, consider using the
+//! higher-level [Client](super) API.
+//!
+//! ## Example
+//! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream
+//! ```no_run
+//! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))]
+//! # mod rt {
+//! use tower::ServiceExt;
+//! use http::{Request, StatusCode};
+//! use hyper::{client::conn, Body};
+//! use tokio::net::TcpStream;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let target_stream = TcpStream::connect("example.com:80").await?;
+//!
+//! let (mut request_sender, connection) = conn::handshake(target_stream).await?;
+//!
+//! // spawn a task to poll the connection and drive the HTTP state
+//! tokio::spawn(async move {
+//! if let Err(e) = connection.await {
+//! eprintln!("Error in connection: {}", e);
+//! }
+//! });
+//!
+//! let request = Request::builder()
+//! // We need to manually add the host header because SendRequest does not
+//! .header("Host", "example.com")
+//! .method("GET")
+//! .body(Body::from(""))?;
+//! let response = request_sender.send_request(request).await?;
+//! assert!(response.status() == StatusCode::OK);
+//!
+//! // To send via the same connection again, it may not work as it may not be ready,
+//! // so we have to wait until the request_sender becomes ready.
+//! request_sender.ready().await?;
+//! let request = Request::builder()
+//! .header("Host", "example.com")
+//! .method("GET")
+//! .body(Body::from(""))?;
+//! let response = request_sender.send_request(request).await?;
+//! assert!(response.status() == StatusCode::OK);
+//! Ok(())
+//! }
+//!
+//! # }
+//! ```
+
+use std::error::Error as StdError;
+use std::fmt;
+#[cfg(not(all(feature = "http1", feature = "http2")))]
+use std::marker::PhantomData;
+use std::sync::Arc;
+#[cfg(all(feature = "runtime", feature = "http2"))]
+use std::time::Duration;
+
+use bytes::Bytes;
+use futures_util::future::{self, Either, FutureExt as _};
+use httparse::ParserConfig;
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tower_service::Service;
+use tracing::{debug, trace};
+
+use super::dispatch;
+use crate::body::HttpBody;
+#[cfg(not(all(feature = "http1", feature = "http2")))]
+use crate::common::Never;
+use crate::common::{
+ exec::{BoxSendFuture, Exec},
+ task, Future, Pin, Poll,
+};
+use crate::proto;
+use crate::rt::Executor;
+#[cfg(feature = "http1")]
+use crate::upgrade::Upgraded;
+use crate::{Body, Request, Response};
+
+#[cfg(feature = "http1")]
+type Http1Dispatcher<T, B> =
+ proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
+
+#[cfg(not(feature = "http1"))]
+type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>);
+
+#[cfg(feature = "http2")]
+type Http2ClientTask<B> = proto::h2::ClientTask<B>;
+
+#[cfg(not(feature = "http2"))]
+type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>);
+
+pin_project! {
+ #[project = ProtoClientProj]
+ enum ProtoClient<T, B>
+ where
+ B: HttpBody,
+ {
+ H1 {
+ #[pin]
+ h1: Http1Dispatcher<T, B>,
+ },
+ H2 {
+ #[pin]
+ h2: Http2ClientTask<B>,
+ },
+ }
+}
+
+/// Returns a handshake future over some IO.
+///
+/// This is a shortcut for `Builder::new().handshake(io)`.
+/// See [`client::conn`](crate::client::conn) for more.
+pub async fn handshake<T>(
+ io: T,
+) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
+where
+ T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+{
+ Builder::new().handshake(io).await
+}
+
+/// The sender side of an established connection.
+pub struct SendRequest<B> {
+ dispatch: dispatch::Sender<Request<B>, Response<Body>>,
+}
+
+/// A future that processes all HTTP state for the IO object.
+///
+/// In most cases, this should just be spawned into an executor, so that it
+/// can process incoming and outgoing messages, notice hangups, and the like.
+#[must_use = "futures do nothing unless polled"]
+pub struct Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + Send + 'static,
+ B: HttpBody + 'static,
+{
+ inner: Option<ProtoClient<T, B>>,
+}
+
+/// A builder to configure an HTTP connection.
+///
+/// After setting options, the builder is used to create a handshake future.
+#[derive(Clone, Debug)]
+pub struct Builder {
+ pub(super) exec: Exec,
+ h09_responses: bool,
+ h1_parser_config: ParserConfig,
+ h1_writev: Option<bool>,
+ h1_title_case_headers: bool,
+ h1_preserve_header_case: bool,
+ #[cfg(feature = "ffi")]
+ h1_preserve_header_order: bool,
+ h1_read_buf_exact_size: Option<usize>,
+ h1_max_buf_size: Option<usize>,
+ #[cfg(feature = "ffi")]
+ h1_headers_raw: bool,
+ #[cfg(feature = "http2")]
+ h2_builder: proto::h2::client::Config,
+ version: Proto,
+}
+
+#[derive(Clone, Debug)]
+enum Proto {
+ #[cfg(feature = "http1")]
+ Http1,
+ #[cfg(feature = "http2")]
+ Http2,
+}
+
+/// A future returned by `SendRequest::send_request`.
+///
+/// Yields a `Response` if successful.
+#[must_use = "futures do nothing unless polled"]
+pub struct ResponseFuture {
+ inner: ResponseFutureState,
+}
+
+enum ResponseFutureState {
+ Waiting(dispatch::Promise<Response<Body>>),
+ // Option is to be able to `take()` it in `poll`
+ Error(Option<crate::Error>),
+}
+
+/// Deconstructed parts of a `Connection`.
+///
+/// This allows taking apart a `Connection` at a later time, in order to
+/// reclaim the IO object, and additional related pieces.
+#[derive(Debug)]
+pub struct Parts<T> {
+ /// The original IO object used in the handshake.
+ pub io: T,
+ /// A buffer of bytes that have been read but not processed as HTTP.
+ ///
+ /// For instance, if the `Connection` is used for an HTTP upgrade request,
+ /// it is possible the server sent back the first bytes of the new protocol
+ /// along with the response upgrade.
+ ///
+ /// You will want to check for any existing bytes if you plan to continue
+ /// communicating on the IO object.
+ pub read_buf: Bytes,
+ _inner: (),
+}
+
+// ========== internal client api
+
+// A `SendRequest` that can be cloned to send HTTP2 requests.
+// private for now, probably not a great idea of a type...
+#[must_use = "futures do nothing unless polled"]
+#[cfg(feature = "http2")]
+pub(super) struct Http2SendRequest<B> {
+ dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
+}
+
+// ===== impl SendRequest
+
+impl<B> SendRequest<B> {
+ /// Polls to determine whether this sender can be used yet for a request.
+ ///
+ /// If the associated connection is closed, this returns an Error.
+ pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ self.dispatch.poll_ready(cx)
+ }
+
+ pub(super) async fn when_ready(self) -> crate::Result<Self> {
+ let mut me = Some(self);
+ future::poll_fn(move |cx| {
+ ready!(me.as_mut().unwrap().poll_ready(cx))?;
+ Poll::Ready(Ok(me.take().unwrap()))
+ })
+ .await
+ }
+
+ pub(super) fn is_ready(&self) -> bool {
+ self.dispatch.is_ready()
+ }
+
+ pub(super) fn is_closed(&self) -> bool {
+ self.dispatch.is_closed()
+ }
+
+ #[cfg(feature = "http2")]
+ pub(super) fn into_http2(self) -> Http2SendRequest<B> {
+ Http2SendRequest {
+ dispatch: self.dispatch.unbound(),
+ }
+ }
+}
+
+impl<B> SendRequest<B>
+where
+ B: HttpBody + 'static,
+{
+ /// Sends a `Request` on the associated connection.
+ ///
+ /// Returns a future that if successful, yields the `Response`.
+ ///
+ /// # Note
+ ///
+ /// There are some key differences in what automatic things the `Client`
+ /// does for you that will not be done here:
+ ///
+ /// - `Client` requires absolute-form `Uri`s, since the scheme and
+ /// authority are needed to connect. They aren't required here.
+ /// - Since the `Client` requires absolute-form `Uri`s, it can add
+ /// the `Host` header based on it. You must add a `Host` header yourself
+ /// before calling this method.
+ /// - Since absolute-form `Uri`s are not required, if received, they will
+ /// be serialized as-is.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use http::header::HOST;
+ /// # use hyper::client::conn::SendRequest;
+ /// # use hyper::Body;
+ /// use hyper::Request;
+ ///
+ /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
+ /// // build a Request
+ /// let req = Request::builder()
+ /// .uri("/foo/bar")
+ /// .header(HOST, "hyper.rs")
+ /// .body(Body::empty())
+ /// .unwrap();
+ ///
+ /// // send it and await a Response
+ /// let res = tx.send_request(req).await?;
+ /// // assert the Response
+ /// assert!(res.status().is_success());
+ /// # Ok(())
+ /// # }
+ /// # fn main() {}
+ /// ```
+ pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
+ let inner = match self.dispatch.send(req) {
+ Ok(rx) => ResponseFutureState::Waiting(rx),
+ Err(_req) => {
+ debug!("connection was not ready");
+ let err = crate::Error::new_canceled().with("connection was not ready");
+ ResponseFutureState::Error(Some(err))
+ }
+ };
+
+ ResponseFuture { inner }
+ }
+
+ pub(super) fn send_request_retryable(
+ &mut self,
+ req: Request<B>,
+ ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
+ where
+ B: Send,
+ {
+ match self.dispatch.try_send(req) {
+ Ok(rx) => {
+ Either::Left(rx.then(move |res| {
+ match res {
+ Ok(Ok(res)) => future::ok(res),
+ Ok(Err(err)) => future::err(err),
+ // this is definite bug if it happens, but it shouldn't happen!
+ Err(_) => panic!("dispatch dropped without returning error"),
+ }
+ }))
+ }
+ Err(req) => {
+ debug!("connection was not ready");
+ let err = crate::Error::new_canceled().with("connection was not ready");
+ Either::Right(future::err((err, Some(req))))
+ }
+ }
+ }
+}
+
+impl<B> Service<Request<B>> for SendRequest<B>
+where
+ B: HttpBody + 'static,
+{
+ type Response = Response<Body>;
+ type Error = crate::Error;
+ type Future = ResponseFuture;
+
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_ready(cx)
+ }
+
+ fn call(&mut self, req: Request<B>) -> Self::Future {
+ self.send_request(req)
+ }
+}
+
+impl<B> fmt::Debug for SendRequest<B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("SendRequest").finish()
+ }
+}
+
+// ===== impl Http2SendRequest
+
+#[cfg(feature = "http2")]
+impl<B> Http2SendRequest<B> {
+ pub(super) fn is_ready(&self) -> bool {
+ self.dispatch.is_ready()
+ }
+
+ pub(super) fn is_closed(&self) -> bool {
+ self.dispatch.is_closed()
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<B> Http2SendRequest<B>
+where
+ B: HttpBody + 'static,
+{
+ pub(super) fn send_request_retryable(
+ &mut self,
+ req: Request<B>,
+ ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
+ where
+ B: Send,
+ {
+ match self.dispatch.try_send(req) {
+ Ok(rx) => {
+ Either::Left(rx.then(move |res| {
+ match res {
+ Ok(Ok(res)) => future::ok(res),
+ Ok(Err(err)) => future::err(err),
+ // this is definite bug if it happens, but it shouldn't happen!
+ Err(_) => panic!("dispatch dropped without returning error"),
+ }
+ }))
+ }
+ Err(req) => {
+ debug!("connection was not ready");
+ let err = crate::Error::new_canceled().with("connection was not ready");
+ Either::Right(future::err((err, Some(req))))
+ }
+ }
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<B> fmt::Debug for Http2SendRequest<B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Http2SendRequest").finish()
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<B> Clone for Http2SendRequest<B> {
+ fn clone(&self) -> Self {
+ Http2SendRequest {
+ dispatch: self.dispatch.clone(),
+ }
+ }
+}
+
+// ===== impl Connection
+
+impl<T, B> Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ B: HttpBody + Unpin + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ /// Return the inner IO object, and additional information.
+ ///
+ /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
+ pub fn into_parts(self) -> Parts<T> {
+ match self.inner.expect("already upgraded") {
+ #[cfg(feature = "http1")]
+ ProtoClient::H1 { h1 } => {
+ let (io, read_buf, _) = h1.into_inner();
+ Parts {
+ io,
+ read_buf,
+ _inner: (),
+ }
+ }
+ ProtoClient::H2 { .. } => {
+ panic!("http2 cannot into_inner");
+ }
+
+ #[cfg(not(feature = "http1"))]
+ ProtoClient::H1 { h1 } => match h1.0 {},
+ }
+ }
+
+ /// Poll the connection for completion, but without calling `shutdown`
+ /// on the underlying IO.
+ ///
+ /// This is useful to allow running a connection while doing an HTTP
+ /// upgrade. Once the upgrade is completed, the connection would be "done",
+ /// but it is not desired to actually shutdown the IO object. Instead you
+ /// would take it back using `into_parts`.
+ ///
+ /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
+ /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
+ /// to work with this function; or use the `without_shutdown` wrapper.
+ pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ match *self.inner.as_mut().expect("already upgraded") {
+ #[cfg(feature = "http1")]
+ ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
+ #[cfg(feature = "http2")]
+ ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
+
+ #[cfg(not(feature = "http1"))]
+ ProtoClient::H1 { ref mut h1 } => match h1.0 {},
+ #[cfg(not(feature = "http2"))]
+ ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
+ }
+ }
+
+ /// Prevent shutdown of the underlying IO object at the end of service the request,
+ /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
+ pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
+ let mut conn = Some(self);
+ future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
+ ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
+ Poll::Ready(Ok(conn.take().unwrap().into_parts()))
+ })
+ }
+
+ /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
+ ///
+ /// This setting is configured by the server peer by sending the
+ /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
+ /// This method returns the currently acknowledged value received from the
+ /// remote.
+ ///
+ /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
+ /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
+ #[cfg(feature = "http2")]
+ pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
+ match self.inner.as_ref().unwrap() {
+ ProtoClient::H1 { .. } => false,
+ ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
+ }
+ }
+}
+
+impl<T, B> Future for Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Output = crate::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
+ proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
+ #[cfg(feature = "http1")]
+ proto::Dispatched::Upgrade(pending) => match self.inner.take() {
+ Some(ProtoClient::H1 { h1 }) => {
+ let (io, buf, _) = h1.into_inner();
+ pending.fulfill(Upgraded::new(io, buf));
+ Poll::Ready(Ok(()))
+ }
+ _ => {
+ drop(pending);
+ unreachable!("Upgrade expects h1");
+ }
+ },
+ }
+ }
+}
+
+impl<T, B> fmt::Debug for Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
+ B: HttpBody + 'static,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Connection").finish()
+ }
+}
+
+// ===== impl Builder
+
+impl Builder {
+ /// Creates a new connection builder.
+ #[inline]
+ pub fn new() -> Builder {
+ Builder {
+ exec: Exec::Default,
+ h09_responses: false,
+ h1_writev: None,
+ h1_read_buf_exact_size: None,
+ h1_parser_config: Default::default(),
+ h1_title_case_headers: false,
+ h1_preserve_header_case: false,
+ #[cfg(feature = "ffi")]
+ h1_preserve_header_order: false,
+ h1_max_buf_size: None,
+ #[cfg(feature = "ffi")]
+ h1_headers_raw: false,
+ #[cfg(feature = "http2")]
+ h2_builder: Default::default(),
+ #[cfg(feature = "http1")]
+ version: Proto::Http1,
+ #[cfg(not(feature = "http1"))]
+ version: Proto::Http2,
+ }
+ }
+
+ /// Provide an executor to execute background HTTP2 tasks.
+ pub fn executor<E>(&mut self, exec: E) -> &mut Builder
+ where
+ E: Executor<BoxSendFuture> + Send + Sync + 'static,
+ {
+ self.exec = Exec::Executor(Arc::new(exec));
+ self
+ }
+
+ /// Set whether HTTP/0.9 responses should be tolerated.
+ ///
+ /// Default is false.
+ pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
+ self.h09_responses = enabled;
+ self
+ }
+
+ /// Set whether HTTP/1 connections will accept spaces between header names
+ /// and the colon that follow them in responses.
+ ///
+ /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
+ /// to say about it:
+ ///
+ /// > No whitespace is allowed between the header field-name and colon. In
+ /// > the past, differences in the handling of such whitespace have led to
+ /// > security vulnerabilities in request routing and response handling. A
+ /// > server MUST reject any received request message that contains
+ /// > whitespace between a header field-name and colon with a response code
+ /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
+ /// > response message before forwarding the message downstream.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ ///
+ /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
+ pub fn http1_allow_spaces_after_header_name_in_responses(
+ &mut self,
+ enabled: bool,
+ ) -> &mut Builder {
+ self.h1_parser_config
+ .allow_spaces_after_header_name_in_responses(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will accept obsolete line folding for
+ /// header values.
+ ///
+ /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
+ /// parsing.
+ ///
+ /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
+ /// to say about it:
+ ///
+ /// > A server that receives an obs-fold in a request message that is not
+ /// > within a message/http container MUST either reject the message by
+ /// > sending a 400 (Bad Request), preferably with a representation
+ /// > explaining that obsolete line folding is unacceptable, or replace
+ /// > each received obs-fold with one or more SP octets prior to
+ /// > interpreting the field value or forwarding the message downstream.
+ ///
+ /// > A proxy or gateway that receives an obs-fold in a response message
+ /// > that is not within a message/http container MUST either discard the
+ /// > message and replace it with a 502 (Bad Gateway) response, preferably
+ /// > with a representation explaining that unacceptable line folding was
+ /// > received, or replace each received obs-fold with one or more SP
+ /// > octets prior to interpreting the field value or forwarding the
+ /// > message downstream.
+ ///
+ /// > A user agent that receives an obs-fold in a response message that is
+ /// > not within a message/http container MUST replace each received
+ /// > obs-fold with one or more SP octets prior to interpreting the field
+ /// > value.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ ///
+ /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
+ pub fn http1_allow_obsolete_multiline_headers_in_responses(
+ &mut self,
+ enabled: bool,
+ ) -> &mut Builder {
+ self.h1_parser_config
+ .allow_obsolete_multiline_headers_in_responses(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will silently ignored malformed header lines.
+ ///
+ /// If this is enabled and and a header line does not start with a valid header
+ /// name, or does not include a colon at all, the line will be silently ignored
+ /// and no error will be reported.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn http1_ignore_invalid_headers_in_responses(
+ &mut self,
+ enabled: bool,
+ ) -> &mut Builder {
+ self.h1_parser_config
+ .ignore_invalid_headers_in_responses(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections should try to use vectored writes,
+ /// or always flatten into a single buffer.
+ ///
+ /// Note that setting this to false may mean more copies of body data,
+ /// but may also improve performance when an IO transport doesn't
+ /// support vectored writes well, such as most TLS implementations.
+ ///
+ /// Setting this to true will force hyper to use queued strategy
+ /// which may eliminate unnecessary cloning on some TLS backends
+ ///
+ /// Default is `auto`. In this mode hyper will try to guess which
+ /// mode to use
+ pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
+ self.h1_writev = Some(enabled);
+ self
+ }
+
+ /// Set whether HTTP/1 connections will write header names as title case at
+ /// the socket level.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
+ self.h1_title_case_headers = enabled;
+ self
+ }
+
+ /// Set whether to support preserving original header cases.
+ ///
+ /// Currently, this will record the original cases received, and store them
+ /// in a private extension on the `Response`. It will also look for and use
+ /// such an extension in any provided `Request`.
+ ///
+ /// Since the relevant extension is still private, there is no way to
+ /// interact with the original cases. The only effect this can have now is
+ /// to forward the cases in a proxy-like fashion.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
+ self.h1_preserve_header_case = enabled;
+ self
+ }
+
+ /// Set whether to support preserving original header order.
+ ///
+ /// Currently, this will record the order in which headers are received, and store this
+ /// ordering in a private extension on the `Response`. It will also look for and use
+ /// such an extension in any provided `Request`.
+ ///
+ /// Note that this setting does not affect HTTP/2.
+ ///
+ /// Default is false.
+ #[cfg(feature = "ffi")]
+ pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
+ self.h1_preserve_header_order = enabled;
+ self
+ }
+
+ /// Sets the exact size of the read buffer to *always* use.
+ ///
+ /// Note that setting this option unsets the `http1_max_buf_size` option.
+ ///
+ /// Default is an adaptive read buffer.
+ pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
+ self.h1_read_buf_exact_size = sz;
+ self.h1_max_buf_size = None;
+ self
+ }
+
+ /// Set the maximum buffer size for the connection.
+ ///
+ /// Default is ~400kb.
+ ///
+ /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
+ ///
+ /// # Panics
+ ///
+ /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
+ #[cfg(feature = "http1")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
+ pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
+ assert!(
+ max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
+ "the max_buf_size cannot be smaller than the minimum that h1 specifies."
+ );
+
+ self.h1_max_buf_size = Some(max);
+ self.h1_read_buf_exact_size = None;
+ self
+ }
+
+ #[cfg(feature = "ffi")]
+ pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
+ self.h1_headers_raw = enabled;
+ self
+ }
+
+ /// Sets whether HTTP2 is required.
+ ///
+ /// Default is false.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
+ if enabled {
+ self.version = Proto::Http2
+ }
+ self
+ }
+
+ /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
+ /// stream-level flow control.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ ///
+ /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_stream_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets the max connection-level flow control for HTTP2
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_initial_connection_window_size(
+ &mut self,
+ sz: impl Into<Option<u32>>,
+ ) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.adaptive_window = false;
+ self.h2_builder.initial_conn_window_size = sz;
+ }
+ self
+ }
+
+ /// Sets whether to use an adaptive flow control.
+ ///
+ /// Enabling this will override the limits set in
+ /// `http2_initial_stream_window_size` and
+ /// `http2_initial_connection_window_size`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
+ use proto::h2::SPEC_WINDOW_SIZE;
+
+ self.h2_builder.adaptive_window = enabled;
+ if enabled {
+ self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
+ self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
+ }
+ self
+ }
+
+ /// Sets the maximum frame size to use for HTTP2.
+ ///
+ /// Passing `None` will do nothing.
+ ///
+ /// If not set, hyper will use a default.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
+ if let Some(sz) = sz.into() {
+ self.h2_builder.max_frame_size = sz;
+ }
+ self
+ }
+
+ /// Sets an interval for HTTP2 Ping frames should be sent to keep a
+ /// connection alive.
+ ///
+ /// Pass `None` to disable HTTP2 keep-alive.
+ ///
+ /// Default is currently disabled.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_interval(
+ &mut self,
+ interval: impl Into<Option<Duration>>,
+ ) -> &mut Self {
+ self.h2_builder.keep_alive_interval = interval.into();
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will
+ /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
+ ///
+ /// Default is 20 seconds.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
+ self.h2_builder.keep_alive_timeout = timeout;
+ self
+ }
+
+ /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
+ ///
+ /// If disabled, keep-alive pings are only sent while there are open
+ /// request/responses streams. If enabled, pings are also sent when no
+ /// streams are active. Does nothing if `http2_keep_alive_interval` is
+ /// disabled.
+ ///
+ /// Default is `false`.
+ ///
+ /// # Cargo Feature
+ ///
+ /// Requires the `runtime` cargo feature to be enabled.
+ #[cfg(feature = "runtime")]
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
+ self.h2_builder.keep_alive_while_idle = enabled;
+ self
+ }
+
+ /// Sets the maximum number of HTTP2 concurrent locally reset streams.
+ ///
+ /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
+ /// details.
+ ///
+ /// The default value is determined by the `h2` crate.
+ ///
+ /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
+ self.h2_builder.max_concurrent_reset_streams = Some(max);
+ self
+ }
+
+ /// Set the maximum write buffer size for each HTTP/2 stream.
+ ///
+ /// Default is currently 1MB, but may change.
+ ///
+ /// # Panics
+ ///
+ /// The value must be no larger than `u32::MAX`.
+ #[cfg(feature = "http2")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
+ pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
+ assert!(max <= std::u32::MAX as usize);
+ self.h2_builder.max_send_buffer_size = max;
+ self
+ }
+
+ /// Constructs a connection with the configured options and IO.
+ /// See [`client::conn`](crate::client::conn) for more.
+ ///
+ /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
+ /// do nothing.
+ pub fn handshake<T, B>(
+ &self,
+ io: T,
+ ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
+ where
+ T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ B: HttpBody + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ let opts = self.clone();
+
+ async move {
+ trace!("client handshake {:?}", opts.version);
+
+ let (tx, rx) = dispatch::channel();
+ let proto = match opts.version {
+ #[cfg(feature = "http1")]
+ Proto::Http1 => {
+ let mut conn = proto::Conn::new(io);
+ conn.set_h1_parser_config(opts.h1_parser_config);
+ if let Some(writev) = opts.h1_writev {
+ if writev {
+ conn.set_write_strategy_queue();
+ } else {
+ conn.set_write_strategy_flatten();
+ }
+ }
+ if opts.h1_title_case_headers {
+ conn.set_title_case_headers();
+ }
+ if opts.h1_preserve_header_case {
+ conn.set_preserve_header_case();
+ }
+ #[cfg(feature = "ffi")]
+ if opts.h1_preserve_header_order {
+ conn.set_preserve_header_order();
+ }
+ if opts.h09_responses {
+ conn.set_h09_responses();
+ }
+
+ #[cfg(feature = "ffi")]
+ conn.set_raw_headers(opts.h1_headers_raw);
+
+ if let Some(sz) = opts.h1_read_buf_exact_size {
+ conn.set_read_buf_exact_size(sz);
+ }
+ if let Some(max) = opts.h1_max_buf_size {
+ conn.set_max_buf_size(max);
+ }
+ let cd = proto::h1::dispatch::Client::new(rx);
+ let dispatch = proto::h1::Dispatcher::new(cd, conn);
+ ProtoClient::H1 { h1: dispatch }
+ }
+ #[cfg(feature = "http2")]
+ Proto::Http2 => {
+ let h2 =
+ proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
+ .await?;
+ ProtoClient::H2 { h2 }
+ }
+ };
+
+ Ok((
+ SendRequest { dispatch: tx },
+ Connection { inner: Some(proto) },
+ ))
+ }
+ }
+}
+
+// ===== impl ResponseFuture
+
+impl Future for ResponseFuture {
+ type Output = crate::Result<Response<Body>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match self.inner {
+ ResponseFutureState::Waiting(ref mut rx) => {
+ Pin::new(rx).poll(cx).map(|res| match res {
+ Ok(Ok(resp)) => Ok(resp),
+ Ok(Err(err)) => Err(err),
+ // this is definite bug if it happens, but it shouldn't happen!
+ Err(_canceled) => panic!("dispatch dropped without returning error"),
+ })
+ }
+ ResponseFutureState::Error(ref mut err) => {
+ Poll::Ready(Err(err.take().expect("polled after ready")))
+ }
+ }
+ }
+}
+
+impl fmt::Debug for ResponseFuture {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ResponseFuture").finish()
+ }
+}
+
+// ===== impl ProtoClient
+
+impl<T, B> Future for ProtoClient<T, B>
+where
+ T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Output = crate::Result<proto::Dispatched>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match self.project() {
+ #[cfg(feature = "http1")]
+ ProtoClientProj::H1 { h1 } => h1.poll(cx),
+ #[cfg(feature = "http2")]
+ ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
+
+ #[cfg(not(feature = "http1"))]
+ ProtoClientProj::H1 { h1 } => match h1.0 {},
+ #[cfg(not(feature = "http2"))]
+ ProtoClientProj::H2 { h2, .. } => match h2.0 {},
+ }
+ }
+}
+
+// assert trait markers
+
+trait AssertSend: Send {}
+trait AssertSendSync: Send + Sync {}
+
+#[doc(hidden)]
+impl<B: Send> AssertSendSync for SendRequest<B> {}
+
+#[doc(hidden)]
+impl<T: Send, B: Send> AssertSend for Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + Send + 'static,
+ B: HttpBody + 'static,
+ B::Data: Send,
+{
+}
+
+#[doc(hidden)]
+impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
+where
+ T: AsyncRead + AsyncWrite + Send + 'static,
+ B: HttpBody + 'static,
+ B::Data: Send + Sync + 'static,
+{
+}
+
+#[doc(hidden)]
+impl AssertSendSync for Builder {}
+
+#[doc(hidden)]
+impl AssertSend for ResponseFuture {}
diff --git a/third_party/rust/hyper/src/client/connect/dns.rs b/third_party/rust/hyper/src/client/connect/dns.rs
new file mode 100644
index 0000000000..e4465078b3
--- /dev/null
+++ b/third_party/rust/hyper/src/client/connect/dns.rs
@@ -0,0 +1,425 @@
+//! DNS Resolution used by the `HttpConnector`.
+//!
+//! This module contains:
+//!
+//! - A [`GaiResolver`](GaiResolver) that is the default resolver for the
+//! `HttpConnector`.
+//! - The `Name` type used as an argument to custom resolvers.
+//!
+//! # Resolvers are `Service`s
+//!
+//! A resolver is just a
+//! `Service<Name, Response = impl Iterator<Item = SocketAddr>>`.
+//!
+//! A simple resolver that ignores the name and always returns a specific
+//! address:
+//!
+//! ```rust,ignore
+//! use std::{convert::Infallible, iter, net::SocketAddr};
+//!
+//! let resolver = tower::service_fn(|_name| async {
+//! Ok::<_, Infallible>(iter::once(SocketAddr::from(([127, 0, 0, 1], 8080))))
+//! });
+//! ```
+use std::error::Error;
+use std::future::Future;
+use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
+use std::pin::Pin;
+use std::str::FromStr;
+use std::task::{self, Poll};
+use std::{fmt, io, vec};
+
+use tokio::task::JoinHandle;
+use tower_service::Service;
+use tracing::debug;
+
+pub(super) use self::sealed::Resolve;
+
+/// A domain name to resolve into IP addresses.
+#[derive(Clone, Hash, Eq, PartialEq)]
+pub struct Name {
+ host: Box<str>,
+}
+
+/// A resolver using blocking `getaddrinfo` calls in a threadpool.
+#[derive(Clone)]
+pub struct GaiResolver {
+ _priv: (),
+}
+
+/// An iterator of IP addresses returned from `getaddrinfo`.
+pub struct GaiAddrs {
+ inner: SocketAddrs,
+}
+
+/// A future to resolve a name returned by `GaiResolver`.
+pub struct GaiFuture {
+ inner: JoinHandle<Result<SocketAddrs, io::Error>>,
+}
+
+impl Name {
+ pub(super) fn new(host: Box<str>) -> Name {
+ Name { host }
+ }
+
+ /// View the hostname as a string slice.
+ pub fn as_str(&self) -> &str {
+ &self.host
+ }
+}
+
+impl fmt::Debug for Name {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&self.host, f)
+ }
+}
+
+impl fmt::Display for Name {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Display::fmt(&self.host, f)
+ }
+}
+
+impl FromStr for Name {
+ type Err = InvalidNameError;
+
+ fn from_str(host: &str) -> Result<Self, Self::Err> {
+ // Possibly add validation later
+ Ok(Name::new(host.into()))
+ }
+}
+
+/// Error indicating a given string was not a valid domain name.
+#[derive(Debug)]
+pub struct InvalidNameError(());
+
+impl fmt::Display for InvalidNameError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("Not a valid domain name")
+ }
+}
+
+impl Error for InvalidNameError {}
+
+impl GaiResolver {
+ /// Construct a new `GaiResolver`.
+ pub fn new() -> Self {
+ GaiResolver { _priv: () }
+ }
+}
+
+impl Service<Name> for GaiResolver {
+ type Response = GaiAddrs;
+ type Error = io::Error;
+ type Future = GaiFuture;
+
+ fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, name: Name) -> Self::Future {
+ let blocking = tokio::task::spawn_blocking(move || {
+ debug!("resolving host={:?}", name.host);
+ (&*name.host, 0)
+ .to_socket_addrs()
+ .map(|i| SocketAddrs { iter: i })
+ });
+
+ GaiFuture { inner: blocking }
+ }
+}
+
+impl fmt::Debug for GaiResolver {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("GaiResolver")
+ }
+}
+
+impl Future for GaiFuture {
+ type Output = Result<GaiAddrs, io::Error>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.inner).poll(cx).map(|res| match res {
+ Ok(Ok(addrs)) => Ok(GaiAddrs { inner: addrs }),
+ Ok(Err(err)) => Err(err),
+ Err(join_err) => {
+ if join_err.is_cancelled() {
+ Err(io::Error::new(io::ErrorKind::Interrupted, join_err))
+ } else {
+ panic!("gai background task failed: {:?}", join_err)
+ }
+ }
+ })
+ }
+}
+
+impl fmt::Debug for GaiFuture {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("GaiFuture")
+ }
+}
+
+impl Drop for GaiFuture {
+ fn drop(&mut self) {
+ self.inner.abort();
+ }
+}
+
+impl Iterator for GaiAddrs {
+ type Item = SocketAddr;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.inner.next()
+ }
+}
+
+impl fmt::Debug for GaiAddrs {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("GaiAddrs")
+ }
+}
+
+pub(super) struct SocketAddrs {
+ iter: vec::IntoIter<SocketAddr>,
+}
+
+impl SocketAddrs {
+ pub(super) fn new(addrs: Vec<SocketAddr>) -> Self {
+ SocketAddrs {
+ iter: addrs.into_iter(),
+ }
+ }
+
+ pub(super) fn try_parse(host: &str, port: u16) -> Option<SocketAddrs> {
+ if let Ok(addr) = host.parse::<Ipv4Addr>() {
+ let addr = SocketAddrV4::new(addr, port);
+ return Some(SocketAddrs {
+ iter: vec![SocketAddr::V4(addr)].into_iter(),
+ });
+ }
+ if let Ok(addr) = host.parse::<Ipv6Addr>() {
+ let addr = SocketAddrV6::new(addr, port, 0, 0);
+ return Some(SocketAddrs {
+ iter: vec![SocketAddr::V6(addr)].into_iter(),
+ });
+ }
+ None
+ }
+
+ #[inline]
+ fn filter(self, predicate: impl FnMut(&SocketAddr) -> bool) -> SocketAddrs {
+ SocketAddrs::new(self.iter.filter(predicate).collect())
+ }
+
+ pub(super) fn split_by_preference(
+ self,
+ local_addr_ipv4: Option<Ipv4Addr>,
+ local_addr_ipv6: Option<Ipv6Addr>,
+ ) -> (SocketAddrs, SocketAddrs) {
+ match (local_addr_ipv4, local_addr_ipv6) {
+ (Some(_), None) => (self.filter(SocketAddr::is_ipv4), SocketAddrs::new(vec![])),
+ (None, Some(_)) => (self.filter(SocketAddr::is_ipv6), SocketAddrs::new(vec![])),
+ _ => {
+ let preferring_v6 = self
+ .iter
+ .as_slice()
+ .first()
+ .map(SocketAddr::is_ipv6)
+ .unwrap_or(false);
+
+ let (preferred, fallback) = self
+ .iter
+ .partition::<Vec<_>, _>(|addr| addr.is_ipv6() == preferring_v6);
+
+ (SocketAddrs::new(preferred), SocketAddrs::new(fallback))
+ }
+ }
+ }
+
+ pub(super) fn is_empty(&self) -> bool {
+ self.iter.as_slice().is_empty()
+ }
+
+ pub(super) fn len(&self) -> usize {
+ self.iter.as_slice().len()
+ }
+}
+
+impl Iterator for SocketAddrs {
+ type Item = SocketAddr;
+ #[inline]
+ fn next(&mut self) -> Option<SocketAddr> {
+ self.iter.next()
+ }
+}
+
+/*
+/// A resolver using `getaddrinfo` calls via the `tokio_executor::threadpool::blocking` API.
+///
+/// Unlike the `GaiResolver` this will not spawn dedicated threads, but only works when running on the
+/// multi-threaded Tokio runtime.
+#[cfg(feature = "runtime")]
+#[derive(Clone, Debug)]
+pub struct TokioThreadpoolGaiResolver(());
+
+/// The future returned by `TokioThreadpoolGaiResolver`.
+#[cfg(feature = "runtime")]
+#[derive(Debug)]
+pub struct TokioThreadpoolGaiFuture {
+ name: Name,
+}
+
+#[cfg(feature = "runtime")]
+impl TokioThreadpoolGaiResolver {
+ /// Creates a new DNS resolver that will use tokio threadpool's blocking
+ /// feature.
+ ///
+ /// **Requires** its futures to be run on the threadpool runtime.
+ pub fn new() -> Self {
+ TokioThreadpoolGaiResolver(())
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl Service<Name> for TokioThreadpoolGaiResolver {
+ type Response = GaiAddrs;
+ type Error = io::Error;
+ type Future = TokioThreadpoolGaiFuture;
+
+ fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, name: Name) -> Self::Future {
+ TokioThreadpoolGaiFuture { name }
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl Future for TokioThreadpoolGaiFuture {
+ type Output = Result<GaiAddrs, io::Error>;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match ready!(tokio_executor::threadpool::blocking(|| (
+ self.name.as_str(),
+ 0
+ )
+ .to_socket_addrs()))
+ {
+ Ok(Ok(iter)) => Poll::Ready(Ok(GaiAddrs {
+ inner: IpAddrs { iter },
+ })),
+ Ok(Err(e)) => Poll::Ready(Err(e)),
+ // a BlockingError, meaning not on a tokio_executor::threadpool :(
+ Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))),
+ }
+ }
+}
+*/
+
+mod sealed {
+ use super::{SocketAddr, Name};
+ use crate::common::{task, Future, Poll};
+ use tower_service::Service;
+
+ // "Trait alias" for `Service<Name, Response = Addrs>`
+ pub trait Resolve {
+ type Addrs: Iterator<Item = SocketAddr>;
+ type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
+ type Future: Future<Output = Result<Self::Addrs, Self::Error>>;
+
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
+ fn resolve(&mut self, name: Name) -> Self::Future;
+ }
+
+ impl<S> Resolve for S
+ where
+ S: Service<Name>,
+ S::Response: Iterator<Item = SocketAddr>,
+ S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+ {
+ type Addrs = S::Response;
+ type Error = S::Error;
+ type Future = S::Future;
+
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Service::poll_ready(self, cx)
+ }
+
+ fn resolve(&mut self, name: Name) -> Self::Future {
+ Service::call(self, name)
+ }
+ }
+}
+
+pub(super) async fn resolve<R>(resolver: &mut R, name: Name) -> Result<R::Addrs, R::Error>
+where
+ R: Resolve,
+{
+ futures_util::future::poll_fn(|cx| resolver.poll_ready(cx)).await?;
+ resolver.resolve(name).await
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::net::{Ipv4Addr, Ipv6Addr};
+
+ #[test]
+ fn test_ip_addrs_split_by_preference() {
+ let ip_v4 = Ipv4Addr::new(127, 0, 0, 1);
+ let ip_v6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1);
+ let v4_addr = (ip_v4, 80).into();
+ let v6_addr = (ip_v6, 80).into();
+
+ let (mut preferred, mut fallback) = SocketAddrs {
+ iter: vec![v4_addr, v6_addr].into_iter(),
+ }
+ .split_by_preference(None, None);
+ assert!(preferred.next().unwrap().is_ipv4());
+ assert!(fallback.next().unwrap().is_ipv6());
+
+ let (mut preferred, mut fallback) = SocketAddrs {
+ iter: vec![v6_addr, v4_addr].into_iter(),
+ }
+ .split_by_preference(None, None);
+ assert!(preferred.next().unwrap().is_ipv6());
+ assert!(fallback.next().unwrap().is_ipv4());
+
+ let (mut preferred, mut fallback) = SocketAddrs {
+ iter: vec![v4_addr, v6_addr].into_iter(),
+ }
+ .split_by_preference(Some(ip_v4), Some(ip_v6));
+ assert!(preferred.next().unwrap().is_ipv4());
+ assert!(fallback.next().unwrap().is_ipv6());
+
+ let (mut preferred, mut fallback) = SocketAddrs {
+ iter: vec![v6_addr, v4_addr].into_iter(),
+ }
+ .split_by_preference(Some(ip_v4), Some(ip_v6));
+ assert!(preferred.next().unwrap().is_ipv6());
+ assert!(fallback.next().unwrap().is_ipv4());
+
+ let (mut preferred, fallback) = SocketAddrs {
+ iter: vec![v4_addr, v6_addr].into_iter(),
+ }
+ .split_by_preference(Some(ip_v4), None);
+ assert!(preferred.next().unwrap().is_ipv4());
+ assert!(fallback.is_empty());
+
+ let (mut preferred, fallback) = SocketAddrs {
+ iter: vec![v4_addr, v6_addr].into_iter(),
+ }
+ .split_by_preference(None, Some(ip_v6));
+ assert!(preferred.next().unwrap().is_ipv6());
+ assert!(fallback.is_empty());
+ }
+
+ #[test]
+ fn test_name_from_str() {
+ const DOMAIN: &str = "test.example.com";
+ let name = Name::from_str(DOMAIN).expect("Should be a valid domain");
+ assert_eq!(name.as_str(), DOMAIN);
+ assert_eq!(name.to_string(), DOMAIN);
+ }
+}
diff --git a/third_party/rust/hyper/src/client/connect/http.rs b/third_party/rust/hyper/src/client/connect/http.rs
new file mode 100644
index 0000000000..afe7b155eb
--- /dev/null
+++ b/third_party/rust/hyper/src/client/connect/http.rs
@@ -0,0 +1,1007 @@
+use std::error::Error as StdError;
+use std::fmt;
+use std::future::Future;
+use std::io;
+use std::marker::PhantomData;
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{self, Poll};
+use std::time::Duration;
+
+use futures_util::future::Either;
+use http::uri::{Scheme, Uri};
+use pin_project_lite::pin_project;
+use tokio::net::{TcpSocket, TcpStream};
+use tokio::time::Sleep;
+use tracing::{debug, trace, warn};
+
+use super::dns::{self, resolve, GaiResolver, Resolve};
+use super::{Connected, Connection};
+//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
+
+/// A connector for the `http` scheme.
+///
+/// Performs DNS resolution in a thread pool, and then connects over TCP.
+///
+/// # Note
+///
+/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
+/// transport information such as the remote socket address used.
+#[cfg_attr(docsrs, doc(cfg(feature = "tcp")))]
+#[derive(Clone)]
+pub struct HttpConnector<R = GaiResolver> {
+ config: Arc<Config>,
+ resolver: R,
+}
+
+/// Extra information about the transport when an HttpConnector is used.
+///
+/// # Example
+///
+/// ```
+/// # async fn doc() -> hyper::Result<()> {
+/// use hyper::Uri;
+/// use hyper::client::{Client, connect::HttpInfo};
+///
+/// let client = Client::new();
+/// let uri = Uri::from_static("http://example.com");
+///
+/// let res = client.get(uri).await?;
+/// res
+/// .extensions()
+/// .get::<HttpInfo>()
+/// .map(|info| {
+/// println!("remote addr = {}", info.remote_addr());
+/// });
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Note
+///
+/// If a different connector is used besides [`HttpConnector`](HttpConnector),
+/// this value will not exist in the extensions. Consult that specific
+/// connector to see what "extra" information it might provide to responses.
+#[derive(Clone, Debug)]
+pub struct HttpInfo {
+ remote_addr: SocketAddr,
+ local_addr: SocketAddr,
+}
+
+#[derive(Clone)]
+struct Config {
+ connect_timeout: Option<Duration>,
+ enforce_http: bool,
+ happy_eyeballs_timeout: Option<Duration>,
+ keep_alive_timeout: Option<Duration>,
+ local_address_ipv4: Option<Ipv4Addr>,
+ local_address_ipv6: Option<Ipv6Addr>,
+ nodelay: bool,
+ reuse_address: bool,
+ send_buffer_size: Option<usize>,
+ recv_buffer_size: Option<usize>,
+}
+
+// ===== impl HttpConnector =====
+
+impl HttpConnector {
+ /// Construct a new HttpConnector.
+ pub fn new() -> HttpConnector {
+ HttpConnector::new_with_resolver(GaiResolver::new())
+ }
+}
+
+/*
+#[cfg(feature = "runtime")]
+impl HttpConnector<TokioThreadpoolGaiResolver> {
+ /// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`.
+ ///
+ /// This resolver **requires** the threadpool runtime to be used.
+ pub fn new_with_tokio_threadpool_resolver() -> Self {
+ HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new())
+ }
+}
+*/
+
+impl<R> HttpConnector<R> {
+ /// Construct a new HttpConnector.
+ ///
+ /// Takes a [`Resolver`](crate::client::connect::dns#resolvers-are-services) to handle DNS lookups.
+ pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
+ HttpConnector {
+ config: Arc::new(Config {
+ connect_timeout: None,
+ enforce_http: true,
+ happy_eyeballs_timeout: Some(Duration::from_millis(300)),
+ keep_alive_timeout: None,
+ local_address_ipv4: None,
+ local_address_ipv6: None,
+ nodelay: false,
+ reuse_address: false,
+ send_buffer_size: None,
+ recv_buffer_size: None,
+ }),
+ resolver,
+ }
+ }
+
+ /// Option to enforce all `Uri`s have the `http` scheme.
+ ///
+ /// Enabled by default.
+ #[inline]
+ pub fn enforce_http(&mut self, is_enforced: bool) {
+ self.config_mut().enforce_http = is_enforced;
+ }
+
+ /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
+ ///
+ /// If `None`, the option will not be set.
+ ///
+ /// Default is `None`.
+ #[inline]
+ pub fn set_keepalive(&mut self, dur: Option<Duration>) {
+ self.config_mut().keep_alive_timeout = dur;
+ }
+
+ /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
+ ///
+ /// Default is `false`.
+ #[inline]
+ pub fn set_nodelay(&mut self, nodelay: bool) {
+ self.config_mut().nodelay = nodelay;
+ }
+
+ /// Sets the value of the SO_SNDBUF option on the socket.
+ #[inline]
+ pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
+ self.config_mut().send_buffer_size = size;
+ }
+
+ /// Sets the value of the SO_RCVBUF option on the socket.
+ #[inline]
+ pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
+ self.config_mut().recv_buffer_size = size;
+ }
+
+ /// Set that all sockets are bound to the configured address before connection.
+ ///
+ /// If `None`, the sockets will not be bound.
+ ///
+ /// Default is `None`.
+ #[inline]
+ pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
+ let (v4, v6) = match addr {
+ Some(IpAddr::V4(a)) => (Some(a), None),
+ Some(IpAddr::V6(a)) => (None, Some(a)),
+ _ => (None, None),
+ };
+
+ let cfg = self.config_mut();
+
+ cfg.local_address_ipv4 = v4;
+ cfg.local_address_ipv6 = v6;
+ }
+
+ /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's
+ /// preferences) before connection.
+ #[inline]
+ pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) {
+ let cfg = self.config_mut();
+
+ cfg.local_address_ipv4 = Some(addr_ipv4);
+ cfg.local_address_ipv6 = Some(addr_ipv6);
+ }
+
+ /// Set the connect timeout.
+ ///
+ /// If a domain resolves to multiple IP addresses, the timeout will be
+ /// evenly divided across them.
+ ///
+ /// Default is `None`.
+ #[inline]
+ pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
+ self.config_mut().connect_timeout = dur;
+ }
+
+ /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
+ ///
+ /// If hostname resolves to both IPv4 and IPv6 addresses and connection
+ /// cannot be established using preferred address family before timeout
+ /// elapses, then connector will in parallel attempt connection using other
+ /// address family.
+ ///
+ /// If `None`, parallel connection attempts are disabled.
+ ///
+ /// Default is 300 milliseconds.
+ ///
+ /// [RFC 6555]: https://tools.ietf.org/html/rfc6555
+ #[inline]
+ pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
+ self.config_mut().happy_eyeballs_timeout = dur;
+ }
+
+ /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
+ ///
+ /// Default is `false`.
+ #[inline]
+ pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
+ self.config_mut().reuse_address = reuse_address;
+ self
+ }
+
+ // private
+
+ fn config_mut(&mut self) -> &mut Config {
+ // If the are HttpConnector clones, this will clone the inner
+ // config. So mutating the config won't ever affect previous
+ // clones.
+ Arc::make_mut(&mut self.config)
+ }
+}
+
+static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http";
+static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing";
+static INVALID_MISSING_HOST: &str = "invalid URL, host is missing";
+
+// R: Debug required for now to allow adding it to debug output later...
+impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("HttpConnector").finish()
+ }
+}
+
+impl<R> tower_service::Service<Uri> for HttpConnector<R>
+where
+ R: Resolve + Clone + Send + Sync + 'static,
+ R::Future: Send,
+{
+ type Response = TcpStream;
+ type Error = ConnectError;
+ type Future = HttpConnecting<R>;
+
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?;
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, dst: Uri) -> Self::Future {
+ let mut self_ = self.clone();
+ HttpConnecting {
+ fut: Box::pin(async move { self_.call_async(dst).await }),
+ _marker: PhantomData,
+ }
+ }
+}
+
+fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> {
+ trace!(
+ "Http::connect; scheme={:?}, host={:?}, port={:?}",
+ dst.scheme(),
+ dst.host(),
+ dst.port(),
+ );
+
+ if config.enforce_http {
+ if dst.scheme() != Some(&Scheme::HTTP) {
+ return Err(ConnectError {
+ msg: INVALID_NOT_HTTP.into(),
+ cause: None,
+ });
+ }
+ } else if dst.scheme().is_none() {
+ return Err(ConnectError {
+ msg: INVALID_MISSING_SCHEME.into(),
+ cause: None,
+ });
+ }
+
+ let host = match dst.host() {
+ Some(s) => s,
+ None => {
+ return Err(ConnectError {
+ msg: INVALID_MISSING_HOST.into(),
+ cause: None,
+ })
+ }
+ };
+ let port = match dst.port() {
+ Some(port) => port.as_u16(),
+ None => {
+ if dst.scheme() == Some(&Scheme::HTTPS) {
+ 443
+ } else {
+ 80
+ }
+ }
+ };
+
+ Ok((host, port))
+}
+
+impl<R> HttpConnector<R>
+where
+ R: Resolve,
+{
+ async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
+ let config = &self.config;
+
+ let (host, port) = get_host_port(config, &dst)?;
+ let host = host.trim_start_matches('[').trim_end_matches(']');
+
+ // If the host is already an IP addr (v4 or v6),
+ // skip resolving the dns and start connecting right away.
+ let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) {
+ addrs
+ } else {
+ let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
+ .await
+ .map_err(ConnectError::dns)?;
+ let addrs = addrs
+ .map(|mut addr| {
+ addr.set_port(port);
+ addr
+ })
+ .collect();
+ dns::SocketAddrs::new(addrs)
+ };
+
+ let c = ConnectingTcp::new(addrs, config);
+
+ let sock = c.connect().await?;
+
+ if let Err(e) = sock.set_nodelay(config.nodelay) {
+ warn!("tcp set_nodelay error: {}", e);
+ }
+
+ Ok(sock)
+ }
+}
+
+impl Connection for TcpStream {
+ fn connected(&self) -> Connected {
+ let connected = Connected::new();
+ if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) {
+ connected.extra(HttpInfo { remote_addr, local_addr })
+ } else {
+ connected
+ }
+ }
+}
+
+impl HttpInfo {
+ /// Get the remote address of the transport used.
+ pub fn remote_addr(&self) -> SocketAddr {
+ self.remote_addr
+ }
+
+ /// Get the local address of the transport used.
+ pub fn local_addr(&self) -> SocketAddr {
+ self.local_addr
+ }
+}
+
+pin_project! {
+ // Not publicly exported (so missing_docs doesn't trigger).
+ //
+ // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
+ // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
+ // (and thus we can change the type in the future).
+ #[must_use = "futures do nothing unless polled"]
+ #[allow(missing_debug_implementations)]
+ pub struct HttpConnecting<R> {
+ #[pin]
+ fut: BoxConnecting,
+ _marker: PhantomData<R>,
+ }
+}
+
+type ConnectResult = Result<TcpStream, ConnectError>;
+type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;
+
+impl<R: Resolve> Future for HttpConnecting<R> {
+ type Output = ConnectResult;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ self.project().fut.poll(cx)
+ }
+}
+
+// Not publicly exported (so missing_docs doesn't trigger).
+pub struct ConnectError {
+ msg: Box<str>,
+ cause: Option<Box<dyn StdError + Send + Sync>>,
+}
+
+impl ConnectError {
+ fn new<S, E>(msg: S, cause: E) -> ConnectError
+ where
+ S: Into<Box<str>>,
+ E: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ ConnectError {
+ msg: msg.into(),
+ cause: Some(cause.into()),
+ }
+ }
+
+ fn dns<E>(cause: E) -> ConnectError
+ where
+ E: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ ConnectError::new("dns error", cause)
+ }
+
+ fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError
+ where
+ S: Into<Box<str>>,
+ E: Into<Box<dyn StdError + Send + Sync>>,
+ {
+ move |cause| ConnectError::new(msg, cause)
+ }
+}
+
+impl fmt::Debug for ConnectError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if let Some(ref cause) = self.cause {
+ f.debug_tuple("ConnectError")
+ .field(&self.msg)
+ .field(cause)
+ .finish()
+ } else {
+ self.msg.fmt(f)
+ }
+ }
+}
+
+impl fmt::Display for ConnectError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str(&self.msg)?;
+
+ if let Some(ref cause) = self.cause {
+ write!(f, ": {}", cause)?;
+ }
+
+ Ok(())
+ }
+}
+
+impl StdError for ConnectError {
+ fn source(&self) -> Option<&(dyn StdError + 'static)> {
+ self.cause.as_ref().map(|e| &**e as _)
+ }
+}
+
+struct ConnectingTcp<'a> {
+ preferred: ConnectingTcpRemote,
+ fallback: Option<ConnectingTcpFallback>,
+ config: &'a Config,
+}
+
+impl<'a> ConnectingTcp<'a> {
+ fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self {
+ if let Some(fallback_timeout) = config.happy_eyeballs_timeout {
+ let (preferred_addrs, fallback_addrs) = remote_addrs
+ .split_by_preference(config.local_address_ipv4, config.local_address_ipv6);
+ if fallback_addrs.is_empty() {
+ return ConnectingTcp {
+ preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
+ fallback: None,
+ config,
+ };
+ }
+
+ ConnectingTcp {
+ preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout),
+ fallback: Some(ConnectingTcpFallback {
+ delay: tokio::time::sleep(fallback_timeout),
+ remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout),
+ }),
+ config,
+ }
+ } else {
+ ConnectingTcp {
+ preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout),
+ fallback: None,
+ config,
+ }
+ }
+ }
+}
+
+struct ConnectingTcpFallback {
+ delay: Sleep,
+ remote: ConnectingTcpRemote,
+}
+
+struct ConnectingTcpRemote {
+ addrs: dns::SocketAddrs,
+ connect_timeout: Option<Duration>,
+}
+
+impl ConnectingTcpRemote {
+ fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self {
+ let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
+
+ Self {
+ addrs,
+ connect_timeout,
+ }
+ }
+}
+
+impl ConnectingTcpRemote {
+ async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> {
+ let mut err = None;
+ for addr in &mut self.addrs {
+ debug!("connecting to {}", addr);
+ match connect(&addr, config, self.connect_timeout)?.await {
+ Ok(tcp) => {
+ debug!("connected to {}", addr);
+ return Ok(tcp);
+ }
+ Err(e) => {
+ trace!("connect error for {}: {:?}", addr, e);
+ err = Some(e);
+ }
+ }
+ }
+
+ match err {
+ Some(e) => Err(e),
+ None => Err(ConnectError::new(
+ "tcp connect error",
+ std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"),
+ )),
+ }
+ }
+}
+
+fn bind_local_address(
+ socket: &socket2::Socket,
+ dst_addr: &SocketAddr,
+ local_addr_ipv4: &Option<Ipv4Addr>,
+ local_addr_ipv6: &Option<Ipv6Addr>,
+) -> io::Result<()> {
+ match (*dst_addr, local_addr_ipv4, local_addr_ipv6) {
+ (SocketAddr::V4(_), Some(addr), _) => {
+ socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?;
+ }
+ (SocketAddr::V6(_), _, Some(addr)) => {
+ socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?;
+ }
+ _ => {
+ if cfg!(windows) {
+ // Windows requires a socket be bound before calling connect
+ let any: SocketAddr = match *dst_addr {
+ SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(),
+ SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(),
+ };
+ socket.bind(&any.into())?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+fn connect(
+ addr: &SocketAddr,
+ config: &Config,
+ connect_timeout: Option<Duration>,
+) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> {
+ // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the
+ // keepalive timeout, it would be nice to use that instead of socket2,
+ // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance...
+ use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};
+ use std::convert::TryInto;
+
+ let domain = Domain::for_address(*addr);
+ let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
+ .map_err(ConnectError::m("tcp open error"))?;
+
+ // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is
+ // responsible for ensuring O_NONBLOCK is set.
+ socket
+ .set_nonblocking(true)
+ .map_err(ConnectError::m("tcp set_nonblocking error"))?;
+
+ if let Some(dur) = config.keep_alive_timeout {
+ let conf = TcpKeepalive::new().with_time(dur);
+ if let Err(e) = socket.set_tcp_keepalive(&conf) {
+ warn!("tcp set_keepalive error: {}", e);
+ }
+ }
+
+ bind_local_address(
+ &socket,
+ addr,
+ &config.local_address_ipv4,
+ &config.local_address_ipv6,
+ )
+ .map_err(ConnectError::m("tcp bind local error"))?;
+
+ #[cfg(unix)]
+ let socket = unsafe {
+ // Safety: `from_raw_fd` is only safe to call if ownership of the raw
+ // file descriptor is transferred. Since we call `into_raw_fd` on the
+ // socket2 socket, it gives up ownership of the fd and will not close
+ // it, so this is safe.
+ use std::os::unix::io::{FromRawFd, IntoRawFd};
+ TcpSocket::from_raw_fd(socket.into_raw_fd())
+ };
+ #[cfg(windows)]
+ let socket = unsafe {
+ // Safety: `from_raw_socket` is only safe to call if ownership of the raw
+ // Windows SOCKET is transferred. Since we call `into_raw_socket` on the
+ // socket2 socket, it gives up ownership of the SOCKET and will not close
+ // it, so this is safe.
+ use std::os::windows::io::{FromRawSocket, IntoRawSocket};
+ TcpSocket::from_raw_socket(socket.into_raw_socket())
+ };
+
+ if config.reuse_address {
+ if let Err(e) = socket.set_reuseaddr(true) {
+ warn!("tcp set_reuse_address error: {}", e);
+ }
+ }
+
+ if let Some(size) = config.send_buffer_size {
+ if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) {
+ warn!("tcp set_buffer_size error: {}", e);
+ }
+ }
+
+ if let Some(size) = config.recv_buffer_size {
+ if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) {
+ warn!("tcp set_recv_buffer_size error: {}", e);
+ }
+ }
+
+ let connect = socket.connect(*addr);
+ Ok(async move {
+ match connect_timeout {
+ Some(dur) => match tokio::time::timeout(dur, connect).await {
+ Ok(Ok(s)) => Ok(s),
+ Ok(Err(e)) => Err(e),
+ Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
+ },
+ None => connect.await,
+ }
+ .map_err(ConnectError::m("tcp connect error"))
+ })
+}
+
+impl ConnectingTcp<'_> {
+ async fn connect(mut self) -> Result<TcpStream, ConnectError> {
+ match self.fallback {
+ None => self.preferred.connect(self.config).await,
+ Some(mut fallback) => {
+ let preferred_fut = self.preferred.connect(self.config);
+ futures_util::pin_mut!(preferred_fut);
+
+ let fallback_fut = fallback.remote.connect(self.config);
+ futures_util::pin_mut!(fallback_fut);
+
+ let fallback_delay = fallback.delay;
+ futures_util::pin_mut!(fallback_delay);
+
+ let (result, future) =
+ match futures_util::future::select(preferred_fut, fallback_delay).await {
+ Either::Left((result, _fallback_delay)) => {
+ (result, Either::Right(fallback_fut))
+ }
+ Either::Right(((), preferred_fut)) => {
+ // Delay is done, start polling both the preferred and the fallback
+ futures_util::future::select(preferred_fut, fallback_fut)
+ .await
+ .factor_first()
+ }
+ };
+
+ if result.is_err() {
+ // Fallback to the remaining future (could be preferred or fallback)
+ // if we get an error
+ future.await
+ } else {
+ result
+ }
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io;
+
+ use ::http::Uri;
+
+ use super::super::sealed::{Connect, ConnectSvc};
+ use super::{Config, ConnectError, HttpConnector};
+
+ async fn connect<C>(
+ connector: C,
+ dst: Uri,
+ ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error>
+ where
+ C: Connect,
+ {
+ connector.connect(super::super::sealed::Internal, dst).await
+ }
+
+ #[tokio::test]
+ async fn test_errors_enforce_http() {
+ let dst = "https://example.domain/foo/bar?baz".parse().unwrap();
+ let connector = HttpConnector::new();
+
+ let err = connect(connector, dst).await.unwrap_err();
+ assert_eq!(&*err.msg, super::INVALID_NOT_HTTP);
+ }
+
+ #[cfg(any(target_os = "linux", target_os = "macos"))]
+ fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) {
+ use std::net::{IpAddr, TcpListener};
+
+ let mut ip_v4 = None;
+ let mut ip_v6 = None;
+
+ let ips = pnet_datalink::interfaces()
+ .into_iter()
+ .flat_map(|i| i.ips.into_iter().map(|n| n.ip()));
+
+ for ip in ips {
+ match ip {
+ IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip),
+ IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip),
+ _ => (),
+ }
+
+ if ip_v4.is_some() && ip_v6.is_some() {
+ break;
+ }
+ }
+
+ (ip_v4, ip_v6)
+ }
+
+ #[tokio::test]
+ async fn test_errors_missing_scheme() {
+ let dst = "example.domain".parse().unwrap();
+ let mut connector = HttpConnector::new();
+ connector.enforce_http(false);
+
+ let err = connect(connector, dst).await.unwrap_err();
+ assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME);
+ }
+
+ // NOTE: pnet crate that we use in this test doesn't compile on Windows
+ #[cfg(any(target_os = "linux", target_os = "macos"))]
+ #[tokio::test]
+ async fn local_address() {
+ use std::net::{IpAddr, TcpListener};
+ let _ = pretty_env_logger::try_init();
+
+ let (bind_ip_v4, bind_ip_v6) = get_local_ips();
+ let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
+ let port = server4.local_addr().unwrap().port();
+ let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap();
+
+ let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move {
+ let mut connector = HttpConnector::new();
+
+ match (bind_ip_v4, bind_ip_v6) {
+ (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6),
+ (Some(v4), None) => connector.set_local_address(Some(v4.into())),
+ (None, Some(v6)) => connector.set_local_address(Some(v6.into())),
+ _ => unreachable!(),
+ }
+
+ connect(connector, dst.parse().unwrap()).await.unwrap();
+
+ let (_, client_addr) = server.accept().unwrap();
+
+ assert_eq!(client_addr.ip(), expected_ip);
+ };
+
+ if let Some(ip) = bind_ip_v4 {
+ assert_client_ip(format!("http://127.0.0.1:{}", port), server4, ip.into()).await;
+ }
+
+ if let Some(ip) = bind_ip_v6 {
+ assert_client_ip(format!("http://[::1]:{}", port), server6, ip.into()).await;
+ }
+ }
+
+ #[test]
+ #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
+ fn client_happy_eyeballs() {
+ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
+ use std::time::{Duration, Instant};
+
+ use super::dns;
+ use super::ConnectingTcp;
+
+ let _ = pretty_env_logger::try_init();
+ let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
+ let addr = server4.local_addr().unwrap();
+ let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let local_timeout = Duration::default();
+ let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
+ let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
+ let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
+ + Duration::from_millis(250);
+
+ let scenarios = &[
+ // Fast primary, without fallback.
+ (&[local_ipv4_addr()][..], 4, local_timeout, false),
+ (&[local_ipv6_addr()][..], 6, local_timeout, false),
+ // Fast primary, with (unused) fallback.
+ (
+ &[local_ipv4_addr(), local_ipv6_addr()][..],
+ 4,
+ local_timeout,
+ false,
+ ),
+ (
+ &[local_ipv6_addr(), local_ipv4_addr()][..],
+ 6,
+ local_timeout,
+ false,
+ ),
+ // Unreachable + fast primary, without fallback.
+ (
+ &[unreachable_ipv4_addr(), local_ipv4_addr()][..],
+ 4,
+ unreachable_v4_timeout,
+ false,
+ ),
+ (
+ &[unreachable_ipv6_addr(), local_ipv6_addr()][..],
+ 6,
+ unreachable_v6_timeout,
+ false,
+ ),
+ // Unreachable + fast primary, with (unused) fallback.
+ (
+ &[
+ unreachable_ipv4_addr(),
+ local_ipv4_addr(),
+ local_ipv6_addr(),
+ ][..],
+ 4,
+ unreachable_v4_timeout,
+ false,
+ ),
+ (
+ &[
+ unreachable_ipv6_addr(),
+ local_ipv6_addr(),
+ local_ipv4_addr(),
+ ][..],
+ 6,
+ unreachable_v6_timeout,
+ true,
+ ),
+ // Slow primary, with (used) fallback.
+ (
+ &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
+ 6,
+ fallback_timeout,
+ false,
+ ),
+ (
+ &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
+ 4,
+ fallback_timeout,
+ true,
+ ),
+ // Slow primary, with (used) unreachable + fast fallback.
+ (
+ &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
+ 6,
+ fallback_timeout + unreachable_v6_timeout,
+ false,
+ ),
+ (
+ &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
+ 4,
+ fallback_timeout + unreachable_v4_timeout,
+ true,
+ ),
+ ];
+
+ // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network.
+ // Otherwise, connection to "slow" IPv6 address will error-out immediately.
+ let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;
+
+ for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
+ if needs_ipv6_access && !ipv6_accessible {
+ continue;
+ }
+
+ let (start, stream) = rt
+ .block_on(async move {
+ let addrs = hosts
+ .iter()
+ .map(|host| (host.clone(), addr.port()).into())
+ .collect();
+ let cfg = Config {
+ local_address_ipv4: None,
+ local_address_ipv6: None,
+ connect_timeout: None,
+ keep_alive_timeout: None,
+ happy_eyeballs_timeout: Some(fallback_timeout),
+ nodelay: false,
+ reuse_address: false,
+ enforce_http: false,
+ send_buffer_size: None,
+ recv_buffer_size: None,
+ };
+ let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
+ let start = Instant::now();
+ Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?))
+ })
+ .unwrap();
+ let res = if stream.peer_addr().unwrap().is_ipv4() {
+ 4
+ } else {
+ 6
+ };
+ let duration = start.elapsed();
+
+ // Allow actual duration to be +/- 150ms off.
+ let min_duration = if timeout >= Duration::from_millis(150) {
+ timeout - Duration::from_millis(150)
+ } else {
+ Duration::default()
+ };
+ let max_duration = timeout + Duration::from_millis(150);
+
+ assert_eq!(res, family);
+ assert!(duration >= min_duration);
+ assert!(duration <= max_duration);
+ }
+
+ fn local_ipv4_addr() -> IpAddr {
+ Ipv4Addr::new(127, 0, 0, 1).into()
+ }
+
+ fn local_ipv6_addr() -> IpAddr {
+ Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
+ }
+
+ fn unreachable_ipv4_addr() -> IpAddr {
+ Ipv4Addr::new(127, 0, 0, 2).into()
+ }
+
+ fn unreachable_ipv6_addr() -> IpAddr {
+ Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
+ }
+
+ fn slow_ipv4_addr() -> IpAddr {
+ // RFC 6890 reserved IPv4 address.
+ Ipv4Addr::new(198, 18, 0, 25).into()
+ }
+
+ fn slow_ipv6_addr() -> IpAddr {
+ // RFC 6890 reserved IPv6 address.
+ Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
+ }
+
+ fn measure_connect(addr: IpAddr) -> (bool, Duration) {
+ let start = Instant::now();
+ let result =
+ std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1));
+
+ let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
+ let duration = start.elapsed();
+ (reachable, duration)
+ }
+ }
+}
diff --git a/third_party/rust/hyper/src/client/connect/mod.rs b/third_party/rust/hyper/src/client/connect/mod.rs
new file mode 100644
index 0000000000..862a0e65c1
--- /dev/null
+++ b/third_party/rust/hyper/src/client/connect/mod.rs
@@ -0,0 +1,412 @@
+//! Connectors used by the `Client`.
+//!
+//! This module contains:
+//!
+//! - A default [`HttpConnector`][] that does DNS resolution and establishes
+//! connections over TCP.
+//! - Types to build custom connectors.
+//!
+//! # Connectors
+//!
+//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
+//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
+//! and [`Connection`][].
+//!
+//! ## Custom Connectors
+//!
+//! A simple connector that ignores the `Uri` destination and always returns
+//! a TCP connection to the same address could be written like this:
+//!
+//! ```rust,ignore
+//! let connector = tower::service_fn(|_dst| async {
+//! tokio::net::TcpStream::connect("127.0.0.1:1337")
+//! })
+//! ```
+//!
+//! Or, fully written out:
+//!
+//! ```
+//! # #[cfg(feature = "runtime")]
+//! # mod rt {
+//! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
+//! use hyper::{service::Service, Uri};
+//! use tokio::net::TcpStream;
+//!
+//! #[derive(Clone)]
+//! struct LocalConnector;
+//!
+//! impl Service<Uri> for LocalConnector {
+//! type Response = TcpStream;
+//! type Error = std::io::Error;
+//! // We can't "name" an `async` generated future.
+//! type Future = Pin<Box<
+//! dyn Future<Output = Result<Self::Response, Self::Error>> + Send
+//! >>;
+//!
+//! fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+//! // This connector is always ready, but others might not be.
+//! Poll::Ready(Ok(()))
+//! }
+//!
+//! fn call(&mut self, _: Uri) -> Self::Future {
+//! Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
+//! }
+//! }
+//! # }
+//! ```
+//!
+//! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a
+//! better starting place to extend from.
+//!
+//! Using either of the above connector examples, it can be used with the
+//! `Client` like this:
+//!
+//! ```
+//! # #[cfg(feature = "runtime")]
+//! # fn rt () {
+//! # let connector = hyper::client::HttpConnector::new();
+//! // let connector = ...
+//!
+//! let client = hyper::Client::builder()
+//! .build::<_, hyper::Body>(connector);
+//! # }
+//! ```
+//!
+//!
+//! [`HttpConnector`]: HttpConnector
+//! [`Service`]: crate::service::Service
+//! [`Uri`]: ::http::Uri
+//! [`AsyncRead`]: tokio::io::AsyncRead
+//! [`AsyncWrite`]: tokio::io::AsyncWrite
+//! [`Connection`]: Connection
+use std::fmt;
+
+use ::http::Extensions;
+
+cfg_feature! {
+ #![feature = "tcp"]
+
+ pub use self::http::{HttpConnector, HttpInfo};
+
+ pub mod dns;
+ mod http;
+}
+
+cfg_feature! {
+ #![any(feature = "http1", feature = "http2")]
+
+ pub use self::sealed::Connect;
+}
+
+/// Describes a type returned by a connector.
+pub trait Connection {
+ /// Return metadata describing the connection.
+ fn connected(&self) -> Connected;
+}
+
+/// Extra information about the connected transport.
+///
+/// This can be used to inform recipients about things like if ALPN
+/// was used, or if connected to an HTTP proxy.
+#[derive(Debug)]
+pub struct Connected {
+ pub(super) alpn: Alpn,
+ pub(super) is_proxied: bool,
+ pub(super) extra: Option<Extra>,
+}
+
+pub(super) struct Extra(Box<dyn ExtraInner>);
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub(super) enum Alpn {
+ H2,
+ None,
+}
+
+impl Connected {
+ /// Create new `Connected` type with empty metadata.
+ pub fn new() -> Connected {
+ Connected {
+ alpn: Alpn::None,
+ is_proxied: false,
+ extra: None,
+ }
+ }
+
+ /// Set whether the connected transport is to an HTTP proxy.
+ ///
+ /// This setting will affect if HTTP/1 requests written on the transport
+ /// will have the request-target in absolute-form or origin-form:
+ ///
+ /// - When `proxy(false)`:
+ ///
+ /// ```http
+ /// GET /guide HTTP/1.1
+ /// ```
+ ///
+ /// - When `proxy(true)`:
+ ///
+ /// ```http
+ /// GET http://hyper.rs/guide HTTP/1.1
+ /// ```
+ ///
+ /// Default is `false`.
+ pub fn proxy(mut self, is_proxied: bool) -> Connected {
+ self.is_proxied = is_proxied;
+ self
+ }
+
+ /// Determines if the connected transport is to an HTTP proxy.
+ pub fn is_proxied(&self) -> bool {
+ self.is_proxied
+ }
+
+ /// Set extra connection information to be set in the extensions of every `Response`.
+ pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
+ if let Some(prev) = self.extra {
+ self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
+ } else {
+ self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
+ }
+ self
+ }
+
+ /// Copies the extra connection information into an `Extensions` map.
+ pub fn get_extras(&self, extensions: &mut Extensions) {
+ if let Some(extra) = &self.extra {
+ extra.set(extensions);
+ }
+ }
+
+ /// Set that the connected transport negotiated HTTP/2 as its next protocol.
+ pub fn negotiated_h2(mut self) -> Connected {
+ self.alpn = Alpn::H2;
+ self
+ }
+
+ /// Determines if the connected transport negotiated HTTP/2 as its next protocol.
+ pub fn is_negotiated_h2(&self) -> bool {
+ self.alpn == Alpn::H2
+ }
+
+ // Don't public expose that `Connected` is `Clone`, unsure if we want to
+ // keep that contract...
+ #[cfg(feature = "http2")]
+ pub(super) fn clone(&self) -> Connected {
+ Connected {
+ alpn: self.alpn.clone(),
+ is_proxied: self.is_proxied,
+ extra: self.extra.clone(),
+ }
+ }
+}
+
+// ===== impl Extra =====
+
+impl Extra {
+ pub(super) fn set(&self, res: &mut Extensions) {
+ self.0.set(res);
+ }
+}
+
+impl Clone for Extra {
+ fn clone(&self) -> Extra {
+ Extra(self.0.clone_box())
+ }
+}
+
+impl fmt::Debug for Extra {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Extra").finish()
+ }
+}
+
+trait ExtraInner: Send + Sync {
+ fn clone_box(&self) -> Box<dyn ExtraInner>;
+ fn set(&self, res: &mut Extensions);
+}
+
+// This indirection allows the `Connected` to have a type-erased "extra" value,
+// while that type still knows its inner extra type. This allows the correct
+// TypeId to be used when inserting into `res.extensions_mut()`.
+#[derive(Clone)]
+struct ExtraEnvelope<T>(T);
+
+impl<T> ExtraInner for ExtraEnvelope<T>
+where
+ T: Clone + Send + Sync + 'static,
+{
+ fn clone_box(&self) -> Box<dyn ExtraInner> {
+ Box::new(self.clone())
+ }
+
+ fn set(&self, res: &mut Extensions) {
+ res.insert(self.0.clone());
+ }
+}
+
+struct ExtraChain<T>(Box<dyn ExtraInner>, T);
+
+impl<T: Clone> Clone for ExtraChain<T> {
+ fn clone(&self) -> Self {
+ ExtraChain(self.0.clone_box(), self.1.clone())
+ }
+}
+
+impl<T> ExtraInner for ExtraChain<T>
+where
+ T: Clone + Send + Sync + 'static,
+{
+ fn clone_box(&self) -> Box<dyn ExtraInner> {
+ Box::new(self.clone())
+ }
+
+ fn set(&self, res: &mut Extensions) {
+ self.0.set(res);
+ res.insert(self.1.clone());
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+pub(super) mod sealed {
+ use std::error::Error as StdError;
+
+ use ::http::Uri;
+ use tokio::io::{AsyncRead, AsyncWrite};
+
+ use super::Connection;
+ use crate::common::{Future, Unpin};
+
+ /// Connect to a destination, returning an IO transport.
+ ///
+ /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the
+ /// ready connection.
+ ///
+ /// # Trait Alias
+ ///
+ /// This is really just an *alias* for the `tower::Service` trait, with
+ /// additional bounds set for convenience *inside* hyper. You don't actually
+ /// implement this trait, but `tower::Service<Uri>` instead.
+ // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
+ // fit the `Connect` bounds because of the blanket impl for `Service`.
+ pub trait Connect: Sealed + Sized {
+ #[doc(hidden)]
+ type _Svc: ConnectSvc;
+ #[doc(hidden)]
+ fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
+ }
+
+ pub trait ConnectSvc {
+ type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
+ type Error: Into<Box<dyn StdError + Send + Sync>>;
+ type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;
+
+ fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
+ }
+
+ impl<S, T> Connect for S
+ where
+ S: tower_service::Service<Uri, Response = T> + Send + 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S::Future: Unpin + Send,
+ T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
+ {
+ type _Svc = S;
+
+ fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
+ crate::service::oneshot(self, dst)
+ }
+ }
+
+ impl<S, T> ConnectSvc for S
+ where
+ S: tower_service::Service<Uri, Response = T> + Send + 'static,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S::Future: Unpin + Send,
+ T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
+ {
+ type Connection = T;
+ type Error = S::Error;
+ type Future = crate::service::Oneshot<S, Uri>;
+
+ fn connect(self, _: Internal, dst: Uri) -> Self::Future {
+ crate::service::oneshot(self, dst)
+ }
+ }
+
+ impl<S, T> Sealed for S
+ where
+ S: tower_service::Service<Uri, Response = T> + Send,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S::Future: Unpin + Send,
+ T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
+ {
+ }
+
+ pub trait Sealed {}
+ #[allow(missing_debug_implementations)]
+ pub struct Internal;
+}
+
+#[cfg(test)]
+mod tests {
+ use super::Connected;
+
+ #[derive(Clone, Debug, PartialEq)]
+ struct Ex1(usize);
+
+ #[derive(Clone, Debug, PartialEq)]
+ struct Ex2(&'static str);
+
+ #[derive(Clone, Debug, PartialEq)]
+ struct Ex3(&'static str);
+
+ #[test]
+ fn test_connected_extra() {
+ let c1 = Connected::new().extra(Ex1(41));
+
+ let mut ex = ::http::Extensions::new();
+
+ assert_eq!(ex.get::<Ex1>(), None);
+
+ c1.extra.as_ref().expect("c1 extra").set(&mut ex);
+
+ assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41)));
+ }
+
+ #[test]
+ fn test_connected_extra_chain() {
+ // If a user composes connectors and at each stage, there's "extra"
+ // info to attach, it shouldn't override the previous extras.
+
+ let c1 = Connected::new()
+ .extra(Ex1(45))
+ .extra(Ex2("zoom"))
+ .extra(Ex3("pew pew"));
+
+ let mut ex1 = ::http::Extensions::new();
+
+ assert_eq!(ex1.get::<Ex1>(), None);
+ assert_eq!(ex1.get::<Ex2>(), None);
+ assert_eq!(ex1.get::<Ex3>(), None);
+
+ c1.extra.as_ref().expect("c1 extra").set(&mut ex1);
+
+ assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45)));
+ assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom")));
+ assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew")));
+
+ // Just like extensions, inserting the same type overrides previous type.
+ let c2 = Connected::new()
+ .extra(Ex1(33))
+ .extra(Ex2("hiccup"))
+ .extra(Ex1(99));
+
+ let mut ex2 = ::http::Extensions::new();
+
+ c2.extra.as_ref().expect("c2 extra").set(&mut ex2);
+
+ assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
+ assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
+ }
+}
diff --git a/third_party/rust/hyper/src/client/dispatch.rs b/third_party/rust/hyper/src/client/dispatch.rs
new file mode 100644
index 0000000000..0d70dbccea
--- /dev/null
+++ b/third_party/rust/hyper/src/client/dispatch.rs
@@ -0,0 +1,436 @@
+#[cfg(feature = "http2")]
+use std::future::Future;
+
+use futures_util::FutureExt;
+use tokio::sync::{mpsc, oneshot};
+
+#[cfg(feature = "http2")]
+use crate::common::Pin;
+use crate::common::{task, Poll};
+
+pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
+pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
+
+pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
+ let (tx, rx) = mpsc::unbounded_channel();
+ let (giver, taker) = want::new();
+ let tx = Sender {
+ buffered_once: false,
+ giver,
+ inner: tx,
+ };
+ let rx = Receiver { inner: rx, taker };
+ (tx, rx)
+}
+
+/// A bounded sender of requests and callbacks for when responses are ready.
+///
+/// While the inner sender is unbounded, the Giver is used to determine
+/// if the Receiver is ready for another request.
+pub(crate) struct Sender<T, U> {
+ /// One message is always allowed, even if the Receiver hasn't asked
+ /// for it yet. This boolean keeps track of whether we've sent one
+ /// without notice.
+ buffered_once: bool,
+ /// The Giver helps watch that the the Receiver side has been polled
+ /// when the queue is empty. This helps us know when a request and
+ /// response have been fully processed, and a connection is ready
+ /// for more.
+ giver: want::Giver,
+ /// Actually bounded by the Giver, plus `buffered_once`.
+ inner: mpsc::UnboundedSender<Envelope<T, U>>,
+}
+
+/// An unbounded version.
+///
+/// Cannot poll the Giver, but can still use it to determine if the Receiver
+/// has been dropped. However, this version can be cloned.
+#[cfg(feature = "http2")]
+pub(crate) struct UnboundedSender<T, U> {
+ /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
+ giver: want::SharedGiver,
+ inner: mpsc::UnboundedSender<Envelope<T, U>>,
+}
+
+impl<T, U> Sender<T, U> {
+ pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
+ self.giver
+ .poll_want(cx)
+ .map_err(|_| crate::Error::new_closed())
+ }
+
+ pub(crate) fn is_ready(&self) -> bool {
+ self.giver.is_wanting()
+ }
+
+ pub(crate) fn is_closed(&self) -> bool {
+ self.giver.is_canceled()
+ }
+
+ fn can_send(&mut self) -> bool {
+ if self.giver.give() || !self.buffered_once {
+ // If the receiver is ready *now*, then of course we can send.
+ //
+ // If the receiver isn't ready yet, but we don't have anything
+ // in the channel yet, then allow one message.
+ self.buffered_once = true;
+ true
+ } else {
+ false
+ }
+ }
+
+ pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
+ if !self.can_send() {
+ return Err(val);
+ }
+ let (tx, rx) = oneshot::channel();
+ self.inner
+ .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
+ .map(move |_| rx)
+ .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
+ }
+
+ pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
+ if !self.can_send() {
+ return Err(val);
+ }
+ let (tx, rx) = oneshot::channel();
+ self.inner
+ .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
+ .map(move |_| rx)
+ .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
+ }
+
+ #[cfg(feature = "http2")]
+ pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
+ UnboundedSender {
+ giver: self.giver.shared(),
+ inner: self.inner,
+ }
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<T, U> UnboundedSender<T, U> {
+ pub(crate) fn is_ready(&self) -> bool {
+ !self.giver.is_canceled()
+ }
+
+ pub(crate) fn is_closed(&self) -> bool {
+ self.giver.is_canceled()
+ }
+
+ pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
+ let (tx, rx) = oneshot::channel();
+ self.inner
+ .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
+ .map(move |_| rx)
+ .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
+ }
+}
+
+#[cfg(feature = "http2")]
+impl<T, U> Clone for UnboundedSender<T, U> {
+ fn clone(&self) -> Self {
+ UnboundedSender {
+ giver: self.giver.clone(),
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+pub(crate) struct Receiver<T, U> {
+ inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
+ taker: want::Taker,
+}
+
+impl<T, U> Receiver<T, U> {
+ pub(crate) fn poll_recv(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<(T, Callback<T, U>)>> {
+ match self.inner.poll_recv(cx) {
+ Poll::Ready(item) => {
+ Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
+ }
+ Poll::Pending => {
+ self.taker.want();
+ Poll::Pending
+ }
+ }
+ }
+
+ #[cfg(feature = "http1")]
+ pub(crate) fn close(&mut self) {
+ self.taker.cancel();
+ self.inner.close();
+ }
+
+ #[cfg(feature = "http1")]
+ pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
+ match self.inner.recv().now_or_never() {
+ Some(Some(mut env)) => env.0.take(),
+ _ => None,
+ }
+ }
+}
+
+impl<T, U> Drop for Receiver<T, U> {
+ fn drop(&mut self) {
+ // Notify the giver about the closure first, before dropping
+ // the mpsc::Receiver.
+ self.taker.cancel();
+ }
+}
+
+struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
+
+impl<T, U> Drop for Envelope<T, U> {
+ fn drop(&mut self) {
+ if let Some((val, cb)) = self.0.take() {
+ cb.send(Err((
+ crate::Error::new_canceled().with("connection closed"),
+ Some(val),
+ )));
+ }
+ }
+}
+
+pub(crate) enum Callback<T, U> {
+ Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
+ NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
+}
+
+impl<T, U> Drop for Callback<T, U> {
+ fn drop(&mut self) {
+ // FIXME(nox): What errors do we want here?
+ let error = crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
+ "user code panicked"
+ } else {
+ "runtime dropped the dispatch task"
+ });
+
+ match self {
+ Callback::Retry(tx) => {
+ if let Some(tx) = tx.take() {
+ let _ = tx.send(Err((error, None)));
+ }
+ }
+ Callback::NoRetry(tx) => {
+ if let Some(tx) = tx.take() {
+ let _ = tx.send(Err(error));
+ }
+ }
+ }
+ }
+}
+
+impl<T, U> Callback<T, U> {
+ #[cfg(feature = "http2")]
+ pub(crate) fn is_canceled(&self) -> bool {
+ match *self {
+ Callback::Retry(Some(ref tx)) => tx.is_closed(),
+ Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
+ _ => unreachable!(),
+ }
+ }
+
+ pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> {
+ match *self {
+ Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
+ Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
+ _ => unreachable!(),
+ }
+ }
+
+ pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
+ match self {
+ Callback::Retry(ref mut tx) => {
+ let _ = tx.take().unwrap().send(val);
+ }
+ Callback::NoRetry(ref mut tx) => {
+ let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
+ }
+ }
+ }
+
+ #[cfg(feature = "http2")]
+ pub(crate) async fn send_when(
+ self,
+ mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin,
+ ) {
+ use futures_util::future;
+ use tracing::trace;
+
+ let mut cb = Some(self);
+
+ // "select" on this callback being canceled, and the future completing
+ future::poll_fn(move |cx| {
+ match Pin::new(&mut when).poll(cx) {
+ Poll::Ready(Ok(res)) => {
+ cb.take().expect("polled after complete").send(Ok(res));
+ Poll::Ready(())
+ }
+ Poll::Pending => {
+ // check if the callback is canceled
+ ready!(cb.as_mut().unwrap().poll_canceled(cx));
+ trace!("send_when canceled");
+ Poll::Ready(())
+ }
+ Poll::Ready(Err(err)) => {
+ cb.take().expect("polled after complete").send(Err(err));
+ Poll::Ready(())
+ }
+ }
+ })
+ .await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ #[cfg(feature = "nightly")]
+ extern crate test;
+
+ use std::future::Future;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ use super::{channel, Callback, Receiver};
+
+ #[derive(Debug)]
+ struct Custom(i32);
+
+ impl<T, U> Future for Receiver<T, U> {
+ type Output = Option<(T, Callback<T, U>)>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.poll_recv(cx)
+ }
+ }
+
+ /// Helper to check if the future is ready after polling once.
+ struct PollOnce<'a, F>(&'a mut F);
+
+ impl<F, T> Future for PollOnce<'_, F>
+ where
+ F: Future<Output = T> + Unpin,
+ {
+ type Output = Option<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match Pin::new(&mut self.0).poll(cx) {
+ Poll::Ready(_) => Poll::Ready(Some(())),
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn drop_receiver_sends_cancel_errors() {
+ let _ = pretty_env_logger::try_init();
+
+ let (mut tx, mut rx) = channel::<Custom, ()>();
+
+ // must poll once for try_send to succeed
+ assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
+
+ let promise = tx.try_send(Custom(43)).unwrap();
+ drop(rx);
+
+ let fulfilled = promise.await;
+ let err = fulfilled
+ .expect("fulfilled")
+ .expect_err("promise should error");
+ match (err.0.kind(), err.1) {
+ (&crate::error::Kind::Canceled, Some(_)) => (),
+ e => panic!("expected Error::Cancel(_), found {:?}", e),
+ }
+ }
+
+ #[tokio::test]
+ async fn sender_checks_for_want_on_send() {
+ let (mut tx, mut rx) = channel::<Custom, ()>();
+
+ // one is allowed to buffer, second is rejected
+ let _ = tx.try_send(Custom(1)).expect("1 buffered");
+ tx.try_send(Custom(2)).expect_err("2 not ready");
+
+ assert!(PollOnce(&mut rx).await.is_some(), "rx once");
+
+ // Even though 1 has been popped, only 1 could be buffered for the
+ // lifetime of the channel.
+ tx.try_send(Custom(2)).expect_err("2 still not ready");
+
+ assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
+
+ let _ = tx.try_send(Custom(2)).expect("2 ready");
+ }
+
+ #[cfg(feature = "http2")]
+ #[test]
+ fn unbounded_sender_doesnt_bound_on_want() {
+ let (tx, rx) = channel::<Custom, ()>();
+ let mut tx = tx.unbound();
+
+ let _ = tx.try_send(Custom(1)).unwrap();
+ let _ = tx.try_send(Custom(2)).unwrap();
+ let _ = tx.try_send(Custom(3)).unwrap();
+
+ drop(rx);
+
+ let _ = tx.try_send(Custom(4)).unwrap_err();
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn giver_queue_throughput(b: &mut test::Bencher) {
+ use crate::{Body, Request, Response};
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let (mut tx, mut rx) = channel::<Request<Body>, Response<Body>>();
+
+ b.iter(move || {
+ let _ = tx.send(Request::default()).unwrap();
+ rt.block_on(async {
+ loop {
+ let poll_once = PollOnce(&mut rx);
+ let opt = poll_once.await;
+ if opt.is_none() {
+ break;
+ }
+ }
+ });
+ })
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn giver_queue_not_ready(b: &mut test::Bencher) {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let (_tx, mut rx) = channel::<i32, ()>();
+ b.iter(move || {
+ rt.block_on(async {
+ let poll_once = PollOnce(&mut rx);
+ assert!(poll_once.await.is_none());
+ });
+ })
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn giver_queue_cancel(b: &mut test::Bencher) {
+ let (_tx, mut rx) = channel::<i32, ()>();
+
+ b.iter(move || {
+ rx.taker.cancel();
+ })
+ }
+}
diff --git a/third_party/rust/hyper/src/client/mod.rs b/third_party/rust/hyper/src/client/mod.rs
new file mode 100644
index 0000000000..734bda8819
--- /dev/null
+++ b/third_party/rust/hyper/src/client/mod.rs
@@ -0,0 +1,68 @@
+//! HTTP Client
+//!
+//! There are two levels of APIs provided for construct HTTP clients:
+//!
+//! - The higher-level [`Client`](Client) type.
+//! - The lower-level [`conn`](conn) module.
+//!
+//! # Client
+//!
+//! The [`Client`](Client) is the main way to send HTTP requests to a server.
+//! The default `Client` provides these things on top of the lower-level API:
+//!
+//! - A default **connector**, able to resolve hostnames and connect to
+//! destinations over plain-text TCP.
+//! - A **pool** of existing connections, allowing better performance when
+//! making multiple requests to the same hostname.
+//! - Automatic setting of the `Host` header, based on the request `Uri`.
+//! - Automatic request **retries** when a pooled connection is closed by the
+//! server before any bytes have been written.
+//!
+//! Many of these features can configured, by making use of
+//! [`Client::builder`](Client::builder).
+//!
+//! ## Example
+//!
+//! For a small example program simply fetching a URL, take a look at the
+//! [full client example](https://github.com/hyperium/hyper/blob/master/examples/client.rs).
+//!
+//! ```
+//! # #[cfg(all(feature = "tcp", feature = "client", any(feature = "http1", feature = "http2")))]
+//! # async fn fetch_httpbin() -> hyper::Result<()> {
+//! use hyper::{body::HttpBody as _, Client, Uri};
+//!
+//! let client = Client::new();
+//!
+//! // Make a GET /ip to 'http://httpbin.org'
+//! let res = client.get(Uri::from_static("http://httpbin.org/ip")).await?;
+//!
+//! // And then, if the request gets a response...
+//! println!("status: {}", res.status());
+//!
+//! // Concatenate the body stream into a single buffer...
+//! let buf = hyper::body::to_bytes(res).await?;
+//!
+//! println!("body: {:?}", buf);
+//! # Ok(())
+//! # }
+//! # fn main () {}
+//! ```
+
+#[cfg(feature = "tcp")]
+pub use self::connect::HttpConnector;
+
+pub mod connect;
+#[cfg(all(test, feature = "runtime"))]
+mod tests;
+
+cfg_feature! {
+ #![any(feature = "http1", feature = "http2")]
+
+ pub use self::client::{Builder, Client, ResponseFuture};
+
+ mod client;
+ pub mod conn;
+ pub(super) mod dispatch;
+ mod pool;
+ pub mod service;
+}
diff --git a/third_party/rust/hyper/src/client/pool.rs b/third_party/rust/hyper/src/client/pool.rs
new file mode 100644
index 0000000000..b9772d688d
--- /dev/null
+++ b/third_party/rust/hyper/src/client/pool.rs
@@ -0,0 +1,1044 @@
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::error::Error as StdError;
+use std::fmt;
+use std::ops::{Deref, DerefMut};
+use std::sync::{Arc, Mutex, Weak};
+
+#[cfg(not(feature = "runtime"))]
+use std::time::{Duration, Instant};
+
+use futures_channel::oneshot;
+#[cfg(feature = "runtime")]
+use tokio::time::{Duration, Instant, Interval};
+use tracing::{debug, trace};
+
+use super::client::Ver;
+use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Pool<T> {
+ // If the pool is disabled, this is None.
+ inner: Option<Arc<Mutex<PoolInner<T>>>>,
+}
+
+// Before using a pooled connection, make sure the sender is not dead.
+//
+// This is a trait to allow the `client::pool::tests` to work for `i32`.
+//
+// See https://github.com/hyperium/hyper/issues/1429
+pub(super) trait Poolable: Unpin + Send + Sized + 'static {
+ fn is_open(&self) -> bool;
+ /// Reserve this connection.
+ ///
+ /// Allows for HTTP/2 to return a shared reservation.
+ fn reserve(self) -> Reservation<Self>;
+ fn can_share(&self) -> bool;
+}
+
+/// When checking out a pooled connection, it might be that the connection
+/// only supports a single reservation, or it might be usable for many.
+///
+/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
+/// used for multiple requests.
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) enum Reservation<T> {
+ /// This connection could be used multiple times, the first one will be
+ /// reinserted into the `idle` pool, and the second will be given to
+ /// the `Checkout`.
+ #[cfg(feature = "http2")]
+ Shared(T, T),
+ /// This connection requires unique access. It will be returned after
+ /// use is complete.
+ Unique(T),
+}
+
+/// Simple type alias in case the key type needs to be adjusted.
+pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
+
+struct PoolInner<T> {
+ // A flag that a connection is being established, and the connection
+ // should be shared. This prevents making multiple HTTP/2 connections
+ // to the same host.
+ connecting: HashSet<Key>,
+ // These are internal Conns sitting in the event loop in the KeepAlive
+ // state, waiting to receive a new Request to send on the socket.
+ idle: HashMap<Key, Vec<Idle<T>>>,
+ max_idle_per_host: usize,
+ // These are outstanding Checkouts that are waiting for a socket to be
+ // able to send a Request one. This is used when "racing" for a new
+ // connection.
+ //
+ // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
+ // for the Pool to receive an idle Conn. When a Conn becomes idle,
+ // this list is checked for any parked Checkouts, and tries to notify
+ // them that the Conn could be used instead of waiting for a brand new
+ // connection.
+ waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
+ // A oneshot channel is used to allow the interval to be notified when
+ // the Pool completely drops. That way, the interval can cancel immediately.
+ #[cfg(feature = "runtime")]
+ idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
+ #[cfg(feature = "runtime")]
+ exec: Exec,
+ timeout: Option<Duration>,
+}
+
+// This is because `Weak::new()` *allocates* space for `T`, even if it
+// doesn't need it!
+struct WeakOpt<T>(Option<Weak<T>>);
+
+#[derive(Clone, Copy, Debug)]
+pub(super) struct Config {
+ pub(super) idle_timeout: Option<Duration>,
+ pub(super) max_idle_per_host: usize,
+}
+
+impl Config {
+ pub(super) fn is_enabled(&self) -> bool {
+ self.max_idle_per_host > 0
+ }
+}
+
+impl<T> Pool<T> {
+ pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
+ let inner = if config.is_enabled() {
+ Some(Arc::new(Mutex::new(PoolInner {
+ connecting: HashSet::new(),
+ idle: HashMap::new(),
+ #[cfg(feature = "runtime")]
+ idle_interval_ref: None,
+ max_idle_per_host: config.max_idle_per_host,
+ waiters: HashMap::new(),
+ #[cfg(feature = "runtime")]
+ exec: __exec.clone(),
+ timeout: config.idle_timeout,
+ })))
+ } else {
+ None
+ };
+
+ Pool { inner }
+ }
+
+ fn is_enabled(&self) -> bool {
+ self.inner.is_some()
+ }
+
+ #[cfg(test)]
+ pub(super) fn no_timer(&self) {
+ // Prevent an actual interval from being created for this pool...
+ #[cfg(feature = "runtime")]
+ {
+ let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
+ assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
+ let (tx, _) = oneshot::channel();
+ inner.idle_interval_ref = Some(tx);
+ }
+ }
+}
+
+impl<T: Poolable> Pool<T> {
+ /// Returns a `Checkout` which is a future that resolves if an idle
+ /// connection becomes available.
+ pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
+ Checkout {
+ key,
+ pool: self.clone(),
+ waiter: None,
+ }
+ }
+
+ /// Ensure that there is only ever 1 connecting task for HTTP/2
+ /// connections. This does nothing for HTTP/1.
+ pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
+ if ver == Ver::Http2 {
+ if let Some(ref enabled) = self.inner {
+ let mut inner = enabled.lock().unwrap();
+ return if inner.connecting.insert(key.clone()) {
+ let connecting = Connecting {
+ key: key.clone(),
+ pool: WeakOpt::downgrade(enabled),
+ };
+ Some(connecting)
+ } else {
+ trace!("HTTP/2 connecting already in progress for {:?}", key);
+ None
+ };
+ }
+ }
+
+ // else
+ Some(Connecting {
+ key: key.clone(),
+ // in HTTP/1's case, there is never a lock, so we don't
+ // need to do anything in Drop.
+ pool: WeakOpt::none(),
+ })
+ }
+
+ #[cfg(test)]
+ fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
+ self.inner.as_ref().expect("enabled").lock().expect("lock")
+ }
+
+ /* Used in client/tests.rs...
+ #[cfg(feature = "runtime")]
+ #[cfg(test)]
+ pub(super) fn h1_key(&self, s: &str) -> Key {
+ Arc::new(s.to_string())
+ }
+
+ #[cfg(feature = "runtime")]
+ #[cfg(test)]
+ pub(super) fn idle_count(&self, key: &Key) -> usize {
+ self
+ .locked()
+ .idle
+ .get(key)
+ .map(|list| list.len())
+ .unwrap_or(0)
+ }
+ */
+
+ pub(super) fn pooled(
+ &self,
+ #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
+ value: T,
+ ) -> Pooled<T> {
+ let (value, pool_ref) = if let Some(ref enabled) = self.inner {
+ match value.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_insert, to_return) => {
+ let mut inner = enabled.lock().unwrap();
+ inner.put(connecting.key.clone(), to_insert, enabled);
+ // Do this here instead of Drop for Connecting because we
+ // already have a lock, no need to lock the mutex twice.
+ inner.connected(&connecting.key);
+ // prevent the Drop of Connecting from repeating inner.connected()
+ connecting.pool = WeakOpt::none();
+
+ // Shared reservations don't need a reference to the pool,
+ // since the pool always keeps a copy.
+ (to_return, WeakOpt::none())
+ }
+ Reservation::Unique(value) => {
+ // Unique reservations must take a reference to the pool
+ // since they hope to reinsert once the reservation is
+ // completed
+ (value, WeakOpt::downgrade(enabled))
+ }
+ }
+ } else {
+ // If pool is not enabled, skip all the things...
+
+ // The Connecting should have had no pool ref
+ debug_assert!(connecting.pool.upgrade().is_none());
+
+ (value, WeakOpt::none())
+ };
+ Pooled {
+ key: connecting.key.clone(),
+ is_reused: false,
+ pool: pool_ref,
+ value: Some(value),
+ }
+ }
+
+ fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
+ debug!("reuse idle connection for {:?}", key);
+ // TODO: unhack this
+ // In Pool::pooled(), which is used for inserting brand new connections,
+ // there's some code that adjusts the pool reference taken depending
+ // on if the Reservation can be shared or is unique. By the time
+ // reuse() is called, the reservation has already been made, and
+ // we just have the final value, without knowledge of if this is
+ // unique or shared. So, the hack is to just assume Ver::Http2 means
+ // shared... :(
+ let mut pool_ref = WeakOpt::none();
+ if !value.can_share() {
+ if let Some(ref enabled) = self.inner {
+ pool_ref = WeakOpt::downgrade(enabled);
+ }
+ }
+
+ Pooled {
+ is_reused: true,
+ key: key.clone(),
+ pool: pool_ref,
+ value: Some(value),
+ }
+ }
+}
+
+/// Pop off this list, looking for a usable connection that hasn't expired.
+struct IdlePopper<'a, T> {
+ key: &'a Key,
+ list: &'a mut Vec<Idle<T>>,
+}
+
+impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
+ fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
+ while let Some(entry) = self.list.pop() {
+ // If the connection has been closed, or is older than our idle
+ // timeout, simply drop it and keep looking...
+ if !entry.value.is_open() {
+ trace!("removing closed connection for {:?}", self.key);
+ continue;
+ }
+ // TODO: Actually, since the `idle` list is pushed to the end always,
+ // that would imply that if *this* entry is expired, then anything
+ // "earlier" in the list would *have* to be expired also... Right?
+ //
+ // In that case, we could just break out of the loop and drop the
+ // whole list...
+ if expiration.expires(entry.idle_at) {
+ trace!("removing expired connection for {:?}", self.key);
+ continue;
+ }
+
+ let value = match entry.value.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_reinsert, to_checkout) => {
+ self.list.push(Idle {
+ idle_at: Instant::now(),
+ value: to_reinsert,
+ });
+ to_checkout
+ }
+ Reservation::Unique(unique) => unique,
+ };
+
+ return Some(Idle {
+ idle_at: entry.idle_at,
+ value,
+ });
+ }
+
+ None
+ }
+}
+
+impl<T: Poolable> PoolInner<T> {
+ fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
+ if value.can_share() && self.idle.contains_key(&key) {
+ trace!("put; existing idle HTTP/2 connection for {:?}", key);
+ return;
+ }
+ trace!("put; add idle connection for {:?}", key);
+ let mut remove_waiters = false;
+ let mut value = Some(value);
+ if let Some(waiters) = self.waiters.get_mut(&key) {
+ while let Some(tx) = waiters.pop_front() {
+ if !tx.is_canceled() {
+ let reserved = value.take().expect("value already sent");
+ let reserved = match reserved.reserve() {
+ #[cfg(feature = "http2")]
+ Reservation::Shared(to_keep, to_send) => {
+ value = Some(to_keep);
+ to_send
+ }
+ Reservation::Unique(uniq) => uniq,
+ };
+ match tx.send(reserved) {
+ Ok(()) => {
+ if value.is_none() {
+ break;
+ } else {
+ continue;
+ }
+ }
+ Err(e) => {
+ value = Some(e);
+ }
+ }
+ }
+
+ trace!("put; removing canceled waiter for {:?}", key);
+ }
+ remove_waiters = waiters.is_empty();
+ }
+ if remove_waiters {
+ self.waiters.remove(&key);
+ }
+
+ match value {
+ Some(value) => {
+ // borrow-check scope...
+ {
+ let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
+ if self.max_idle_per_host <= idle_list.len() {
+ trace!("max idle per host for {:?}, dropping connection", key);
+ return;
+ }
+
+ debug!("pooling idle connection for {:?}", key);
+ idle_list.push(Idle {
+ value,
+ idle_at: Instant::now(),
+ });
+ }
+
+ #[cfg(feature = "runtime")]
+ {
+ self.spawn_idle_interval(__pool_ref);
+ }
+ }
+ None => trace!("put; found waiter for {:?}", key),
+ }
+ }
+
+ /// A `Connecting` task is complete. Not necessarily successfully,
+ /// but the lock is going away, so clean up.
+ fn connected(&mut self, key: &Key) {
+ let existed = self.connecting.remove(key);
+ debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
+ // cancel any waiters. if there are any, it's because
+ // this Connecting task didn't complete successfully.
+ // those waiters would never receive a connection.
+ self.waiters.remove(key);
+ }
+
+ #[cfg(feature = "runtime")]
+ fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
+ let (dur, rx) = {
+ if self.idle_interval_ref.is_some() {
+ return;
+ }
+
+ if let Some(dur) = self.timeout {
+ let (tx, rx) = oneshot::channel();
+ self.idle_interval_ref = Some(tx);
+ (dur, rx)
+ } else {
+ return;
+ }
+ };
+
+ let interval = IdleTask {
+ interval: tokio::time::interval(dur),
+ pool: WeakOpt::downgrade(pool_ref),
+ pool_drop_notifier: rx,
+ };
+
+ self.exec.execute(interval);
+ }
+}
+
+impl<T> PoolInner<T> {
+ /// Any `FutureResponse`s that were created will have made a `Checkout`,
+ /// and possibly inserted into the pool that it is waiting for an idle
+ /// connection. If a user ever dropped that future, we need to clean out
+ /// those parked senders.
+ fn clean_waiters(&mut self, key: &Key) {
+ let mut remove_waiters = false;
+ if let Some(waiters) = self.waiters.get_mut(key) {
+ waiters.retain(|tx| !tx.is_canceled());
+ remove_waiters = waiters.is_empty();
+ }
+ if remove_waiters {
+ self.waiters.remove(key);
+ }
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl<T: Poolable> PoolInner<T> {
+ /// This should *only* be called by the IdleTask
+ fn clear_expired(&mut self) {
+ let dur = self.timeout.expect("interval assumes timeout");
+
+ let now = Instant::now();
+ //self.last_idle_check_at = now;
+
+ self.idle.retain(|key, values| {
+ values.retain(|entry| {
+ if !entry.value.is_open() {
+ trace!("idle interval evicting closed for {:?}", key);
+ return false;
+ }
+
+ // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
+ if now.saturating_duration_since(entry.idle_at) > dur {
+ trace!("idle interval evicting expired for {:?}", key);
+ return false;
+ }
+
+ // Otherwise, keep this value...
+ true
+ });
+
+ // returning false evicts this key/val
+ !values.is_empty()
+ });
+ }
+}
+
+impl<T> Clone for Pool<T> {
+ fn clone(&self) -> Pool<T> {
+ Pool {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
+// Note: The bounds `T: Poolable` is needed for the Drop impl.
+pub(super) struct Pooled<T: Poolable> {
+ value: Option<T>,
+ is_reused: bool,
+ key: Key,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+}
+
+impl<T: Poolable> Pooled<T> {
+ pub(super) fn is_reused(&self) -> bool {
+ self.is_reused
+ }
+
+ pub(super) fn is_pool_enabled(&self) -> bool {
+ self.pool.0.is_some()
+ }
+
+ fn as_ref(&self) -> &T {
+ self.value.as_ref().expect("not dropped")
+ }
+
+ fn as_mut(&mut self) -> &mut T {
+ self.value.as_mut().expect("not dropped")
+ }
+}
+
+impl<T: Poolable> Deref for Pooled<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ self.as_ref()
+ }
+}
+
+impl<T: Poolable> DerefMut for Pooled<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ self.as_mut()
+ }
+}
+
+impl<T: Poolable> Drop for Pooled<T> {
+ fn drop(&mut self) {
+ if let Some(value) = self.value.take() {
+ if !value.is_open() {
+ // If we *already* know the connection is done here,
+ // it shouldn't be re-inserted back into the pool.
+ return;
+ }
+
+ if let Some(pool) = self.pool.upgrade() {
+ if let Ok(mut inner) = pool.lock() {
+ inner.put(self.key.clone(), value, &pool);
+ }
+ } else if !value.can_share() {
+ trace!("pool dropped, dropping pooled ({:?})", self.key);
+ }
+ // Ver::Http2 is already in the Pool (or dead), so we wouldn't
+ // have an actual reference to the Pool.
+ }
+ }
+}
+
+impl<T: Poolable> fmt::Debug for Pooled<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Pooled").field("key", &self.key).finish()
+ }
+}
+
+struct Idle<T> {
+ idle_at: Instant,
+ value: T,
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Checkout<T> {
+ key: Key,
+ pool: Pool<T>,
+ waiter: Option<oneshot::Receiver<T>>,
+}
+
+#[derive(Debug)]
+pub(super) struct CheckoutIsClosedError;
+
+impl StdError for CheckoutIsClosedError {}
+
+impl fmt::Display for CheckoutIsClosedError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("checked out connection was closed")
+ }
+}
+
+impl<T: Poolable> Checkout<T> {
+ fn poll_waiter(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<Option<crate::Result<Pooled<T>>>> {
+ if let Some(mut rx) = self.waiter.take() {
+ match Pin::new(&mut rx).poll(cx) {
+ Poll::Ready(Ok(value)) => {
+ if value.is_open() {
+ Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
+ } else {
+ Poll::Ready(Some(Err(
+ crate::Error::new_canceled().with(CheckoutIsClosedError)
+ )))
+ }
+ }
+ Poll::Pending => {
+ self.waiter = Some(rx);
+ Poll::Pending
+ }
+ Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
+ crate::Error::new_canceled().with("request has been canceled")
+ ))),
+ }
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
+ let entry = {
+ let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
+ let expiration = Expiration::new(inner.timeout);
+ let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
+ trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
+ // A block to end the mutable borrow on list,
+ // so the map below can check is_empty()
+ {
+ let popper = IdlePopper {
+ key: &self.key,
+ list,
+ };
+ popper.pop(&expiration)
+ }
+ .map(|e| (e, list.is_empty()))
+ });
+
+ let (entry, empty) = if let Some((e, empty)) = maybe_entry {
+ (Some(e), empty)
+ } else {
+ // No entry found means nuke the list for sure.
+ (None, true)
+ };
+ if empty {
+ //TODO: This could be done with the HashMap::entry API instead.
+ inner.idle.remove(&self.key);
+ }
+
+ if entry.is_none() && self.waiter.is_none() {
+ let (tx, mut rx) = oneshot::channel();
+ trace!("checkout waiting for idle connection: {:?}", self.key);
+ inner
+ .waiters
+ .entry(self.key.clone())
+ .or_insert_with(VecDeque::new)
+ .push_back(tx);
+
+ // register the waker with this oneshot
+ assert!(Pin::new(&mut rx).poll(cx).is_pending());
+ self.waiter = Some(rx);
+ }
+
+ entry
+ };
+
+ entry.map(|e| self.pool.reuse(&self.key, e.value))
+ }
+}
+
+impl<T: Poolable> Future for Checkout<T> {
+ type Output = crate::Result<Pooled<T>>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
+ return Poll::Ready(Ok(pooled));
+ }
+
+ if let Some(pooled) = self.checkout(cx) {
+ Poll::Ready(Ok(pooled))
+ } else if !self.pool.is_enabled() {
+ Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
+ } else {
+ // There's a new waiter, already registered in self.checkout()
+ debug_assert!(self.waiter.is_some());
+ Poll::Pending
+ }
+ }
+}
+
+impl<T> Drop for Checkout<T> {
+ fn drop(&mut self) {
+ if self.waiter.take().is_some() {
+ trace!("checkout dropped for {:?}", self.key);
+ if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
+ inner.clean_waiters(&self.key);
+ }
+ }
+ }
+}
+
+// FIXME: allow() required due to `impl Trait` leaking types to this lint
+#[allow(missing_debug_implementations)]
+pub(super) struct Connecting<T: Poolable> {
+ key: Key,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+}
+
+impl<T: Poolable> Connecting<T> {
+ pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
+ debug_assert!(
+ self.pool.0.is_none(),
+ "Connecting::alpn_h2 but already Http2"
+ );
+
+ pool.connecting(&self.key, Ver::Http2)
+ }
+}
+
+impl<T: Poolable> Drop for Connecting<T> {
+ fn drop(&mut self) {
+ if let Some(pool) = self.pool.upgrade() {
+ // No need to panic on drop, that could abort!
+ if let Ok(mut inner) = pool.lock() {
+ inner.connected(&self.key);
+ }
+ }
+ }
+}
+
+struct Expiration(Option<Duration>);
+
+impl Expiration {
+ fn new(dur: Option<Duration>) -> Expiration {
+ Expiration(dur)
+ }
+
+ fn expires(&self, instant: Instant) -> bool {
+ match self.0 {
+ // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
+ Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
+ None => false,
+ }
+ }
+}
+
+#[cfg(feature = "runtime")]
+pin_project_lite::pin_project! {
+ struct IdleTask<T> {
+ #[pin]
+ interval: Interval,
+ pool: WeakOpt<Mutex<PoolInner<T>>>,
+ // This allows the IdleTask to be notified as soon as the entire
+ // Pool is fully dropped, and shutdown. This channel is never sent on,
+ // but Err(Canceled) will be received when the Pool is dropped.
+ #[pin]
+ pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
+ }
+}
+
+#[cfg(feature = "runtime")]
+impl<T: Poolable + 'static> Future for IdleTask<T> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ match this.pool_drop_notifier.as_mut().poll(cx) {
+ Poll::Ready(Ok(n)) => match n {},
+ Poll::Pending => (),
+ Poll::Ready(Err(_canceled)) => {
+ trace!("pool closed, canceling idle interval");
+ return Poll::Ready(());
+ }
+ }
+
+ ready!(this.interval.as_mut().poll_tick(cx));
+
+ if let Some(inner) = this.pool.upgrade() {
+ if let Ok(mut inner) = inner.lock() {
+ trace!("idle interval checking for expired");
+ inner.clear_expired();
+ continue;
+ }
+ }
+ return Poll::Ready(());
+ }
+ }
+}
+
+impl<T> WeakOpt<T> {
+ fn none() -> Self {
+ WeakOpt(None)
+ }
+
+ fn downgrade(arc: &Arc<T>) -> Self {
+ WeakOpt(Some(Arc::downgrade(arc)))
+ }
+
+ fn upgrade(&self) -> Option<Arc<T>> {
+ self.0.as_ref().and_then(Weak::upgrade)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::task::Poll;
+ use std::time::Duration;
+
+ use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
+ use crate::common::{exec::Exec, task, Future, Pin};
+
+ /// Test unique reservations.
+ #[derive(Debug, PartialEq, Eq)]
+ struct Uniq<T>(T);
+
+ impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
+ fn is_open(&self) -> bool {
+ true
+ }
+
+ fn reserve(self) -> Reservation<Self> {
+ Reservation::Unique(self)
+ }
+
+ fn can_share(&self) -> bool {
+ false
+ }
+ }
+
+ fn c<T: Poolable>(key: Key) -> Connecting<T> {
+ Connecting {
+ key,
+ pool: WeakOpt::none(),
+ }
+ }
+
+ fn host_key(s: &str) -> Key {
+ (http::uri::Scheme::HTTP, s.parse().expect("host key"))
+ }
+
+ fn pool_no_timer<T>() -> Pool<T> {
+ pool_max_idle_no_timer(::std::usize::MAX)
+ }
+
+ fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
+ let pool = Pool::new(
+ super::Config {
+ idle_timeout: Some(Duration::from_millis(100)),
+ max_idle_per_host: max_idle,
+ },
+ &Exec::Default,
+ );
+ pool.no_timer();
+ pool
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_smoke() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ drop(pooled);
+
+ match pool.checkout(key).await {
+ Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
+ Err(_) => panic!("not ready"),
+ };
+ }
+
+ /// Helper to check if the future is ready after polling once.
+ struct PollOnce<'a, F>(&'a mut F);
+
+ impl<F, T, U> Future for PollOnce<'_, F>
+ where
+ F: Future<Output = Result<T, U>> + Unpin,
+ {
+ type Output = Option<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ match Pin::new(&mut self.0).poll(cx) {
+ Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
+ Poll::Ready(Err(_)) => Poll::Ready(Some(())),
+ Poll::Pending => Poll::Ready(None),
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_returns_none_if_expired() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ drop(pooled);
+ tokio::time::sleep(pool.locked().timeout.unwrap()).await;
+ let mut checkout = pool.checkout(key);
+ let poll_once = PollOnce(&mut checkout);
+ let is_not_ready = poll_once.await.is_none();
+ assert!(is_not_ready);
+ }
+
+ #[cfg(feature = "runtime")]
+ #[tokio::test]
+ async fn test_pool_checkout_removes_expired() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(3)
+ );
+ tokio::time::sleep(pool.locked().timeout.unwrap()).await;
+
+ let mut checkout = pool.checkout(key.clone());
+ let poll_once = PollOnce(&mut checkout);
+ // checkout.await should clean out the expired
+ poll_once.await;
+ assert!(pool.locked().idle.get(&key).is_none());
+ }
+
+ #[test]
+ fn test_pool_max_idle_per_host() {
+ let pool = pool_max_idle_no_timer(2);
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ // pooled and dropped 3, max_idle should only allow 2
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(2)
+ );
+ }
+
+ #[cfg(feature = "runtime")]
+ #[tokio::test]
+ async fn test_pool_timer_removes_expired() {
+ let _ = pretty_env_logger::try_init();
+ tokio::time::pause();
+
+ let pool = Pool::new(
+ super::Config {
+ idle_timeout: Some(Duration::from_millis(10)),
+ max_idle_per_host: std::usize::MAX,
+ },
+ &Exec::Default,
+ );
+
+ let key = host_key("foo");
+
+ pool.pooled(c(key.clone()), Uniq(41));
+ pool.pooled(c(key.clone()), Uniq(5));
+ pool.pooled(c(key.clone()), Uniq(99));
+
+ assert_eq!(
+ pool.locked().idle.get(&key).map(|entries| entries.len()),
+ Some(3)
+ );
+
+ // Let the timer tick passed the expiration...
+ tokio::time::advance(Duration::from_millis(30)).await;
+ // Yield so the Interval can reap...
+ tokio::task::yield_now().await;
+
+ assert!(pool.locked().idle.get(&key).is_none());
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_task_unparked() {
+ use futures_util::future::join;
+ use futures_util::FutureExt;
+
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ let pooled = pool.pooled(c(key.clone()), Uniq(41));
+
+ let checkout = join(pool.checkout(key), async {
+ // the checkout future will park first,
+ // and then this lazy future will be polled, which will insert
+ // the pooled back into the pool
+ //
+ // this test makes sure that doing so will unpark the checkout
+ drop(pooled);
+ })
+ .map(|(entry, _)| entry);
+
+ assert_eq!(*checkout.await.unwrap(), Uniq(41));
+ }
+
+ #[tokio::test]
+ async fn test_pool_checkout_drop_cleans_up_waiters() {
+ let pool = pool_no_timer::<Uniq<i32>>();
+ let key = host_key("foo");
+
+ let mut checkout1 = pool.checkout(key.clone());
+ let mut checkout2 = pool.checkout(key.clone());
+
+ let poll_once1 = PollOnce(&mut checkout1);
+ let poll_once2 = PollOnce(&mut checkout2);
+
+ // first poll needed to get into Pool's parked
+ poll_once1.await;
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
+ poll_once2.await;
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
+
+ // on drop, clean up Pool
+ drop(checkout1);
+ assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
+
+ drop(checkout2);
+ assert!(pool.locked().waiters.get(&key).is_none());
+ }
+
+ #[derive(Debug)]
+ struct CanClose {
+ #[allow(unused)]
+ val: i32,
+ closed: bool,
+ }
+
+ impl Poolable for CanClose {
+ fn is_open(&self) -> bool {
+ !self.closed
+ }
+
+ fn reserve(self) -> Reservation<Self> {
+ Reservation::Unique(self)
+ }
+
+ fn can_share(&self) -> bool {
+ false
+ }
+ }
+
+ #[test]
+ fn pooled_drop_if_closed_doesnt_reinsert() {
+ let pool = pool_no_timer();
+ let key = host_key("foo");
+ pool.pooled(
+ c(key.clone()),
+ CanClose {
+ val: 57,
+ closed: true,
+ },
+ );
+
+ assert!(!pool.locked().idle.contains_key(&key));
+ }
+}
diff --git a/third_party/rust/hyper/src/client/service.rs b/third_party/rust/hyper/src/client/service.rs
new file mode 100644
index 0000000000..406f61edc9
--- /dev/null
+++ b/third_party/rust/hyper/src/client/service.rs
@@ -0,0 +1,89 @@
+//! Utilities used to interact with the Tower ecosystem.
+//!
+//! This module provides `Connect` which hook-ins into the Tower ecosystem.
+
+use std::error::Error as StdError;
+use std::future::Future;
+use std::marker::PhantomData;
+
+use tracing::debug;
+
+use super::conn::{Builder, SendRequest};
+use crate::{
+ body::HttpBody,
+ common::{task, Pin, Poll},
+ service::{MakeConnection, Service},
+};
+
+/// Creates a connection via `SendRequest`.
+///
+/// This accepts a `hyper::client::conn::Builder` and provides
+/// a `MakeService` implementation to create connections from some
+/// target `T`.
+#[derive(Debug)]
+pub struct Connect<C, B, T> {
+ inner: C,
+ builder: Builder,
+ _pd: PhantomData<fn(T, B)>,
+}
+
+impl<C, B, T> Connect<C, B, T> {
+ /// Create a new `Connect` with some inner connector `C` and a connection
+ /// builder.
+ pub fn new(inner: C, builder: Builder) -> Self {
+ Self {
+ inner,
+ builder,
+ _pd: PhantomData,
+ }
+ }
+}
+
+impl<C, B, T> Service<T> for Connect<C, B, T>
+where
+ C: MakeConnection<T>,
+ C::Connection: Unpin + Send + 'static,
+ C::Future: Send + 'static,
+ C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
+ B: HttpBody + Unpin + Send + 'static,
+ B::Data: Send + Unpin,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Response = SendRequest<B>;
+ type Error = crate::Error;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+ fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.inner
+ .poll_ready(cx)
+ .map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
+ }
+
+ fn call(&mut self, req: T) -> Self::Future {
+ let builder = self.builder.clone();
+ let io = self.inner.make_connection(req);
+
+ let fut = async move {
+ match io.await {
+ Ok(io) => match builder.handshake(io).await {
+ Ok((sr, conn)) => {
+ builder.exec.execute(async move {
+ if let Err(e) = conn.await {
+ debug!("connection error: {:?}", e);
+ }
+ });
+ Ok(sr)
+ }
+ Err(e) => Err(e),
+ },
+ Err(e) => {
+ let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
+ Err(err)
+ }
+ }
+ };
+
+ Box::pin(fut)
+ }
+}
diff --git a/third_party/rust/hyper/src/client/tests.rs b/third_party/rust/hyper/src/client/tests.rs
new file mode 100644
index 0000000000..0a281a637d
--- /dev/null
+++ b/third_party/rust/hyper/src/client/tests.rs
@@ -0,0 +1,286 @@
+use std::io;
+
+use futures_util::future;
+use tokio::net::TcpStream;
+
+use super::Client;
+
+#[tokio::test]
+async fn client_connect_uri_argument() {
+ let connector = tower::service_fn(|dst: http::Uri| {
+ assert_eq!(dst.scheme(), Some(&http::uri::Scheme::HTTP));
+ assert_eq!(dst.host(), Some("example.local"));
+ assert_eq!(dst.port(), None);
+ assert_eq!(dst.path(), "/", "path should be removed");
+
+ future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
+ });
+
+ let client = Client::builder().build::<_, crate::Body>(connector);
+ let _ = client
+ .get("http://example.local/and/a/path".parse().unwrap())
+ .await
+ .expect_err("response should fail");
+}
+
+/*
+// FIXME: re-implement tests with `async/await`
+#[test]
+fn retryable_request() {
+ let _ = pretty_env_logger::try_init();
+
+ let mut rt = Runtime::new().expect("new rt");
+ let mut connector = MockConnector::new();
+
+ let sock1 = connector.mock("http://mock.local");
+ let sock2 = connector.mock("http://mock.local");
+
+ let client = Client::builder()
+ .build::<_, crate::Body>(connector);
+
+ client.pool.no_timer();
+
+ {
+
+ let req = Request::builder()
+ .uri("http://mock.local/a")
+ .body(Default::default())
+ .unwrap();
+ let res1 = client.request(req);
+ let srv1 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
+ rt.block_on(res1.join(srv1)).expect("res1");
+ }
+ drop(sock1);
+
+ let req = Request::builder()
+ .uri("http://mock.local/b")
+ .body(Default::default())
+ .unwrap();
+ let res2 = client.request(req)
+ .map(|res| {
+ assert_eq!(res.status().as_u16(), 222);
+ });
+ let srv2 = poll_fn(|| {
+ try_ready!(sock2.read(&mut [0u8; 512]));
+ try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
+
+ rt.block_on(res2.join(srv2)).expect("res2");
+}
+
+#[test]
+fn conn_reset_after_write() {
+ let _ = pretty_env_logger::try_init();
+
+ let mut rt = Runtime::new().expect("new rt");
+ let mut connector = MockConnector::new();
+
+ let sock1 = connector.mock("http://mock.local");
+
+ let client = Client::builder()
+ .build::<_, crate::Body>(connector);
+
+ client.pool.no_timer();
+
+ {
+ let req = Request::builder()
+ .uri("http://mock.local/a")
+ .body(Default::default())
+ .unwrap();
+ let res1 = client.request(req);
+ let srv1 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
+ rt.block_on(res1.join(srv1)).expect("res1");
+ }
+
+ let req = Request::builder()
+ .uri("http://mock.local/a")
+ .body(Default::default())
+ .unwrap();
+ let res2 = client.request(req);
+ let mut sock1 = Some(sock1);
+ let srv2 = poll_fn(|| {
+ // We purposefully keep the socket open until the client
+ // has written the second request, and THEN disconnect.
+ //
+ // Not because we expect servers to be jerks, but to trigger
+ // state where we write on an assumedly good connection, and
+ // only reset the close AFTER we wrote bytes.
+ try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
+ sock1.take();
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
+ let err = rt.block_on(res2.join(srv2)).expect_err("res2");
+ assert!(err.is_incomplete_message(), "{:?}", err);
+}
+
+#[test]
+fn checkout_win_allows_connect_future_to_be_pooled() {
+ let _ = pretty_env_logger::try_init();
+
+ let mut rt = Runtime::new().expect("new rt");
+ let mut connector = MockConnector::new();
+
+
+ let (tx, rx) = oneshot::channel::<()>();
+ let sock1 = connector.mock("http://mock.local");
+ let sock2 = connector.mock_fut("http://mock.local", rx);
+
+ let client = Client::builder()
+ .build::<_, crate::Body>(connector);
+
+ client.pool.no_timer();
+
+ let uri = "http://mock.local/a".parse::<crate::Uri>().expect("uri parse");
+
+ // First request just sets us up to have a connection able to be put
+ // back in the pool. *However*, it doesn't insert immediately. The
+ // body has 1 pending byte, and we will only drain in request 2, once
+ // the connect future has been started.
+ let mut body = {
+ let res1 = client.get(uri.clone())
+ .map(|res| res.into_body().concat2());
+ let srv1 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ // Chunked is used so as to force 2 body reads.
+ try_ready!(sock1.write(b"\
+ HTTP/1.1 200 OK\r\n\
+ transfer-encoding: chunked\r\n\
+ \r\n\
+ 1\r\nx\r\n\
+ 0\r\n\r\n\
+ "));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
+
+ rt.block_on(res1.join(srv1)).expect("res1").0
+ };
+
+
+ // The second request triggers the only mocked connect future, but then
+ // the drained body allows the first socket to go back to the pool,
+ // "winning" the checkout race.
+ {
+ let res2 = client.get(uri.clone());
+ let drain = poll_fn(move || {
+ body.poll()
+ });
+ let srv2 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
+
+ rt.block_on(res2.join(drain).join(srv2)).expect("res2");
+ }
+
+ // "Release" the mocked connect future, and let the runtime spin once so
+ // it's all setup...
+ {
+ let mut tx = Some(tx);
+ let client = &client;
+ let key = client.pool.h1_key("http://mock.local");
+ let mut tick_cnt = 0;
+ let fut = poll_fn(move || {
+ tx.take();
+
+ if client.pool.idle_count(&key) == 0 {
+ tick_cnt += 1;
+ assert!(tick_cnt < 10, "ticked too many times waiting for idle");
+ trace!("no idle yet; tick count: {}", tick_cnt);
+ ::futures::task::current().notify();
+ Ok(Async::NotReady)
+ } else {
+ Ok::<_, ()>(Async::Ready(()))
+ }
+ });
+ rt.block_on(fut).unwrap();
+ }
+
+ // Third request just tests out that the "loser" connection was pooled. If
+ // it isn't, this will panic since the MockConnector doesn't have any more
+ // mocks to give out.
+ {
+ let res3 = client.get(uri);
+ let srv3 = poll_fn(|| {
+ try_ready!(sock2.read(&mut [0u8; 512]));
+ try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv3 poll_fn error: {}", e));
+
+ rt.block_on(res3.join(srv3)).expect("res3");
+ }
+}
+
+#[cfg(feature = "nightly")]
+#[bench]
+fn bench_http1_get_0b(b: &mut test::Bencher) {
+ let _ = pretty_env_logger::try_init();
+
+ let mut rt = Runtime::new().expect("new rt");
+ let mut connector = MockConnector::new();
+
+
+ let client = Client::builder()
+ .build::<_, crate::Body>(connector.clone());
+
+ client.pool.no_timer();
+
+ let uri = Uri::from_static("http://mock.local/a");
+
+ b.iter(move || {
+ let sock1 = connector.mock("http://mock.local");
+ let res1 = client
+ .get(uri.clone())
+ .and_then(|res| {
+ res.into_body().for_each(|_| Ok(()))
+ });
+ let srv1 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
+ rt.block_on(res1.join(srv1)).expect("res1");
+ });
+}
+
+#[cfg(feature = "nightly")]
+#[bench]
+fn bench_http1_get_10b(b: &mut test::Bencher) {
+ let _ = pretty_env_logger::try_init();
+
+ let mut rt = Runtime::new().expect("new rt");
+ let mut connector = MockConnector::new();
+
+
+ let client = Client::builder()
+ .build::<_, crate::Body>(connector.clone());
+
+ client.pool.no_timer();
+
+ let uri = Uri::from_static("http://mock.local/a");
+
+ b.iter(move || {
+ let sock1 = connector.mock("http://mock.local");
+ let res1 = client
+ .get(uri.clone())
+ .and_then(|res| {
+ res.into_body().for_each(|_| Ok(()))
+ });
+ let srv1 = poll_fn(|| {
+ try_ready!(sock1.read(&mut [0u8; 512]));
+ try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n0123456789"));
+ Ok(Async::Ready(()))
+ }).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
+ rt.block_on(res1.join(srv1)).expect("res1");
+ });
+}
+*/