summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto/streams/prioritize.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/prioritize.rs')
-rw-r--r--third_party/rust/h2/src/proto/streams/prioritize.rs931
1 files changed, 931 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/prioritize.rs b/third_party/rust/h2/src/proto/streams/prioritize.rs
new file mode 100644
index 0000000000..3196049a48
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/prioritize.rs
@@ -0,0 +1,931 @@
+use super::store::Resolve;
+use super::*;
+
+use crate::frame::{Reason, StreamId};
+
+use crate::codec::UserError;
+use crate::codec::UserError::*;
+
+use bytes::buf::{Buf, Take};
+use std::{
+ cmp::{self, Ordering},
+ fmt, io, mem,
+ task::{Context, Poll, Waker},
+};
+
+/// # Warning
+///
+/// Queued streams are ordered by stream ID, as we need to ensure that
+/// lower-numbered streams are sent headers before higher-numbered ones.
+/// This is because "idle" stream IDs – those which have been initiated but
+/// have yet to receive frames – will be implicitly closed on receipt of a
+/// frame on a higher stream ID. If these queues was not ordered by stream
+/// IDs, some mechanism would be necessary to ensure that the lowest-numbered]
+/// idle stream is opened first.
+#[derive(Debug)]
+pub(super) struct Prioritize {
+ /// Queue of streams waiting for socket capacity to send a frame.
+ pending_send: store::Queue<stream::NextSend>,
+
+ /// Queue of streams waiting for window capacity to produce data.
+ pending_capacity: store::Queue<stream::NextSendCapacity>,
+
+ /// Streams waiting for capacity due to max concurrency
+ ///
+ /// The `SendRequest` handle is `Clone`. This enables initiating requests
+ /// from many tasks. However, offering this capability while supporting
+ /// backpressure at some level is tricky. If there are many `SendRequest`
+ /// handles and a single stream becomes available, which handle gets
+ /// assigned that stream? Maybe that handle is no longer ready to send a
+ /// request.
+ ///
+ /// The strategy used is to allow each `SendRequest` handle one buffered
+ /// request. A `SendRequest` handle is ready to send a request if it has no
+ /// associated buffered requests. This is the same strategy as `mpsc` in the
+ /// futures library.
+ pending_open: store::Queue<stream::NextOpen>,
+
+ /// Connection level flow control governing sent data
+ flow: FlowControl,
+
+ /// Stream ID of the last stream opened.
+ last_opened_id: StreamId,
+
+ /// What `DATA` frame is currently being sent in the codec.
+ in_flight_data_frame: InFlightData,
+
+ /// The maximum amount of bytes a stream should buffer.
+ max_buffer_size: usize,
+}
+
+#[derive(Debug, Eq, PartialEq)]
+enum InFlightData {
+ /// There is no `DATA` frame in flight.
+ Nothing,
+ /// There is a `DATA` frame in flight belonging to the given stream.
+ DataFrame(store::Key),
+ /// There was a `DATA` frame, but the stream's queue was since cleared.
+ Drop,
+}
+
+pub(crate) struct Prioritized<B> {
+ // The buffer
+ inner: Take<B>,
+
+ end_of_stream: bool,
+
+ // The stream that this is associated with
+ stream: store::Key,
+}
+
+// ===== impl Prioritize =====
+
+impl Prioritize {
+ pub fn new(config: &Config) -> Prioritize {
+ let mut flow = FlowControl::new();
+
+ flow.inc_window(config.remote_init_window_sz)
+ .expect("invalid initial window size");
+
+ // TODO: proper error handling
+ let _res = flow.assign_capacity(config.remote_init_window_sz);
+ debug_assert!(_res.is_ok());
+
+ tracing::trace!("Prioritize::new; flow={:?}", flow);
+
+ Prioritize {
+ pending_send: store::Queue::new(),
+ pending_capacity: store::Queue::new(),
+ pending_open: store::Queue::new(),
+ flow,
+ last_opened_id: StreamId::ZERO,
+ in_flight_data_frame: InFlightData::Nothing,
+ max_buffer_size: config.local_max_buffer_size,
+ }
+ }
+
+ pub(crate) fn max_buffer_size(&self) -> usize {
+ self.max_buffer_size
+ }
+
+ /// Queue a frame to be sent to the remote
+ pub fn queue_frame<B>(
+ &mut self,
+ frame: Frame<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ task: &mut Option<Waker>,
+ ) {
+ let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
+ let _e = span.enter();
+ // Queue the frame in the buffer
+ stream.pending_send.push_back(buffer, frame);
+ self.schedule_send(stream, task);
+ }
+
+ pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
+ // If the stream is waiting to be opened, nothing more to do.
+ if stream.is_send_ready() {
+ tracing::trace!(?stream.id, "schedule_send");
+ // Queue the stream
+ self.pending_send.push(stream);
+
+ // Notify the connection.
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+ }
+
+ pub fn queue_open(&mut self, stream: &mut store::Ptr) {
+ self.pending_open.push(stream);
+ }
+
+ /// Send a data frame
+ pub fn send_data<B>(
+ &mut self,
+ frame: frame::Data<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError>
+ where
+ B: Buf,
+ {
+ let sz = frame.payload().remaining();
+
+ if sz > MAX_WINDOW_SIZE as usize {
+ return Err(UserError::PayloadTooBig);
+ }
+
+ let sz = sz as WindowSize;
+
+ if !stream.state.is_send_streaming() {
+ if stream.state.is_closed() {
+ return Err(InactiveStreamId);
+ } else {
+ return Err(UnexpectedFrameType);
+ }
+ }
+
+ // Update the buffered data counter
+ stream.buffered_send_data += sz as usize;
+
+ let span =
+ tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
+ let _e = span.enter();
+ tracing::trace!(buffered = stream.buffered_send_data);
+
+ // Implicitly request more send capacity if not enough has been
+ // requested yet.
+ if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
+ // Update the target requested capacity
+ stream.requested_send_capacity =
+ cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
+
+ self.try_assign_capacity(stream);
+ }
+
+ if frame.is_end_stream() {
+ stream.state.send_close();
+ self.reserve_capacity(0, stream, counts);
+ }
+
+ tracing::trace!(
+ available = %stream.send_flow.available(),
+ buffered = stream.buffered_send_data,
+ );
+
+ // The `stream.buffered_send_data == 0` check is here so that, if a zero
+ // length data frame is queued to the front (there is no previously
+ // queued data), it gets sent out immediately even if there is no
+ // available send window.
+ //
+ // Sending out zero length data frames can be done to signal
+ // end-of-stream.
+ //
+ if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
+ // The stream currently has capacity to send the data frame, so
+ // queue it up and notify the connection task.
+ self.queue_frame(frame.into(), buffer, stream, task);
+ } else {
+ // The stream has no capacity to send the frame now, save it but
+ // don't notify the connection task. Once additional capacity
+ // becomes available, the frame will be flushed.
+ stream.pending_send.push_back(buffer, frame.into());
+ }
+
+ Ok(())
+ }
+
+ /// Request capacity to send data
+ pub fn reserve_capacity(
+ &mut self,
+ capacity: WindowSize,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ let span = tracing::trace_span!(
+ "reserve_capacity",
+ ?stream.id,
+ requested = capacity,
+ effective = (capacity as usize) + stream.buffered_send_data,
+ curr = stream.requested_send_capacity
+ );
+ let _e = span.enter();
+
+ // Actual capacity is `capacity` + the current amount of buffered data.
+ // If it were less, then we could never send out the buffered data.
+ let capacity = (capacity as usize) + stream.buffered_send_data;
+
+ match capacity.cmp(&(stream.requested_send_capacity as usize)) {
+ Ordering::Equal => {
+ // Nothing to do
+ }
+ Ordering::Less => {
+ // Update the target requested capacity
+ stream.requested_send_capacity = capacity as WindowSize;
+
+ // Currently available capacity assigned to the stream
+ let available = stream.send_flow.available().as_size();
+
+ // If the stream has more assigned capacity than requested, reclaim
+ // some for the connection
+ if available as usize > capacity {
+ let diff = available - capacity as WindowSize;
+
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(diff);
+ debug_assert!(_res.is_ok());
+
+ self.assign_connection_capacity(diff, stream, counts);
+ }
+ }
+ Ordering::Greater => {
+ // If trying to *add* capacity, but the stream send side is closed,
+ // there's nothing to be done.
+ if stream.state.is_send_closed() {
+ return;
+ }
+
+ // Update the target requested capacity
+ stream.requested_send_capacity =
+ cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
+
+ // Try to assign additional capacity to the stream. If none is
+ // currently available, the stream will be queued to receive some
+ // when more becomes available.
+ self.try_assign_capacity(stream);
+ }
+ }
+ }
+
+ pub fn recv_stream_window_update(
+ &mut self,
+ inc: WindowSize,
+ stream: &mut store::Ptr,
+ ) -> Result<(), Reason> {
+ let span = tracing::trace_span!(
+ "recv_stream_window_update",
+ ?stream.id,
+ ?stream.state,
+ inc,
+ flow = ?stream.send_flow
+ );
+ let _e = span.enter();
+
+ if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
+ // We can't send any data, so don't bother doing anything else.
+ return Ok(());
+ }
+
+ // Update the stream level flow control.
+ stream.send_flow.inc_window(inc)?;
+
+ // If the stream is waiting on additional capacity, then this will
+ // assign it (if available on the connection) and notify the producer
+ self.try_assign_capacity(stream);
+
+ Ok(())
+ }
+
+ pub fn recv_connection_window_update(
+ &mut self,
+ inc: WindowSize,
+ store: &mut Store,
+ counts: &mut Counts,
+ ) -> Result<(), Reason> {
+ // Update the connection's window
+ self.flow.inc_window(inc)?;
+
+ self.assign_connection_capacity(inc, store, counts);
+ Ok(())
+ }
+
+ /// Reclaim all capacity assigned to the stream and re-assign it to the
+ /// connection
+ pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
+ let available = stream.send_flow.available().as_size();
+ if available > 0 {
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(available);
+ debug_assert!(_res.is_ok());
+ // Re-assign all capacity to the connection
+ self.assign_connection_capacity(available, stream, counts);
+ }
+ }
+
+ /// Reclaim just reserved capacity, not buffered capacity, and re-assign
+ /// it to the connection
+ pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
+ // only reclaim requested capacity that isn't already buffered
+ if stream.requested_send_capacity as usize > stream.buffered_send_data {
+ let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
+
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(reserved);
+ debug_assert!(_res.is_ok());
+ self.assign_connection_capacity(reserved, stream, counts);
+ }
+ }
+
+ pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
+ let span = tracing::trace_span!("clear_pending_capacity");
+ let _e = span.enter();
+ while let Some(stream) = self.pending_capacity.pop(store) {
+ counts.transition(stream, |_, stream| {
+ tracing::trace!(?stream.id, "clear_pending_capacity");
+ })
+ }
+ }
+
+ pub fn assign_connection_capacity<R>(
+ &mut self,
+ inc: WindowSize,
+ store: &mut R,
+ counts: &mut Counts,
+ ) where
+ R: Resolve,
+ {
+ let span = tracing::trace_span!("assign_connection_capacity", inc);
+ let _e = span.enter();
+
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(inc);
+ debug_assert!(_res.is_ok());
+
+ // Assign newly acquired capacity to streams pending capacity.
+ while self.flow.available() > 0 {
+ let stream = match self.pending_capacity.pop(store) {
+ Some(stream) => stream,
+ None => return,
+ };
+
+ // Streams pending capacity may have been reset before capacity
+ // became available. In that case, the stream won't want any
+ // capacity, and so we shouldn't "transition" on it, but just evict
+ // it and continue the loop.
+ if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
+ continue;
+ }
+
+ counts.transition(stream, |_, stream| {
+ // Try to assign capacity to the stream. This will also re-queue the
+ // stream if there isn't enough connection level capacity to fulfill
+ // the capacity request.
+ self.try_assign_capacity(stream);
+ })
+ }
+ }
+
+ /// Request capacity to send data
+ fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
+ let total_requested = stream.requested_send_capacity;
+
+ // Total requested should never go below actual assigned
+ // (Note: the window size can go lower than assigned)
+ debug_assert!(stream.send_flow.available() <= total_requested as usize);
+
+ // The amount of additional capacity that the stream requests.
+ // Don't assign more than the window has available!
+ let additional = cmp::min(
+ total_requested - stream.send_flow.available().as_size(),
+ // Can't assign more than what is available
+ stream.send_flow.window_size() - stream.send_flow.available().as_size(),
+ );
+ let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
+ let _e = span.enter();
+ tracing::trace!(
+ requested = total_requested,
+ additional,
+ buffered = stream.buffered_send_data,
+ window = stream.send_flow.window_size(),
+ conn = %self.flow.available()
+ );
+
+ if additional == 0 {
+ // Nothing more to do
+ return;
+ }
+
+ // If the stream has requested capacity, then it must be in the
+ // streaming state (more data could be sent) or there is buffered data
+ // waiting to be sent.
+ debug_assert!(
+ stream.state.is_send_streaming() || stream.buffered_send_data > 0,
+ "state={:?}",
+ stream.state
+ );
+
+ // The amount of currently available capacity on the connection
+ let conn_available = self.flow.available().as_size();
+
+ // First check if capacity is immediately available
+ if conn_available > 0 {
+ // The amount of capacity to assign to the stream
+ // TODO: Should prioritization factor into this?
+ let assign = cmp::min(conn_available, additional);
+
+ tracing::trace!(capacity = assign, "assigning");
+
+ // Assign the capacity to the stream
+ stream.assign_capacity(assign, self.max_buffer_size);
+
+ // Claim the capacity from the connection
+ // TODO: proper error handling
+ let _res = self.flow.claim_capacity(assign);
+ debug_assert!(_res.is_ok());
+ }
+
+ tracing::trace!(
+ available = %stream.send_flow.available(),
+ requested = stream.requested_send_capacity,
+ buffered = stream.buffered_send_data,
+ has_unavailable = %stream.send_flow.has_unavailable()
+ );
+
+ if stream.send_flow.available() < stream.requested_send_capacity as usize
+ && stream.send_flow.has_unavailable()
+ {
+ // The stream requires additional capacity and the stream's
+ // window has available capacity, but the connection window
+ // does not.
+ //
+ // In this case, the stream needs to be queued up for when the
+ // connection has more capacity.
+ self.pending_capacity.push(stream);
+ }
+
+ // If data is buffered and the stream is send ready, then
+ // schedule the stream for execution
+ if stream.buffered_send_data > 0 && stream.is_send_ready() {
+ // TODO: This assertion isn't *exactly* correct. There can still be
+ // buffered send data while the stream's pending send queue is
+ // empty. This can happen when a large data frame is in the process
+ // of being **partially** sent. Once the window has been sent, the
+ // data frame will be returned to the prioritization layer to be
+ // re-scheduled.
+ //
+ // That said, it would be nice to figure out how to make this
+ // assertion correctly.
+ //
+ // debug_assert!(!stream.pending_send.is_empty());
+
+ self.pending_send.push(stream);
+ }
+ }
+
+ pub fn poll_complete<T, B>(
+ &mut self,
+ cx: &mut Context,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ // Ensure codec is ready
+ ready!(dst.poll_ready(cx))?;
+
+ // Reclaim any frame that has previously been written
+ self.reclaim_frame(buffer, store, dst);
+
+ // The max frame length
+ let max_frame_len = dst.max_send_frame_size();
+
+ tracing::trace!("poll_complete");
+
+ loop {
+ if let Some(mut stream) = self.pop_pending_open(store, counts) {
+ self.pending_send.push_front(&mut stream);
+ }
+
+ match self.pop_frame(buffer, store, max_frame_len, counts) {
+ Some(frame) => {
+ tracing::trace!(?frame, "writing");
+
+ debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
+ if let Frame::Data(ref frame) = frame {
+ self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
+ }
+ dst.buffer(frame).expect("invalid frame");
+
+ // Ensure the codec is ready to try the loop again.
+ ready!(dst.poll_ready(cx))?;
+
+ // Because, always try to reclaim...
+ self.reclaim_frame(buffer, store, dst);
+ }
+ None => {
+ // Try to flush the codec.
+ ready!(dst.flush(cx))?;
+
+ // This might release a data frame...
+ if !self.reclaim_frame(buffer, store, dst) {
+ return Poll::Ready(Ok(()));
+ }
+
+ // No need to poll ready as poll_complete() does this for
+ // us...
+ }
+ }
+ }
+ }
+
+ /// Tries to reclaim a pending data frame from the codec.
+ ///
+ /// Returns true if a frame was reclaimed.
+ ///
+ /// When a data frame is written to the codec, it may not be written in its
+ /// entirety (large chunks are split up into potentially many data frames).
+ /// In this case, the stream needs to be reprioritized.
+ fn reclaim_frame<T, B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> bool
+ where
+ B: Buf,
+ {
+ let span = tracing::trace_span!("try_reclaim_frame");
+ let _e = span.enter();
+
+ // First check if there are any data chunks to take back
+ if let Some(frame) = dst.take_last_data_frame() {
+ self.reclaim_frame_inner(buffer, store, frame)
+ } else {
+ false
+ }
+ }
+
+ fn reclaim_frame_inner<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ frame: frame::Data<Prioritized<B>>,
+ ) -> bool
+ where
+ B: Buf,
+ {
+ tracing::trace!(
+ ?frame,
+ sz = frame.payload().inner.get_ref().remaining(),
+ "reclaimed"
+ );
+
+ let mut eos = false;
+ let key = frame.payload().stream;
+
+ match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
+ InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
+ InFlightData::Drop => {
+ tracing::trace!("not reclaiming frame for cancelled stream");
+ return false;
+ }
+ InFlightData::DataFrame(k) => {
+ debug_assert_eq!(k, key);
+ }
+ }
+
+ let mut frame = frame.map(|prioritized| {
+ // TODO: Ensure fully written
+ eos = prioritized.end_of_stream;
+ prioritized.inner.into_inner()
+ });
+
+ if frame.payload().has_remaining() {
+ let mut stream = store.resolve(key);
+
+ if eos {
+ frame.set_end_stream(true);
+ }
+
+ self.push_back_frame(frame.into(), buffer, &mut stream);
+
+ return true;
+ }
+
+ false
+ }
+
+ /// Push the frame to the front of the stream's deque, scheduling the
+ /// stream if needed.
+ fn push_back_frame<B>(
+ &mut self,
+ frame: Frame<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ ) {
+ // Push the frame to the front of the stream's deque
+ stream.pending_send.push_front(buffer, frame);
+
+ // If needed, schedule the sender
+ if stream.send_flow.available() > 0 {
+ debug_assert!(!stream.pending_send.is_empty());
+ self.pending_send.push(stream);
+ }
+ }
+
+ pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
+ let span = tracing::trace_span!("clear_queue", ?stream.id);
+ let _e = span.enter();
+
+ // TODO: make this more efficient?
+ while let Some(frame) = stream.pending_send.pop_front(buffer) {
+ tracing::trace!(?frame, "dropping");
+ }
+
+ stream.buffered_send_data = 0;
+ stream.requested_send_capacity = 0;
+ if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
+ if stream.key() == key {
+ // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
+ self.in_flight_data_frame = InFlightData::Drop;
+ }
+ }
+ }
+
+ pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_send.pop(store) {
+ let is_pending_reset = stream.is_pending_reset_expiration();
+ counts.transition_after(stream, is_pending_reset);
+ }
+ }
+
+ pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_open.pop(store) {
+ let is_pending_reset = stream.is_pending_reset_expiration();
+ counts.transition_after(stream, is_pending_reset);
+ }
+ }
+
+ fn pop_frame<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ max_len: usize,
+ counts: &mut Counts,
+ ) -> Option<Frame<Prioritized<B>>>
+ where
+ B: Buf,
+ {
+ let span = tracing::trace_span!("pop_frame");
+ let _e = span.enter();
+
+ loop {
+ match self.pending_send.pop(store) {
+ Some(mut stream) => {
+ let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
+ let _e = span.enter();
+
+ // It's possible that this stream, besides having data to send,
+ // is also queued to send a reset, and thus is already in the queue
+ // to wait for "some time" after a reset.
+ //
+ // To be safe, we just always ask the stream.
+ let is_pending_reset = stream.is_pending_reset_expiration();
+
+ tracing::trace!(is_pending_reset);
+
+ let frame = match stream.pending_send.pop_front(buffer) {
+ Some(Frame::Data(mut frame)) => {
+ // Get the amount of capacity remaining for stream's
+ // window.
+ let stream_capacity = stream.send_flow.available();
+ let sz = frame.payload().remaining();
+
+ tracing::trace!(
+ sz,
+ eos = frame.is_end_stream(),
+ window = %stream_capacity,
+ available = %stream.send_flow.available(),
+ requested = stream.requested_send_capacity,
+ buffered = stream.buffered_send_data,
+ "data frame"
+ );
+
+ // Zero length data frames always have capacity to
+ // be sent.
+ if sz > 0 && stream_capacity == 0 {
+ tracing::trace!("stream capacity is 0");
+
+ // Ensure that the stream is waiting for
+ // connection level capacity
+ //
+ // TODO: uncomment
+ // debug_assert!(stream.is_pending_send_capacity);
+
+ // The stream has no more capacity, this can
+ // happen if the remote reduced the stream
+ // window. In this case, we need to buffer the
+ // frame and wait for a window update...
+ stream.pending_send.push_front(buffer, frame.into());
+
+ continue;
+ }
+
+ // Only send up to the max frame length
+ let len = cmp::min(sz, max_len);
+
+ // Only send up to the stream's window capacity
+ let len =
+ cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
+
+ // There *must* be be enough connection level
+ // capacity at this point.
+ debug_assert!(len <= self.flow.window_size());
+
+ // Check if the stream level window the peer knows is available. In some
+ // scenarios, maybe the window we know is available but the window which
+ // peer knows is not.
+ if len > 0 && len > stream.send_flow.window_size() {
+ stream.pending_send.push_front(buffer, frame.into());
+ continue;
+ }
+
+ tracing::trace!(len, "sending data frame");
+
+ // Update the flow control
+ tracing::trace_span!("updating stream flow").in_scope(|| {
+ stream.send_data(len, self.max_buffer_size);
+
+ // Assign the capacity back to the connection that
+ // was just consumed from the stream in the previous
+ // line.
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(len);
+ debug_assert!(_res.is_ok());
+ });
+
+ let (eos, len) = tracing::trace_span!("updating connection flow")
+ .in_scope(|| {
+ // TODO: proper error handling
+ let _res = self.flow.send_data(len);
+ debug_assert!(_res.is_ok());
+
+ // Wrap the frame's data payload to ensure that the
+ // correct amount of data gets written.
+
+ let eos = frame.is_end_stream();
+ let len = len as usize;
+
+ if frame.payload().remaining() > len {
+ frame.set_end_stream(false);
+ }
+ (eos, len)
+ });
+
+ Frame::Data(frame.map(|buf| Prioritized {
+ inner: buf.take(len),
+ end_of_stream: eos,
+ stream: stream.key(),
+ }))
+ }
+ Some(Frame::PushPromise(pp)) => {
+ let mut pushed =
+ stream.store_mut().find_mut(&pp.promised_id()).unwrap();
+ pushed.is_pending_push = false;
+ // Transition stream from pending_push to pending_open
+ // if possible
+ if !pushed.pending_send.is_empty() {
+ if counts.can_inc_num_send_streams() {
+ counts.inc_num_send_streams(&mut pushed);
+ self.pending_send.push(&mut pushed);
+ } else {
+ self.queue_open(&mut pushed);
+ }
+ }
+ Frame::PushPromise(pp)
+ }
+ Some(frame) => frame.map(|_| {
+ unreachable!(
+ "Frame::map closure will only be called \
+ on DATA frames."
+ )
+ }),
+ None => {
+ if let Some(reason) = stream.state.get_scheduled_reset() {
+ let stream_id = stream.id;
+ stream
+ .state
+ .set_reset(stream_id, reason, Initiator::Library);
+
+ let frame = frame::Reset::new(stream.id, reason);
+ Frame::Reset(frame)
+ } else {
+ // If the stream receives a RESET from the peer, it may have
+ // had data buffered to be sent, but all the frames are cleared
+ // in clear_queue(). Instead of doing O(N) traversal through queue
+ // to remove, lets just ignore the stream here.
+ tracing::trace!("removing dangling stream from pending_send");
+ // Since this should only happen as a consequence of `clear_queue`,
+ // we must be in a closed state of some kind.
+ debug_assert!(stream.state.is_closed());
+ counts.transition_after(stream, is_pending_reset);
+ continue;
+ }
+ }
+ };
+
+ tracing::trace!("pop_frame; frame={:?}", frame);
+
+ if cfg!(debug_assertions) && stream.state.is_idle() {
+ debug_assert!(stream.id > self.last_opened_id);
+ self.last_opened_id = stream.id;
+ }
+
+ if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
+ // TODO: Only requeue the sender IF it is ready to send
+ // the next frame. i.e. don't requeue it if the next
+ // frame is a data frame and the stream does not have
+ // any more capacity.
+ self.pending_send.push(&mut stream);
+ }
+
+ counts.transition_after(stream, is_pending_reset);
+
+ return Some(frame);
+ }
+ None => return None,
+ }
+ }
+ }
+
+ fn pop_pending_open<'s>(
+ &mut self,
+ store: &'s mut Store,
+ counts: &mut Counts,
+ ) -> Option<store::Ptr<'s>> {
+ tracing::trace!("schedule_pending_open");
+ // check for any pending open streams
+ if counts.can_inc_num_send_streams() {
+ if let Some(mut stream) = self.pending_open.pop(store) {
+ tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
+
+ counts.inc_num_send_streams(&mut stream);
+ stream.notify_send();
+ return Some(stream);
+ }
+ }
+
+ None
+ }
+}
+
+// ===== impl Prioritized =====
+
+impl<B> Buf for Prioritized<B>
+where
+ B: Buf,
+{
+ fn remaining(&self) -> usize {
+ self.inner.remaining()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ self.inner.chunk()
+ }
+
+ fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
+ self.inner.chunks_vectored(dst)
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ self.inner.advance(cnt)
+ }
+}
+
+impl<B: Buf> fmt::Debug for Prioritized<B> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Prioritized")
+ .field("remaining", &self.inner.get_ref().remaining())
+ .field("end_of_stream", &self.end_of_stream)
+ .field("stream", &self.stream)
+ .finish()
+ }
+}