summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto/streams/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/stream.rs')
-rw-r--r--third_party/rust/h2/src/proto/streams/stream.rs527
1 files changed, 527 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/stream.rs b/third_party/rust/h2/src/proto/streams/stream.rs
new file mode 100644
index 0000000000..43e313647a
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/stream.rs
@@ -0,0 +1,527 @@
+use super::*;
+
+use std::task::{Context, Waker};
+use std::time::Instant;
+use std::usize;
+
+/// Tracks Stream related state
+///
+/// # Reference counting
+///
+/// There can be a number of outstanding handles to a single Stream. These are
+/// tracked using reference counting. The `ref_count` field represents the
+/// number of outstanding userspace handles that can reach this stream.
+///
+/// It's important to note that when the stream is placed in an internal queue
+/// (such as an accept queue), this is **not** tracked by a reference count.
+/// Thus, `ref_count` can be zero and the stream still has to be kept around.
+#[derive(Debug)]
+pub(super) struct Stream {
+ /// The h2 stream identifier
+ pub id: StreamId,
+
+ /// Current state of the stream
+ pub state: State,
+
+ /// Set to `true` when the stream is counted against the connection's max
+ /// concurrent streams.
+ pub is_counted: bool,
+
+ /// Number of outstanding handles pointing to this stream
+ pub ref_count: usize,
+
+ // ===== Fields related to sending =====
+ /// Next node in the accept linked list
+ pub next_pending_send: Option<store::Key>,
+
+ /// Set to true when the stream is pending accept
+ pub is_pending_send: bool,
+
+ /// Send data flow control
+ pub send_flow: FlowControl,
+
+ /// Amount of send capacity that has been requested, but not yet allocated.
+ pub requested_send_capacity: WindowSize,
+
+ /// Amount of data buffered at the prioritization layer.
+ /// TODO: Technically this could be greater than the window size...
+ pub buffered_send_data: usize,
+
+ /// Task tracking additional send capacity (i.e. window updates).
+ send_task: Option<Waker>,
+
+ /// Frames pending for this stream being sent to the socket
+ pub pending_send: buffer::Deque,
+
+ /// Next node in the linked list of streams waiting for additional
+ /// connection level capacity.
+ pub next_pending_send_capacity: Option<store::Key>,
+
+ /// True if the stream is waiting for outbound connection capacity
+ pub is_pending_send_capacity: bool,
+
+ /// Set to true when the send capacity has been incremented
+ pub send_capacity_inc: bool,
+
+ /// Next node in the open linked list
+ pub next_open: Option<store::Key>,
+
+ /// Set to true when the stream is pending to be opened
+ pub is_pending_open: bool,
+
+ /// Set to true when a push is pending for this stream
+ pub is_pending_push: bool,
+
+ // ===== Fields related to receiving =====
+ /// Next node in the accept linked list
+ pub next_pending_accept: Option<store::Key>,
+
+ /// Set to true when the stream is pending accept
+ pub is_pending_accept: bool,
+
+ /// Receive data flow control
+ pub recv_flow: FlowControl,
+
+ pub in_flight_recv_data: WindowSize,
+
+ /// Next node in the linked list of streams waiting to send window updates.
+ pub next_window_update: Option<store::Key>,
+
+ /// True if the stream is waiting to send a window update
+ pub is_pending_window_update: bool,
+
+ /// The time when this stream may have been locally reset.
+ pub reset_at: Option<Instant>,
+
+ /// Next node in list of reset streams that should expire eventually
+ pub next_reset_expire: Option<store::Key>,
+
+ /// Frames pending for this stream to read
+ pub pending_recv: buffer::Deque,
+
+ /// When the RecvStream drop occurs, no data should be received.
+ pub is_recv: bool,
+
+ /// Task tracking receiving frames
+ pub recv_task: Option<Waker>,
+
+ /// The stream's pending push promises
+ pub pending_push_promises: store::Queue<NextAccept>,
+
+ /// Validate content-length headers
+ pub content_length: ContentLength,
+}
+
+/// State related to validating a stream's content-length
+#[derive(Debug)]
+pub enum ContentLength {
+ Omitted,
+ Head,
+ Remaining(u64),
+}
+
+#[derive(Debug)]
+pub(super) struct NextAccept;
+
+#[derive(Debug)]
+pub(super) struct NextSend;
+
+#[derive(Debug)]
+pub(super) struct NextSendCapacity;
+
+#[derive(Debug)]
+pub(super) struct NextWindowUpdate;
+
+#[derive(Debug)]
+pub(super) struct NextOpen;
+
+#[derive(Debug)]
+pub(super) struct NextResetExpire;
+
+impl Stream {
+ pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
+ let mut send_flow = FlowControl::new();
+ let mut recv_flow = FlowControl::new();
+
+ recv_flow
+ .inc_window(init_recv_window)
+ .expect("invalid initial receive window");
+ // TODO: proper error handling?
+ let _res = recv_flow.assign_capacity(init_recv_window);
+ debug_assert!(_res.is_ok());
+
+ send_flow
+ .inc_window(init_send_window)
+ .expect("invalid initial send window size");
+
+ Stream {
+ id,
+ state: State::default(),
+ ref_count: 0,
+ is_counted: false,
+
+ // ===== Fields related to sending =====
+ next_pending_send: None,
+ is_pending_send: false,
+ send_flow,
+ requested_send_capacity: 0,
+ buffered_send_data: 0,
+ send_task: None,
+ pending_send: buffer::Deque::new(),
+ is_pending_send_capacity: false,
+ next_pending_send_capacity: None,
+ send_capacity_inc: false,
+ is_pending_open: false,
+ next_open: None,
+ is_pending_push: false,
+
+ // ===== Fields related to receiving =====
+ next_pending_accept: None,
+ is_pending_accept: false,
+ recv_flow,
+ in_flight_recv_data: 0,
+ next_window_update: None,
+ is_pending_window_update: false,
+ reset_at: None,
+ next_reset_expire: None,
+ pending_recv: buffer::Deque::new(),
+ is_recv: true,
+ recv_task: None,
+ pending_push_promises: store::Queue::new(),
+ content_length: ContentLength::Omitted,
+ }
+ }
+
+ /// Increment the stream's ref count
+ pub fn ref_inc(&mut self) {
+ assert!(self.ref_count < usize::MAX);
+ self.ref_count += 1;
+ }
+
+ /// Decrements the stream's ref count
+ pub fn ref_dec(&mut self) {
+ assert!(self.ref_count > 0);
+ self.ref_count -= 1;
+ }
+
+ /// Returns true if stream is currently being held for some time because of
+ /// a local reset.
+ pub fn is_pending_reset_expiration(&self) -> bool {
+ self.reset_at.is_some()
+ }
+
+ /// Returns true if frames for this stream are ready to be sent over the wire
+ pub fn is_send_ready(&self) -> bool {
+ // Why do we check pending_open?
+ //
+ // We allow users to call send_request() which schedules a stream to be pending_open
+ // if there is no room according to the concurrency limit (max_send_streams), and we
+ // also allow data to be buffered for send with send_data() if there is no capacity for
+ // the stream to send the data, which attempts to place the stream in pending_send.
+ // If the stream is not open, we don't want the stream to be scheduled for
+ // execution (pending_send). Note that if the stream is in pending_open, it will be
+ // pushed to pending_send when there is room for an open stream.
+ //
+ // In pending_push we track whether a PushPromise still needs to be sent
+ // from a different stream before we can start sending frames on this one.
+ // This is different from the "open" check because reserved streams don't count
+ // toward the concurrency limit.
+ // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
+ !self.is_pending_open && !self.is_pending_push
+ }
+
+ /// Returns true if the stream is closed
+ pub fn is_closed(&self) -> bool {
+ // The state has fully transitioned to closed.
+ self.state.is_closed() &&
+ // Because outbound frames transition the stream state before being
+ // buffered, we have to ensure that all frames have been flushed.
+ self.pending_send.is_empty() &&
+ // Sometimes large data frames are sent out in chunks. After a chunk
+ // of the frame is sent, the remainder is pushed back onto the send
+ // queue to be rescheduled.
+ //
+ // Checking for additional buffered data lets us catch this case.
+ self.buffered_send_data == 0
+ }
+
+ /// Returns true if the stream is no longer in use
+ pub fn is_released(&self) -> bool {
+ // The stream is closed and fully flushed
+ self.is_closed() &&
+ // There are no more outstanding references to the stream
+ self.ref_count == 0 &&
+ // The stream is not in any queue
+ !self.is_pending_send && !self.is_pending_send_capacity &&
+ !self.is_pending_accept && !self.is_pending_window_update &&
+ !self.is_pending_open && self.reset_at.is_none()
+ }
+
+ /// Returns true when the consumer of the stream has dropped all handles
+ /// (indicating no further interest in the stream) and the stream state is
+ /// not actually closed.
+ ///
+ /// In this case, a reset should be sent.
+ pub fn is_canceled_interest(&self) -> bool {
+ self.ref_count == 0 && !self.state.is_closed()
+ }
+
+ /// Current available stream send capacity
+ pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
+ let available = self.send_flow.available().as_size() as usize;
+ let buffered = self.buffered_send_data;
+
+ available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
+ }
+
+ pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
+ let prev_capacity = self.capacity(max_buffer_size);
+ debug_assert!(capacity > 0);
+ // TODO: proper error handling
+ let _res = self.send_flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
+
+ tracing::trace!(
+ " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
+ self.send_flow.available(),
+ self.buffered_send_data,
+ self.id,
+ max_buffer_size,
+ prev_capacity,
+ );
+
+ if prev_capacity < self.capacity(max_buffer_size) {
+ self.notify_capacity();
+ }
+ }
+
+ pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
+ let prev_capacity = self.capacity(max_buffer_size);
+
+ // TODO: proper error handling
+ let _res = self.send_flow.send_data(len);
+ debug_assert!(_res.is_ok());
+
+ // Decrement the stream's buffered data counter
+ debug_assert!(self.buffered_send_data >= len as usize);
+ self.buffered_send_data -= len as usize;
+ self.requested_send_capacity -= len;
+
+ tracing::trace!(
+ " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
+ self.send_flow.available(),
+ self.buffered_send_data,
+ self.id,
+ max_buffer_size,
+ prev_capacity,
+ );
+
+ if prev_capacity < self.capacity(max_buffer_size) {
+ self.notify_capacity();
+ }
+ }
+
+ /// If the capacity was limited because of the max_send_buffer_size,
+ /// then consider waking the send task again...
+ pub fn notify_capacity(&mut self) {
+ self.send_capacity_inc = true;
+ tracing::trace!(" notifying task");
+ self.notify_send();
+ }
+
+ /// Returns `Err` when the decrement cannot be completed due to overflow.
+ pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
+ match self.content_length {
+ ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
+ Some(val) => *rem = val,
+ None => return Err(()),
+ },
+ ContentLength::Head => {
+ if len != 0 {
+ return Err(());
+ }
+ }
+ _ => {}
+ }
+
+ Ok(())
+ }
+
+ pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
+ match self.content_length {
+ ContentLength::Remaining(0) => Ok(()),
+ ContentLength::Remaining(_) => Err(()),
+ _ => Ok(()),
+ }
+ }
+
+ pub fn notify_send(&mut self) {
+ if let Some(task) = self.send_task.take() {
+ task.wake();
+ }
+ }
+
+ pub fn wait_send(&mut self, cx: &Context) {
+ self.send_task = Some(cx.waker().clone());
+ }
+
+ pub fn notify_recv(&mut self) {
+ if let Some(task) = self.recv_task.take() {
+ task.wake();
+ }
+ }
+}
+
+impl store::Next for NextAccept {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_accept
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_accept = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_accept.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_accept
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_accept = val;
+ }
+}
+
+impl store::Next for NextSend {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_send
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_send = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_send.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_send
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ // ensure that stream is not queued for being opened
+ // if it's being put into queue for sending data
+ debug_assert!(!stream.is_pending_open);
+ }
+ stream.is_pending_send = val;
+ }
+}
+
+impl store::Next for NextSendCapacity {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_send_capacity
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_send_capacity = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_send_capacity.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_send_capacity
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_send_capacity = val;
+ }
+}
+
+impl store::Next for NextWindowUpdate {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_window_update
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_window_update = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_window_update.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_window_update
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_window_update = val;
+ }
+}
+
+impl store::Next for NextOpen {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_open
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_open = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_open.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_open
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ // ensure that stream is not queued for being sent
+ // if it's being put into queue for opening the stream
+ debug_assert!(!stream.is_pending_send);
+ }
+ stream.is_pending_open = val;
+ }
+}
+
+impl store::Next for NextResetExpire {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_reset_expire
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_reset_expire = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_reset_expire.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.reset_at.is_some()
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ stream.reset_at = Some(Instant::now());
+ } else {
+ stream.reset_at = None;
+ }
+ }
+}
+
+// ===== impl ContentLength =====
+
+impl ContentLength {
+ pub fn is_head(&self) -> bool {
+ matches!(*self, Self::Head)
+ }
+}