/// HTTP2 Ping usage /// /// hyper uses HTTP2 pings for two purposes: /// /// 1. Adaptive flow control using BDP /// 2. Connection keep-alive /// /// Both cases are optional. /// /// # BDP Algorithm /// /// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: /// 1a. Record current time. /// 1b. Send a BDP ping. /// 2. Increment the number of received bytes. /// 3. When the BDP ping ack is received: /// 3a. Record duration from sent time. /// 3b. Merge RTT with a running average. /// 3c. Calculate bdp as bytes/rtt. /// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. #[cfg(feature = "runtime")] use std::fmt; #[cfg(feature = "runtime")] use std::future::Future; #[cfg(feature = "runtime")] use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{self, Poll}; use std::time::Duration; #[cfg(not(feature = "runtime"))] use std::time::Instant; use h2::{Ping, PingPong}; #[cfg(feature = "runtime")] use tokio::time::{Instant, Sleep}; use tracing::{debug, trace}; type WindowSize = u32; pub(super) fn disabled() -> Recorder { Recorder { shared: None } } pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) { debug_assert!( config.is_enabled(), "ping channel requires bdp or keep-alive config", ); let bdp = config.bdp_initial_window.map(|wnd| Bdp { bdp: wnd, max_bandwidth: 0.0, rtt: 0.0, ping_delay: Duration::from_millis(100), stable_count: 0, }); let (bytes, next_bdp_at) = if bdp.is_some() { (Some(0), Some(Instant::now())) } else { (None, None) }; #[cfg(feature = "runtime")] let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { interval, timeout: config.keep_alive_timeout, while_idle: config.keep_alive_while_idle, timer: Box::pin(tokio::time::sleep(interval)), state: KeepAliveState::Init, }); #[cfg(feature = "runtime")] let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); let shared = Arc::new(Mutex::new(Shared { bytes, #[cfg(feature = "runtime")] last_read_at, #[cfg(feature = "runtime")] is_keep_alive_timed_out: false, ping_pong, ping_sent_at: None, next_bdp_at, })); ( Recorder { shared: Some(shared.clone()), }, Ponger { bdp, #[cfg(feature = "runtime")] keep_alive, shared, }, ) } #[derive(Clone)] pub(super) struct Config { pub(super) bdp_initial_window: Option, /// If no frames are received in this amount of time, a PING frame is sent. #[cfg(feature = "runtime")] pub(super) keep_alive_interval: Option, /// After sending a keepalive PING, the connection will be closed if /// a pong is not received in this amount of time. #[cfg(feature = "runtime")] pub(super) keep_alive_timeout: Duration, /// If true, sends pings even when there are no active streams. #[cfg(feature = "runtime")] pub(super) keep_alive_while_idle: bool, } #[derive(Clone)] pub(crate) struct Recorder { shared: Option>>, } pub(super) struct Ponger { bdp: Option, #[cfg(feature = "runtime")] keep_alive: Option, shared: Arc>, } struct Shared { ping_pong: PingPong, ping_sent_at: Option, // bdp /// If `Some`, bdp is enabled, and this tracks how many bytes have been /// read during the current sample. bytes: Option, /// We delay a variable amount of time between BDP pings. This allows us /// to send less pings as the bandwidth stabilizes. next_bdp_at: Option, // keep-alive /// If `Some`, keep-alive is enabled, and the Instant is how long ago /// the connection read the last frame. #[cfg(feature = "runtime")] last_read_at: Option, #[cfg(feature = "runtime")] is_keep_alive_timed_out: bool, } struct Bdp { /// Current BDP in bytes bdp: u32, /// Largest bandwidth we've seen so far. max_bandwidth: f64, /// Round trip time in seconds rtt: f64, /// Delay the next ping by this amount. /// /// This will change depending on how stable the current bandwidth is. ping_delay: Duration, /// The count of ping round trips where BDP has stayed the same. stable_count: u32, } #[cfg(feature = "runtime")] struct KeepAlive { /// If no frames are received in this amount of time, a PING frame is sent. interval: Duration, /// After sending a keepalive PING, the connection will be closed if /// a pong is not received in this amount of time. timeout: Duration, /// If true, sends pings even when there are no active streams. while_idle: bool, state: KeepAliveState, timer: Pin>, } #[cfg(feature = "runtime")] enum KeepAliveState { Init, Scheduled, PingSent, } pub(super) enum Ponged { SizeUpdate(WindowSize), #[cfg(feature = "runtime")] KeepAliveTimedOut, } #[cfg(feature = "runtime")] #[derive(Debug)] pub(super) struct KeepAliveTimedOut; // ===== impl Config ===== impl Config { pub(super) fn is_enabled(&self) -> bool { #[cfg(feature = "runtime")] { self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() } #[cfg(not(feature = "runtime"))] { self.bdp_initial_window.is_some() } } } // ===== impl Recorder ===== impl Recorder { pub(crate) fn record_data(&self, len: usize) { let shared = if let Some(ref shared) = self.shared { shared } else { return; }; let mut locked = shared.lock().unwrap(); #[cfg(feature = "runtime")] locked.update_last_read_at(); // are we ready to send another bdp ping? // if not, we don't need to record bytes either if let Some(ref next_bdp_at) = locked.next_bdp_at { if Instant::now() < *next_bdp_at { return; } else { locked.next_bdp_at = None; } } if let Some(ref mut bytes) = locked.bytes { *bytes += len; } else { // no need to send bdp ping if bdp is disabled return; } if !locked.is_ping_sent() { locked.send_ping(); } } pub(crate) fn record_non_data(&self) { #[cfg(feature = "runtime")] { let shared = if let Some(ref shared) = self.shared { shared } else { return; }; let mut locked = shared.lock().unwrap(); locked.update_last_read_at(); } } /// If the incoming stream is already closed, convert self into /// a disabled reporter. #[cfg(feature = "client")] pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { if stream.is_end_stream() { disabled() } else { self } } pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { #[cfg(feature = "runtime")] { if let Some(ref shared) = self.shared { let locked = shared.lock().unwrap(); if locked.is_keep_alive_timed_out { return Err(KeepAliveTimedOut.crate_error()); } } } // else Ok(()) } } // ===== impl Ponger ===== impl Ponger { pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll { let now = Instant::now(); let mut locked = self.shared.lock().unwrap(); #[cfg(feature = "runtime")] let is_idle = self.is_idle(); #[cfg(feature = "runtime")] { if let Some(ref mut ka) = self.keep_alive { ka.schedule(is_idle, &locked); ka.maybe_ping(cx, &mut locked); } } if !locked.is_ping_sent() { // XXX: this doesn't register a waker...? return Poll::Pending; } match locked.ping_pong.poll_pong(cx) { Poll::Ready(Ok(_pong)) => { let start = locked .ping_sent_at .expect("pong received implies ping_sent_at"); locked.ping_sent_at = None; let rtt = now - start; trace!("recv pong"); #[cfg(feature = "runtime")] { if let Some(ref mut ka) = self.keep_alive { locked.update_last_read_at(); ka.schedule(is_idle, &locked); } } if let Some(ref mut bdp) = self.bdp { let bytes = locked.bytes.expect("bdp enabled implies bytes"); locked.bytes = Some(0); // reset trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); let update = bdp.calculate(bytes, rtt); locked.next_bdp_at = Some(now + bdp.ping_delay); if let Some(update) = update { return Poll::Ready(Ponged::SizeUpdate(update)) } } } Poll::Ready(Err(e)) => { debug!("pong error: {}", e); } Poll::Pending => { #[cfg(feature = "runtime")] { if let Some(ref mut ka) = self.keep_alive { if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { self.keep_alive = None; locked.is_keep_alive_timed_out = true; return Poll::Ready(Ponged::KeepAliveTimedOut); } } } } } // XXX: this doesn't register a waker...? Poll::Pending } #[cfg(feature = "runtime")] fn is_idle(&self) -> bool { Arc::strong_count(&self.shared) <= 2 } } // ===== impl Shared ===== impl Shared { fn send_ping(&mut self) { match self.ping_pong.send_ping(Ping::opaque()) { Ok(()) => { self.ping_sent_at = Some(Instant::now()); trace!("sent ping"); } Err(err) => { debug!("error sending ping: {}", err); } } } fn is_ping_sent(&self) -> bool { self.ping_sent_at.is_some() } #[cfg(feature = "runtime")] fn update_last_read_at(&mut self) { if self.last_read_at.is_some() { self.last_read_at = Some(Instant::now()); } } #[cfg(feature = "runtime")] fn last_read_at(&self) -> Instant { self.last_read_at.expect("keep_alive expects last_read_at") } } // ===== impl Bdp ===== /// Any higher than this likely will be hitting the TCP flow control. const BDP_LIMIT: usize = 1024 * 1024 * 16; impl Bdp { fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option { // No need to do any math if we're at the limit. if self.bdp as usize == BDP_LIMIT { self.stabilize_delay(); return None; } // average the rtt let rtt = seconds(rtt); if self.rtt == 0.0 { // First sample means rtt is first rtt. self.rtt = rtt; } else { // Weigh this rtt as 1/8 for a moving average. self.rtt += (rtt - self.rtt) * 0.125; } // calculate the current bandwidth let bw = (bytes as f64) / (self.rtt * 1.5); trace!("current bandwidth = {:.1}B/s", bw); if bw < self.max_bandwidth { // not a faster bandwidth, so don't update self.stabilize_delay(); return None; } else { self.max_bandwidth = bw; } // if the current `bytes` sample is at least 2/3 the previous // bdp, increase to double the current sample. if bytes >= self.bdp as usize * 2 / 3 { self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; trace!("BDP increased to {}", self.bdp); self.stable_count = 0; self.ping_delay /= 2; Some(self.bdp) } else { self.stabilize_delay(); None } } fn stabilize_delay(&mut self) { if self.ping_delay < Duration::from_secs(10) { self.stable_count += 1; if self.stable_count >= 2 { self.ping_delay *= 4; self.stable_count = 0; } } } } fn seconds(dur: Duration) -> f64 { const NANOS_PER_SEC: f64 = 1_000_000_000.0; let secs = dur.as_secs() as f64; secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC } // ===== impl KeepAlive ===== #[cfg(feature = "runtime")] impl KeepAlive { fn schedule(&mut self, is_idle: bool, shared: &Shared) { match self.state { KeepAliveState::Init => { if !self.while_idle && is_idle { return; } self.state = KeepAliveState::Scheduled; let interval = shared.last_read_at() + self.interval; self.timer.as_mut().reset(interval); } KeepAliveState::PingSent => { if shared.is_ping_sent() { return; } self.state = KeepAliveState::Scheduled; let interval = shared.last_read_at() + self.interval; self.timer.as_mut().reset(interval); } KeepAliveState::Scheduled => (), } } fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) { match self.state { KeepAliveState::Scheduled => { if Pin::new(&mut self.timer).poll(cx).is_pending() { return; } // check if we've received a frame while we were scheduled if shared.last_read_at() + self.interval > self.timer.deadline() { self.state = KeepAliveState::Init; cx.waker().wake_by_ref(); // schedule us again return; } trace!("keep-alive interval ({:?}) reached", self.interval); shared.send_ping(); self.state = KeepAliveState::PingSent; let timeout = Instant::now() + self.timeout; self.timer.as_mut().reset(timeout); } KeepAliveState::Init | KeepAliveState::PingSent => (), } } fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> { match self.state { KeepAliveState::PingSent => { if Pin::new(&mut self.timer).poll(cx).is_pending() { return Ok(()); } trace!("keep-alive timeout ({:?}) reached", self.timeout); Err(KeepAliveTimedOut) } KeepAliveState::Init | KeepAliveState::Scheduled => Ok(()), } } } // ===== impl KeepAliveTimedOut ===== #[cfg(feature = "runtime")] impl KeepAliveTimedOut { pub(super) fn crate_error(self) -> crate::Error { crate::Error::new(crate::error::Kind::Http2).with(self) } } #[cfg(feature = "runtime")] impl fmt::Display for KeepAliveTimedOut { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("keep-alive timed out") } } #[cfg(feature = "runtime")] impl std::error::Error for KeepAliveTimedOut { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { Some(&crate::error::TimedOut) } }