use super::*; use std::task::{Context, Waker}; use std::time::Instant; use std::usize; /// Tracks Stream related state /// /// # Reference counting /// /// There can be a number of outstanding handles to a single Stream. These are /// tracked using reference counting. The `ref_count` field represents the /// number of outstanding userspace handles that can reach this stream. /// /// It's important to note that when the stream is placed in an internal queue /// (such as an accept queue), this is **not** tracked by a reference count. /// Thus, `ref_count` can be zero and the stream still has to be kept around. #[derive(Debug)] pub(super) struct Stream { /// The h2 stream identifier pub id: StreamId, /// Current state of the stream pub state: State, /// Set to `true` when the stream is counted against the connection's max /// concurrent streams. pub is_counted: bool, /// Number of outstanding handles pointing to this stream pub ref_count: usize, // ===== Fields related to sending ===== /// Next node in the accept linked list pub next_pending_send: Option, /// Set to true when the stream is pending accept pub is_pending_send: bool, /// Send data flow control pub send_flow: FlowControl, /// Amount of send capacity that has been requested, but not yet allocated. pub requested_send_capacity: WindowSize, /// Amount of data buffered at the prioritization layer. /// TODO: Technically this could be greater than the window size... pub buffered_send_data: usize, /// Task tracking additional send capacity (i.e. window updates). send_task: Option, /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, /// Next node in the linked list of streams waiting for additional /// connection level capacity. pub next_pending_send_capacity: Option, /// True if the stream is waiting for outbound connection capacity pub is_pending_send_capacity: bool, /// Set to true when the send capacity has been incremented pub send_capacity_inc: bool, /// Next node in the open linked list pub next_open: Option, /// Set to true when the stream is pending to be opened pub is_pending_open: bool, /// Set to true when a push is pending for this stream pub is_pending_push: bool, // ===== Fields related to receiving ===== /// Next node in the accept linked list pub next_pending_accept: Option, /// Set to true when the stream is pending accept pub is_pending_accept: bool, /// Receive data flow control pub recv_flow: FlowControl, pub in_flight_recv_data: WindowSize, /// Next node in the linked list of streams waiting to send window updates. pub next_window_update: Option, /// True if the stream is waiting to send a window update pub is_pending_window_update: bool, /// The time when this stream may have been locally reset. pub reset_at: Option, /// Next node in list of reset streams that should expire eventually pub next_reset_expire: Option, /// Frames pending for this stream to read pub pending_recv: buffer::Deque, /// When the RecvStream drop occurs, no data should be received. pub is_recv: bool, /// Task tracking receiving frames pub recv_task: Option, /// The stream's pending push promises pub pending_push_promises: store::Queue, /// Validate content-length headers pub content_length: ContentLength, } /// State related to validating a stream's content-length #[derive(Debug)] pub enum ContentLength { Omitted, Head, Remaining(u64), } #[derive(Debug)] pub(super) struct NextAccept; #[derive(Debug)] pub(super) struct NextSend; #[derive(Debug)] pub(super) struct NextSendCapacity; #[derive(Debug)] pub(super) struct NextWindowUpdate; #[derive(Debug)] pub(super) struct NextOpen; #[derive(Debug)] pub(super) struct NextResetExpire; impl Stream { pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { let mut send_flow = FlowControl::new(); let mut recv_flow = FlowControl::new(); recv_flow .inc_window(init_recv_window) .expect("invalid initial receive window"); recv_flow.assign_capacity(init_recv_window); send_flow .inc_window(init_send_window) .expect("invalid initial send window size"); Stream { id, state: State::default(), ref_count: 0, is_counted: false, // ===== Fields related to sending ===== next_pending_send: None, is_pending_send: false, send_flow, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, pending_send: buffer::Deque::new(), is_pending_send_capacity: false, next_pending_send_capacity: None, send_capacity_inc: false, is_pending_open: false, next_open: None, is_pending_push: false, // ===== Fields related to receiving ===== next_pending_accept: None, is_pending_accept: false, recv_flow, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, reset_at: None, next_reset_expire: None, pending_recv: buffer::Deque::new(), is_recv: true, recv_task: None, pending_push_promises: store::Queue::new(), content_length: ContentLength::Omitted, } } /// Increment the stream's ref count pub fn ref_inc(&mut self) { assert!(self.ref_count < usize::MAX); self.ref_count += 1; } /// Decrements the stream's ref count pub fn ref_dec(&mut self) { assert!(self.ref_count > 0); self.ref_count -= 1; } /// Returns true if stream is currently being held for some time because of /// a local reset. pub fn is_pending_reset_expiration(&self) -> bool { self.reset_at.is_some() } /// Returns true if frames for this stream are ready to be sent over the wire pub fn is_send_ready(&self) -> bool { // Why do we check pending_open? // // We allow users to call send_request() which schedules a stream to be pending_open // if there is no room according to the concurrency limit (max_send_streams), and we // also allow data to be buffered for send with send_data() if there is no capacity for // the stream to send the data, which attempts to place the stream in pending_send. // If the stream is not open, we don't want the stream to be scheduled for // execution (pending_send). Note that if the stream is in pending_open, it will be // pushed to pending_send when there is room for an open stream. // // In pending_push we track whether a PushPromise still needs to be sent // from a different stream before we can start sending frames on this one. // This is different from the "open" check because reserved streams don't count // toward the concurrency limit. // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 !self.is_pending_open && !self.is_pending_push } /// Returns true if the stream is closed pub fn is_closed(&self) -> bool { // The state has fully transitioned to closed. self.state.is_closed() && // Because outbound frames transition the stream state before being // buffered, we have to ensure that all frames have been flushed. self.pending_send.is_empty() && // Sometimes large data frames are sent out in chunks. After a chunk // of the frame is sent, the remainder is pushed back onto the send // queue to be rescheduled. // // Checking for additional buffered data lets us catch this case. self.buffered_send_data == 0 } /// Returns true if the stream is no longer in use pub fn is_released(&self) -> bool { // The stream is closed and fully flushed self.is_closed() && // There are no more outstanding references to the stream self.ref_count == 0 && // The stream is not in any queue !self.is_pending_send && !self.is_pending_send_capacity && !self.is_pending_accept && !self.is_pending_window_update && !self.is_pending_open && self.reset_at.is_none() } /// Returns true when the consumer of the stream has dropped all handles /// (indicating no further interest in the stream) and the stream state is /// not actually closed. /// /// In this case, a reset should be sent. pub fn is_canceled_interest(&self) -> bool { self.ref_count == 0 && !self.state.is_closed() } /// Current available stream send capacity pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { let available = self.send_flow.available().as_size() as usize; let buffered = self.buffered_send_data; available.min(max_buffer_size).saturating_sub(buffered) as WindowSize } pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); debug_assert!(capacity > 0); self.send_flow.assign_capacity(capacity); tracing::trace!( " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", self.send_flow.available(), self.buffered_send_data, self.id, max_buffer_size, prev_capacity, ); if prev_capacity < self.capacity(max_buffer_size) { self.notify_capacity(); } } pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); self.send_flow.send_data(len); // Decrement the stream's buffered data counter debug_assert!(self.buffered_send_data >= len as usize); self.buffered_send_data -= len as usize; self.requested_send_capacity -= len; tracing::trace!( " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", self.send_flow.available(), self.buffered_send_data, self.id, max_buffer_size, prev_capacity, ); if prev_capacity < self.capacity(max_buffer_size) { self.notify_capacity(); } } /// If the capacity was limited because of the max_send_buffer_size, /// then consider waking the send task again... pub fn notify_capacity(&mut self) { self.send_capacity_inc = true; tracing::trace!(" notifying task"); self.notify_send(); } /// Returns `Err` when the decrement cannot be completed due to overflow. pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { match self.content_length { ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { Some(val) => *rem = val, None => return Err(()), }, ContentLength::Head => { if len != 0 { return Err(()); } } _ => {} } Ok(()) } pub fn ensure_content_length_zero(&self) -> Result<(), ()> { match self.content_length { ContentLength::Remaining(0) => Ok(()), ContentLength::Remaining(_) => Err(()), _ => Ok(()), } } pub fn notify_send(&mut self) { if let Some(task) = self.send_task.take() { task.wake(); } } pub fn wait_send(&mut self, cx: &Context) { self.send_task = Some(cx.waker().clone()); } pub fn notify_recv(&mut self) { if let Some(task) = self.recv_task.take() { task.wake(); } } } impl store::Next for NextAccept { fn next(stream: &Stream) -> Option { stream.next_pending_accept } fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_accept = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_pending_accept.take() } fn is_queued(stream: &Stream) -> bool { stream.is_pending_accept } fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_accept = val; } } impl store::Next for NextSend { fn next(stream: &Stream) -> Option { stream.next_pending_send } fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send.take() } fn is_queued(stream: &Stream) -> bool { stream.is_pending_send } fn set_queued(stream: &mut Stream, val: bool) { if val { // ensure that stream is not queued for being opened // if it's being put into queue for sending data debug_assert!(!stream.is_pending_open); } stream.is_pending_send = val; } } impl store::Next for NextSendCapacity { fn next(stream: &Stream) -> Option { stream.next_pending_send_capacity } fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send_capacity = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send_capacity.take() } fn is_queued(stream: &Stream) -> bool { stream.is_pending_send_capacity } fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_send_capacity = val; } } impl store::Next for NextWindowUpdate { fn next(stream: &Stream) -> Option { stream.next_window_update } fn set_next(stream: &mut Stream, key: Option) { stream.next_window_update = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_window_update.take() } fn is_queued(stream: &Stream) -> bool { stream.is_pending_window_update } fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_window_update = val; } } impl store::Next for NextOpen { fn next(stream: &Stream) -> Option { stream.next_open } fn set_next(stream: &mut Stream, key: Option) { stream.next_open = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_open.take() } fn is_queued(stream: &Stream) -> bool { stream.is_pending_open } fn set_queued(stream: &mut Stream, val: bool) { if val { // ensure that stream is not queued for being sent // if it's being put into queue for opening the stream debug_assert!(!stream.is_pending_send); } stream.is_pending_open = val; } } impl store::Next for NextResetExpire { fn next(stream: &Stream) -> Option { stream.next_reset_expire } fn set_next(stream: &mut Stream, key: Option) { stream.next_reset_expire = key; } fn take_next(stream: &mut Stream) -> Option { stream.next_reset_expire.take() } fn is_queued(stream: &Stream) -> bool { stream.reset_at.is_some() } fn set_queued(stream: &mut Stream, val: bool) { if val { stream.reset_at = Some(Instant::now()); } else { stream.reset_at = None; } } } // ===== impl ContentLength ===== impl ContentLength { pub fn is_head(&self) -> bool { matches!(*self, Self::Head) } }