diff options
Diffstat (limited to 'third_party/rust/hyper/src/client/connect/http.rs')
-rw-r--r-- | third_party/rust/hyper/src/client/connect/http.rs | 1007 |
1 files changed, 1007 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/client/connect/http.rs b/third_party/rust/hyper/src/client/connect/http.rs new file mode 100644 index 0000000000..afe7b155eb --- /dev/null +++ b/third_party/rust/hyper/src/client/connect/http.rs @@ -0,0 +1,1007 @@ +use std::error::Error as StdError; +use std::fmt; +use std::future::Future; +use std::io; +use std::marker::PhantomData; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{self, Poll}; +use std::time::Duration; + +use futures_util::future::Either; +use http::uri::{Scheme, Uri}; +use pin_project_lite::pin_project; +use tokio::net::{TcpSocket, TcpStream}; +use tokio::time::Sleep; +use tracing::{debug, trace, warn}; + +use super::dns::{self, resolve, GaiResolver, Resolve}; +use super::{Connected, Connection}; +//#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver; + +/// A connector for the `http` scheme. +/// +/// Performs DNS resolution in a thread pool, and then connects over TCP. +/// +/// # Note +/// +/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes +/// transport information such as the remote socket address used. +#[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] +#[derive(Clone)] +pub struct HttpConnector<R = GaiResolver> { + config: Arc<Config>, + resolver: R, +} + +/// Extra information about the transport when an HttpConnector is used. +/// +/// # Example +/// +/// ``` +/// # async fn doc() -> hyper::Result<()> { +/// use hyper::Uri; +/// use hyper::client::{Client, connect::HttpInfo}; +/// +/// let client = Client::new(); +/// let uri = Uri::from_static("http://example.com"); +/// +/// let res = client.get(uri).await?; +/// res +/// .extensions() +/// .get::<HttpInfo>() +/// .map(|info| { +/// println!("remote addr = {}", info.remote_addr()); +/// }); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Note +/// +/// If a different connector is used besides [`HttpConnector`](HttpConnector), +/// this value will not exist in the extensions. Consult that specific +/// connector to see what "extra" information it might provide to responses. +#[derive(Clone, Debug)] +pub struct HttpInfo { + remote_addr: SocketAddr, + local_addr: SocketAddr, +} + +#[derive(Clone)] +struct Config { + connect_timeout: Option<Duration>, + enforce_http: bool, + happy_eyeballs_timeout: Option<Duration>, + keep_alive_timeout: Option<Duration>, + local_address_ipv4: Option<Ipv4Addr>, + local_address_ipv6: Option<Ipv6Addr>, + nodelay: bool, + reuse_address: bool, + send_buffer_size: Option<usize>, + recv_buffer_size: Option<usize>, +} + +// ===== impl HttpConnector ===== + +impl HttpConnector { + /// Construct a new HttpConnector. + pub fn new() -> HttpConnector { + HttpConnector::new_with_resolver(GaiResolver::new()) + } +} + +/* +#[cfg(feature = "runtime")] +impl HttpConnector<TokioThreadpoolGaiResolver> { + /// Construct a new HttpConnector using the `TokioThreadpoolGaiResolver`. + /// + /// This resolver **requires** the threadpool runtime to be used. + pub fn new_with_tokio_threadpool_resolver() -> Self { + HttpConnector::new_with_resolver(TokioThreadpoolGaiResolver::new()) + } +} +*/ + +impl<R> HttpConnector<R> { + /// Construct a new HttpConnector. + /// + /// Takes a [`Resolver`](crate::client::connect::dns#resolvers-are-services) to handle DNS lookups. + pub fn new_with_resolver(resolver: R) -> HttpConnector<R> { + HttpConnector { + config: Arc::new(Config { + connect_timeout: None, + enforce_http: true, + happy_eyeballs_timeout: Some(Duration::from_millis(300)), + keep_alive_timeout: None, + local_address_ipv4: None, + local_address_ipv6: None, + nodelay: false, + reuse_address: false, + send_buffer_size: None, + recv_buffer_size: None, + }), + resolver, + } + } + + /// Option to enforce all `Uri`s have the `http` scheme. + /// + /// Enabled by default. + #[inline] + pub fn enforce_http(&mut self, is_enforced: bool) { + self.config_mut().enforce_http = is_enforced; + } + + /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. + /// + /// If `None`, the option will not be set. + /// + /// Default is `None`. + #[inline] + pub fn set_keepalive(&mut self, dur: Option<Duration>) { + self.config_mut().keep_alive_timeout = dur; + } + + /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. + /// + /// Default is `false`. + #[inline] + pub fn set_nodelay(&mut self, nodelay: bool) { + self.config_mut().nodelay = nodelay; + } + + /// Sets the value of the SO_SNDBUF option on the socket. + #[inline] + pub fn set_send_buffer_size(&mut self, size: Option<usize>) { + self.config_mut().send_buffer_size = size; + } + + /// Sets the value of the SO_RCVBUF option on the socket. + #[inline] + pub fn set_recv_buffer_size(&mut self, size: Option<usize>) { + self.config_mut().recv_buffer_size = size; + } + + /// Set that all sockets are bound to the configured address before connection. + /// + /// If `None`, the sockets will not be bound. + /// + /// Default is `None`. + #[inline] + pub fn set_local_address(&mut self, addr: Option<IpAddr>) { + let (v4, v6) = match addr { + Some(IpAddr::V4(a)) => (Some(a), None), + Some(IpAddr::V6(a)) => (None, Some(a)), + _ => (None, None), + }; + + let cfg = self.config_mut(); + + cfg.local_address_ipv4 = v4; + cfg.local_address_ipv6 = v6; + } + + /// Set that all sockets are bound to the configured IPv4 or IPv6 address (depending on host's + /// preferences) before connection. + #[inline] + pub fn set_local_addresses(&mut self, addr_ipv4: Ipv4Addr, addr_ipv6: Ipv6Addr) { + let cfg = self.config_mut(); + + cfg.local_address_ipv4 = Some(addr_ipv4); + cfg.local_address_ipv6 = Some(addr_ipv6); + } + + /// Set the connect timeout. + /// + /// If a domain resolves to multiple IP addresses, the timeout will be + /// evenly divided across them. + /// + /// Default is `None`. + #[inline] + pub fn set_connect_timeout(&mut self, dur: Option<Duration>) { + self.config_mut().connect_timeout = dur; + } + + /// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm. + /// + /// If hostname resolves to both IPv4 and IPv6 addresses and connection + /// cannot be established using preferred address family before timeout + /// elapses, then connector will in parallel attempt connection using other + /// address family. + /// + /// If `None`, parallel connection attempts are disabled. + /// + /// Default is 300 milliseconds. + /// + /// [RFC 6555]: https://tools.ietf.org/html/rfc6555 + #[inline] + pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) { + self.config_mut().happy_eyeballs_timeout = dur; + } + + /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. + /// + /// Default is `false`. + #[inline] + pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { + self.config_mut().reuse_address = reuse_address; + self + } + + // private + + fn config_mut(&mut self) -> &mut Config { + // If the are HttpConnector clones, this will clone the inner + // config. So mutating the config won't ever affect previous + // clones. + Arc::make_mut(&mut self.config) + } +} + +static INVALID_NOT_HTTP: &str = "invalid URL, scheme is not http"; +static INVALID_MISSING_SCHEME: &str = "invalid URL, scheme is missing"; +static INVALID_MISSING_HOST: &str = "invalid URL, host is missing"; + +// R: Debug required for now to allow adding it to debug output later... +impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HttpConnector").finish() + } +} + +impl<R> tower_service::Service<Uri> for HttpConnector<R> +where + R: Resolve + Clone + Send + Sync + 'static, + R::Future: Send, +{ + type Response = TcpStream; + type Error = ConnectError; + type Future = HttpConnecting<R>; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.resolver.poll_ready(cx)).map_err(ConnectError::dns)?; + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: Uri) -> Self::Future { + let mut self_ = self.clone(); + HttpConnecting { + fut: Box::pin(async move { self_.call_async(dst).await }), + _marker: PhantomData, + } + } +} + +fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> { + trace!( + "Http::connect; scheme={:?}, host={:?}, port={:?}", + dst.scheme(), + dst.host(), + dst.port(), + ); + + if config.enforce_http { + if dst.scheme() != Some(&Scheme::HTTP) { + return Err(ConnectError { + msg: INVALID_NOT_HTTP.into(), + cause: None, + }); + } + } else if dst.scheme().is_none() { + return Err(ConnectError { + msg: INVALID_MISSING_SCHEME.into(), + cause: None, + }); + } + + let host = match dst.host() { + Some(s) => s, + None => { + return Err(ConnectError { + msg: INVALID_MISSING_HOST.into(), + cause: None, + }) + } + }; + let port = match dst.port() { + Some(port) => port.as_u16(), + None => { + if dst.scheme() == Some(&Scheme::HTTPS) { + 443 + } else { + 80 + } + } + }; + + Ok((host, port)) +} + +impl<R> HttpConnector<R> +where + R: Resolve, +{ + async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> { + let config = &self.config; + + let (host, port) = get_host_port(config, &dst)?; + let host = host.trim_start_matches('[').trim_end_matches(']'); + + // If the host is already an IP addr (v4 or v6), + // skip resolving the dns and start connecting right away. + let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) { + addrs + } else { + let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) + .await + .map_err(ConnectError::dns)?; + let addrs = addrs + .map(|mut addr| { + addr.set_port(port); + addr + }) + .collect(); + dns::SocketAddrs::new(addrs) + }; + + let c = ConnectingTcp::new(addrs, config); + + let sock = c.connect().await?; + + if let Err(e) = sock.set_nodelay(config.nodelay) { + warn!("tcp set_nodelay error: {}", e); + } + + Ok(sock) + } +} + +impl Connection for TcpStream { + fn connected(&self) -> Connected { + let connected = Connected::new(); + if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) { + connected.extra(HttpInfo { remote_addr, local_addr }) + } else { + connected + } + } +} + +impl HttpInfo { + /// Get the remote address of the transport used. + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } + + /// Get the local address of the transport used. + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } +} + +pin_project! { + // Not publicly exported (so missing_docs doesn't trigger). + // + // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly + // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot + // (and thus we can change the type in the future). + #[must_use = "futures do nothing unless polled"] + #[allow(missing_debug_implementations)] + pub struct HttpConnecting<R> { + #[pin] + fut: BoxConnecting, + _marker: PhantomData<R>, + } +} + +type ConnectResult = Result<TcpStream, ConnectError>; +type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>; + +impl<R: Resolve> Future for HttpConnecting<R> { + type Output = ConnectResult; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + self.project().fut.poll(cx) + } +} + +// Not publicly exported (so missing_docs doesn't trigger). +pub struct ConnectError { + msg: Box<str>, + cause: Option<Box<dyn StdError + Send + Sync>>, +} + +impl ConnectError { + fn new<S, E>(msg: S, cause: E) -> ConnectError + where + S: Into<Box<str>>, + E: Into<Box<dyn StdError + Send + Sync>>, + { + ConnectError { + msg: msg.into(), + cause: Some(cause.into()), + } + } + + fn dns<E>(cause: E) -> ConnectError + where + E: Into<Box<dyn StdError + Send + Sync>>, + { + ConnectError::new("dns error", cause) + } + + fn m<S, E>(msg: S) -> impl FnOnce(E) -> ConnectError + where + S: Into<Box<str>>, + E: Into<Box<dyn StdError + Send + Sync>>, + { + move |cause| ConnectError::new(msg, cause) + } +} + +impl fmt::Debug for ConnectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(ref cause) = self.cause { + f.debug_tuple("ConnectError") + .field(&self.msg) + .field(cause) + .finish() + } else { + self.msg.fmt(f) + } + } +} + +impl fmt::Display for ConnectError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.msg)?; + + if let Some(ref cause) = self.cause { + write!(f, ": {}", cause)?; + } + + Ok(()) + } +} + +impl StdError for ConnectError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.cause.as_ref().map(|e| &**e as _) + } +} + +struct ConnectingTcp<'a> { + preferred: ConnectingTcpRemote, + fallback: Option<ConnectingTcpFallback>, + config: &'a Config, +} + +impl<'a> ConnectingTcp<'a> { + fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self { + if let Some(fallback_timeout) = config.happy_eyeballs_timeout { + let (preferred_addrs, fallback_addrs) = remote_addrs + .split_by_preference(config.local_address_ipv4, config.local_address_ipv6); + if fallback_addrs.is_empty() { + return ConnectingTcp { + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), + fallback: None, + config, + }; + } + + ConnectingTcp { + preferred: ConnectingTcpRemote::new(preferred_addrs, config.connect_timeout), + fallback: Some(ConnectingTcpFallback { + delay: tokio::time::sleep(fallback_timeout), + remote: ConnectingTcpRemote::new(fallback_addrs, config.connect_timeout), + }), + config, + } + } else { + ConnectingTcp { + preferred: ConnectingTcpRemote::new(remote_addrs, config.connect_timeout), + fallback: None, + config, + } + } + } +} + +struct ConnectingTcpFallback { + delay: Sleep, + remote: ConnectingTcpRemote, +} + +struct ConnectingTcpRemote { + addrs: dns::SocketAddrs, + connect_timeout: Option<Duration>, +} + +impl ConnectingTcpRemote { + fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self { + let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32)); + + Self { + addrs, + connect_timeout, + } + } +} + +impl ConnectingTcpRemote { + async fn connect(&mut self, config: &Config) -> Result<TcpStream, ConnectError> { + let mut err = None; + for addr in &mut self.addrs { + debug!("connecting to {}", addr); + match connect(&addr, config, self.connect_timeout)?.await { + Ok(tcp) => { + debug!("connected to {}", addr); + return Ok(tcp); + } + Err(e) => { + trace!("connect error for {}: {:?}", addr, e); + err = Some(e); + } + } + } + + match err { + Some(e) => Err(e), + None => Err(ConnectError::new( + "tcp connect error", + std::io::Error::new(std::io::ErrorKind::NotConnected, "Network unreachable"), + )), + } + } +} + +fn bind_local_address( + socket: &socket2::Socket, + dst_addr: &SocketAddr, + local_addr_ipv4: &Option<Ipv4Addr>, + local_addr_ipv6: &Option<Ipv6Addr>, +) -> io::Result<()> { + match (*dst_addr, local_addr_ipv4, local_addr_ipv6) { + (SocketAddr::V4(_), Some(addr), _) => { + socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?; + } + (SocketAddr::V6(_), _, Some(addr)) => { + socket.bind(&SocketAddr::new(addr.clone().into(), 0).into())?; + } + _ => { + if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match *dst_addr { + SocketAddr::V4(_) => ([0, 0, 0, 0], 0).into(), + SocketAddr::V6(_) => ([0, 0, 0, 0, 0, 0, 0, 0], 0).into(), + }; + socket.bind(&any.into())?; + } + } + } + + Ok(()) +} + +fn connect( + addr: &SocketAddr, + config: &Config, + connect_timeout: Option<Duration>, +) -> Result<impl Future<Output = Result<TcpStream, ConnectError>>, ConnectError> { + // TODO(eliza): if Tokio's `TcpSocket` gains support for setting the + // keepalive timeout, it would be nice to use that instead of socket2, + // and avoid the unsafe `into_raw_fd`/`from_raw_fd` dance... + use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type}; + use std::convert::TryInto; + + let domain = Domain::for_address(*addr); + let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) + .map_err(ConnectError::m("tcp open error"))?; + + // When constructing a Tokio `TcpSocket` from a raw fd/socket, the user is + // responsible for ensuring O_NONBLOCK is set. + socket + .set_nonblocking(true) + .map_err(ConnectError::m("tcp set_nonblocking error"))?; + + if let Some(dur) = config.keep_alive_timeout { + let conf = TcpKeepalive::new().with_time(dur); + if let Err(e) = socket.set_tcp_keepalive(&conf) { + warn!("tcp set_keepalive error: {}", e); + } + } + + bind_local_address( + &socket, + addr, + &config.local_address_ipv4, + &config.local_address_ipv6, + ) + .map_err(ConnectError::m("tcp bind local error"))?; + + #[cfg(unix)] + let socket = unsafe { + // Safety: `from_raw_fd` is only safe to call if ownership of the raw + // file descriptor is transferred. Since we call `into_raw_fd` on the + // socket2 socket, it gives up ownership of the fd and will not close + // it, so this is safe. + use std::os::unix::io::{FromRawFd, IntoRawFd}; + TcpSocket::from_raw_fd(socket.into_raw_fd()) + }; + #[cfg(windows)] + let socket = unsafe { + // Safety: `from_raw_socket` is only safe to call if ownership of the raw + // Windows SOCKET is transferred. Since we call `into_raw_socket` on the + // socket2 socket, it gives up ownership of the SOCKET and will not close + // it, so this is safe. + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + TcpSocket::from_raw_socket(socket.into_raw_socket()) + }; + + if config.reuse_address { + if let Err(e) = socket.set_reuseaddr(true) { + warn!("tcp set_reuse_address error: {}", e); + } + } + + if let Some(size) = config.send_buffer_size { + if let Err(e) = socket.set_send_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) { + warn!("tcp set_buffer_size error: {}", e); + } + } + + if let Some(size) = config.recv_buffer_size { + if let Err(e) = socket.set_recv_buffer_size(size.try_into().unwrap_or(std::u32::MAX)) { + warn!("tcp set_recv_buffer_size error: {}", e); + } + } + + let connect = socket.connect(*addr); + Ok(async move { + match connect_timeout { + Some(dur) => match tokio::time::timeout(dur, connect).await { + Ok(Ok(s)) => Ok(s), + Ok(Err(e)) => Err(e), + Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)), + }, + None => connect.await, + } + .map_err(ConnectError::m("tcp connect error")) + }) +} + +impl ConnectingTcp<'_> { + async fn connect(mut self) -> Result<TcpStream, ConnectError> { + match self.fallback { + None => self.preferred.connect(self.config).await, + Some(mut fallback) => { + let preferred_fut = self.preferred.connect(self.config); + futures_util::pin_mut!(preferred_fut); + + let fallback_fut = fallback.remote.connect(self.config); + futures_util::pin_mut!(fallback_fut); + + let fallback_delay = fallback.delay; + futures_util::pin_mut!(fallback_delay); + + let (result, future) = + match futures_util::future::select(preferred_fut, fallback_delay).await { + Either::Left((result, _fallback_delay)) => { + (result, Either::Right(fallback_fut)) + } + Either::Right(((), preferred_fut)) => { + // Delay is done, start polling both the preferred and the fallback + futures_util::future::select(preferred_fut, fallback_fut) + .await + .factor_first() + } + }; + + if result.is_err() { + // Fallback to the remaining future (could be preferred or fallback) + // if we get an error + future.await + } else { + result + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::io; + + use ::http::Uri; + + use super::super::sealed::{Connect, ConnectSvc}; + use super::{Config, ConnectError, HttpConnector}; + + async fn connect<C>( + connector: C, + dst: Uri, + ) -> Result<<C::_Svc as ConnectSvc>::Connection, <C::_Svc as ConnectSvc>::Error> + where + C: Connect, + { + connector.connect(super::super::sealed::Internal, dst).await + } + + #[tokio::test] + async fn test_errors_enforce_http() { + let dst = "https://example.domain/foo/bar?baz".parse().unwrap(); + let connector = HttpConnector::new(); + + let err = connect(connector, dst).await.unwrap_err(); + assert_eq!(&*err.msg, super::INVALID_NOT_HTTP); + } + + #[cfg(any(target_os = "linux", target_os = "macos"))] + fn get_local_ips() -> (Option<std::net::Ipv4Addr>, Option<std::net::Ipv6Addr>) { + use std::net::{IpAddr, TcpListener}; + + let mut ip_v4 = None; + let mut ip_v6 = None; + + let ips = pnet_datalink::interfaces() + .into_iter() + .flat_map(|i| i.ips.into_iter().map(|n| n.ip())); + + for ip in ips { + match ip { + IpAddr::V4(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v4 = Some(ip), + IpAddr::V6(ip) if TcpListener::bind((ip, 0)).is_ok() => ip_v6 = Some(ip), + _ => (), + } + + if ip_v4.is_some() && ip_v6.is_some() { + break; + } + } + + (ip_v4, ip_v6) + } + + #[tokio::test] + async fn test_errors_missing_scheme() { + let dst = "example.domain".parse().unwrap(); + let mut connector = HttpConnector::new(); + connector.enforce_http(false); + + let err = connect(connector, dst).await.unwrap_err(); + assert_eq!(&*err.msg, super::INVALID_MISSING_SCHEME); + } + + // NOTE: pnet crate that we use in this test doesn't compile on Windows + #[cfg(any(target_os = "linux", target_os = "macos"))] + #[tokio::test] + async fn local_address() { + use std::net::{IpAddr, TcpListener}; + let _ = pretty_env_logger::try_init(); + + let (bind_ip_v4, bind_ip_v6) = get_local_ips(); + let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); + let port = server4.local_addr().unwrap().port(); + let server6 = TcpListener::bind(&format!("[::1]:{}", port)).unwrap(); + + let assert_client_ip = |dst: String, server: TcpListener, expected_ip: IpAddr| async move { + let mut connector = HttpConnector::new(); + + match (bind_ip_v4, bind_ip_v6) { + (Some(v4), Some(v6)) => connector.set_local_addresses(v4, v6), + (Some(v4), None) => connector.set_local_address(Some(v4.into())), + (None, Some(v6)) => connector.set_local_address(Some(v6.into())), + _ => unreachable!(), + } + + connect(connector, dst.parse().unwrap()).await.unwrap(); + + let (_, client_addr) = server.accept().unwrap(); + + assert_eq!(client_addr.ip(), expected_ip); + }; + + if let Some(ip) = bind_ip_v4 { + assert_client_ip(format!("http://127.0.0.1:{}", port), server4, ip.into()).await; + } + + if let Some(ip) = bind_ip_v6 { + assert_client_ip(format!("http://[::1]:{}", port), server6, ip.into()).await; + } + } + + #[test] + #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] + fn client_happy_eyeballs() { + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; + use std::time::{Duration, Instant}; + + use super::dns; + use super::ConnectingTcp; + + let _ = pretty_env_logger::try_init(); + let server4 = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server4.local_addr().unwrap(); + let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let local_timeout = Duration::default(); + let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1; + let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1; + let fallback_timeout = std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout) + + Duration::from_millis(250); + + let scenarios = &[ + // Fast primary, without fallback. + (&[local_ipv4_addr()][..], 4, local_timeout, false), + (&[local_ipv6_addr()][..], 6, local_timeout, false), + // Fast primary, with (unused) fallback. + ( + &[local_ipv4_addr(), local_ipv6_addr()][..], + 4, + local_timeout, + false, + ), + ( + &[local_ipv6_addr(), local_ipv4_addr()][..], + 6, + local_timeout, + false, + ), + // Unreachable + fast primary, without fallback. + ( + &[unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, + unreachable_v4_timeout, + false, + ), + ( + &[unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, + unreachable_v6_timeout, + false, + ), + // Unreachable + fast primary, with (unused) fallback. + ( + &[ + unreachable_ipv4_addr(), + local_ipv4_addr(), + local_ipv6_addr(), + ][..], + 4, + unreachable_v4_timeout, + false, + ), + ( + &[ + unreachable_ipv6_addr(), + local_ipv6_addr(), + local_ipv4_addr(), + ][..], + 6, + unreachable_v6_timeout, + true, + ), + // Slow primary, with (used) fallback. + ( + &[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..], + 6, + fallback_timeout, + false, + ), + ( + &[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..], + 4, + fallback_timeout, + true, + ), + // Slow primary, with (used) unreachable + fast fallback. + ( + &[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..], + 6, + fallback_timeout + unreachable_v6_timeout, + false, + ), + ( + &[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..], + 4, + fallback_timeout + unreachable_v4_timeout, + true, + ), + ]; + + // Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network. + // Otherwise, connection to "slow" IPv6 address will error-out immediately. + let ipv6_accessible = measure_connect(slow_ipv6_addr()).0; + + for &(hosts, family, timeout, needs_ipv6_access) in scenarios { + if needs_ipv6_access && !ipv6_accessible { + continue; + } + + let (start, stream) = rt + .block_on(async move { + let addrs = hosts + .iter() + .map(|host| (host.clone(), addr.port()).into()) + .collect(); + let cfg = Config { + local_address_ipv4: None, + local_address_ipv6: None, + connect_timeout: None, + keep_alive_timeout: None, + happy_eyeballs_timeout: Some(fallback_timeout), + nodelay: false, + reuse_address: false, + enforce_http: false, + send_buffer_size: None, + recv_buffer_size: None, + }; + let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg); + let start = Instant::now(); + Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?)) + }) + .unwrap(); + let res = if stream.peer_addr().unwrap().is_ipv4() { + 4 + } else { + 6 + }; + let duration = start.elapsed(); + + // Allow actual duration to be +/- 150ms off. + let min_duration = if timeout >= Duration::from_millis(150) { + timeout - Duration::from_millis(150) + } else { + Duration::default() + }; + let max_duration = timeout + Duration::from_millis(150); + + assert_eq!(res, family); + assert!(duration >= min_duration); + assert!(duration <= max_duration); + } + + fn local_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 1).into() + } + + fn local_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into() + } + + fn unreachable_ipv4_addr() -> IpAddr { + Ipv4Addr::new(127, 0, 0, 2).into() + } + + fn unreachable_ipv6_addr() -> IpAddr { + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into() + } + + fn slow_ipv4_addr() -> IpAddr { + // RFC 6890 reserved IPv4 address. + Ipv4Addr::new(198, 18, 0, 25).into() + } + + fn slow_ipv6_addr() -> IpAddr { + // RFC 6890 reserved IPv6 address. + Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into() + } + + fn measure_connect(addr: IpAddr) -> (bool, Duration) { + let start = Instant::now(); + let result = + std::net::TcpStream::connect_timeout(&(addr, 80).into(), Duration::from_secs(1)); + + let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut; + let duration = start.elapsed(); + (reachable, duration) + } + } +} |