use crate::frame::Reason; use crate::proto::{WindowSize, MAX_WINDOW_SIZE}; use std::fmt; // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead // aggregate them when the changes are significant. Many implementations do // this by keeping a "ratio" of the update version the allowed window size. // // While some may wish to represent this ratio as percentage, using a f32, // we skip having to deal with float math and stick to integers. To do so, // the "ratio" is represented by 2 i32s, split into the numerator and // denominator. For example, a 50% ratio is simply represented as 1/2. // // An example applying this ratio: If a stream has an allowed window size of // 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change // becomes greater than 1/2, or 50 bytes. const UNCLAIMED_NUMERATOR: i32 = 1; const UNCLAIMED_DENOMINATOR: i32 = 2; #[test] fn sanity_unclaimed_ratio() { assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR); assert!(UNCLAIMED_NUMERATOR >= 0); assert!(UNCLAIMED_DENOMINATOR > 0); } #[derive(Copy, Clone, Debug)] pub struct FlowControl { /// Window the peer knows about. /// /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received. /// /// For example, say the peer sends a request and uses 32kb of the window. /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust /// its understanding of the capacity of the window, and that would be: /// /// ```notrust /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb /// ``` window_size: Window, /// Window that we know about. /// /// This can go negative if a user declares a smaller target window than /// the peer knows about. available: Window, } impl FlowControl { pub fn new() -> FlowControl { FlowControl { window_size: Window(0), available: Window(0), } } /// Returns the window size as known by the peer pub fn window_size(&self) -> WindowSize { self.window_size.as_size() } /// Returns the window size available to the consumer pub fn available(&self) -> Window { self.available } /// Returns true if there is unavailable window capacity pub fn has_unavailable(&self) -> bool { if self.window_size < 0 { return false; } self.window_size > self.available } pub fn claim_capacity(&mut self, capacity: WindowSize) { self.available -= capacity; } pub fn assign_capacity(&mut self, capacity: WindowSize) { self.available += capacity; } /// If a WINDOW_UPDATE frame should be sent, returns a positive number /// representing the increment to be used. /// /// If there is no available bytes to be reclaimed, or the number of /// available bytes does not reach the threshold, this returns `None`. /// /// This represents pending outbound WINDOW_UPDATE frames. pub fn unclaimed_capacity(&self) -> Option { let available = self.available; if self.window_size >= available { return None; } let unclaimed = available.0 - self.window_size.0; let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; if unclaimed < threshold { None } else { Some(unclaimed as WindowSize) } } /// Increase the window size. /// /// This is called after receiving a WINDOW_UPDATE frame pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); if overflow { return Err(Reason::FLOW_CONTROL_ERROR); } if val > MAX_WINDOW_SIZE as i32 { return Err(Reason::FLOW_CONTROL_ERROR); } tracing::trace!( "inc_window; sz={}; old={}; new={}", sz, self.window_size, val ); self.window_size = Window(val); Ok(()) } /// Decrement the send-side window size. /// /// This is called after receiving a SETTINGS frame with a lower /// INITIAL_WINDOW_SIZE value. pub fn dec_send_window(&mut self, sz: WindowSize) { tracing::trace!( "dec_window; sz={}; window={}, available={}", sz, self.window_size, self.available ); // This should not be able to overflow `window_size` from the bottom. self.window_size -= sz; } /// Decrement the recv-side window size. /// /// This is called after receiving a SETTINGS ACK frame with a lower /// INITIAL_WINDOW_SIZE value. pub fn dec_recv_window(&mut self, sz: WindowSize) { tracing::trace!( "dec_recv_window; sz={}; window={}, available={}", sz, self.window_size, self.available ); // This should not be able to overflow `window_size` from the bottom. self.window_size -= sz; self.available -= sz; } /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. pub fn send_data(&mut self, sz: WindowSize) { tracing::trace!( "send_data; sz={}; window={}; available={}", sz, self.window_size, self.available ); // Ensure that the argument is correct assert!(self.window_size >= sz as usize); // Update values self.window_size -= sz; self.available -= sz; } } /// The current capacity of a flow-controlled Window. /// /// This number can go negative when either side has used a certain amount /// of capacity when the other side advertises a reduction in size. /// /// This type tries to centralize the knowledge of addition and subtraction /// to this capacity, instead of having integer casts throughout the source. #[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] pub struct Window(i32); impl Window { pub fn as_size(&self) -> WindowSize { if self.0 < 0 { 0 } else { self.0 as WindowSize } } pub fn checked_size(&self) -> WindowSize { assert!(self.0 >= 0, "negative Window"); self.0 as WindowSize } } impl PartialEq for Window { fn eq(&self, other: &usize) -> bool { if self.0 < 0 { false } else { (self.0 as usize).eq(other) } } } impl PartialOrd for Window { fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> { if self.0 < 0 { Some(::std::cmp::Ordering::Less) } else { (self.0 as usize).partial_cmp(other) } } } impl ::std::ops::SubAssign for Window { fn sub_assign(&mut self, other: WindowSize) { self.0 -= other as i32; } } impl ::std::ops::Add for Window { type Output = Self; fn add(self, other: WindowSize) -> Self::Output { Window(self.0 + other as i32) } } impl ::std::ops::AddAssign for Window { fn add_assign(&mut self, other: WindowSize) { self.0 += other as i32; } } impl fmt::Display for Window { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.0, f) } } impl From for isize { fn from(w: Window) -> isize { w.0 as isize } }