summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/send_message.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/neqo-http3/src/send_message.rs
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/neqo-http3/src/send_message.rs')
-rw-r--r--third_party/rust/neqo-http3/src/send_message.rs300
1 files changed, 300 insertions, 0 deletions
diff --git a/third_party/rust/neqo-http3/src/send_message.rs b/third_party/rust/neqo-http3/src/send_message.rs
new file mode 100644
index 0000000000..025f1fcf56
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/send_message.rs
@@ -0,0 +1,300 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::hframe::HFrame;
+use crate::qlog;
+use crate::Header;
+use crate::{Error, Res};
+
+use neqo_common::{qdebug, qinfo, qtrace, Encoder};
+use neqo_qpack::encoder::QPackEncoder;
+use neqo_transport::{AppError, Connection};
+use std::cmp::min;
+use std::fmt::Debug;
+
+const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2
+const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header)
+const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3
+const MAX_DATA_HEADER_SIZE_3_LIMIT: usize = MAX_DATA_HEADER_SIZE_3 + 5; // 16383 + 5 (size of the next buffer data frame header)
+const MAX_DATA_HEADER_SIZE_5: usize = (1 << 30) - 1; // Maximal amount of data with DATA frame header size 3
+const MAX_DATA_HEADER_SIZE_5_LIMIT: usize = MAX_DATA_HEADER_SIZE_5 + 9; // 1073741823 + 9 (size of the next buffer data frame header)
+
+pub(crate) trait SendMessageEvents: Debug {
+ fn data_writable(&self, stream_id: u64);
+ fn remove_send_side_event(&self, stream_id: u64);
+ fn stop_sending(&self, stream_id: u64, app_err: AppError);
+}
+
+/*
+ * SendMessage states:
+ * Uninitialized
+ * Initialized : Headers are present but still not encoded. A message body may be present as well.
+ * The client side sends a message body using the send_body() function that directly
+ * writes into a transport stream. The server side sets headers and body when
+ * initializing a send message (TODO: make server use send_body as well)
+ * SendingInitialMessage : sending headers and maybe message body. From here we may switch to
+ * SendingData or Closed (if the app does not want to send data and
+ * has already closed the send stream).
+ * SendingData : We are sending request data until the app closes the stream.
+ * Closed
+ */
+
+#[derive(PartialEq, Debug)]
+enum SendMessageState {
+ Uninitialized,
+ Initialized {
+ headers: Vec<Header>,
+ data: Option<Vec<u8>>,
+ fin: bool,
+ },
+ SendingInitialMessage {
+ buf: Vec<u8>,
+ fin: bool,
+ },
+ SendingData,
+ Closed,
+}
+
+impl SendMessageState {
+ pub fn is_sending_closed(&self) -> bool {
+ match self {
+ Self::Initialized { fin, .. } | Self::SendingInitialMessage { fin, .. } => *fin,
+ Self::SendingData => false,
+ _ => true,
+ }
+ }
+
+ pub fn done(&self) -> bool {
+ matches!(self, Self::Closed)
+ }
+
+ pub fn is_state_sending_data(&self) -> bool {
+ matches!(self, Self::SendingData)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct SendMessage {
+ state: SendMessageState,
+ stream_id: u64,
+ conn_events: Box<dyn SendMessageEvents>,
+}
+
+impl SendMessage {
+ pub fn new(stream_id: u64, conn_events: Box<dyn SendMessageEvents>) -> Self {
+ qinfo!("Create a request stream_id={}", stream_id);
+ Self {
+ state: SendMessageState::Uninitialized,
+ stream_id,
+ conn_events,
+ }
+ }
+
+ pub fn new_with_headers(
+ stream_id: u64,
+ headers: Vec<Header>,
+ conn_events: Box<dyn SendMessageEvents>,
+ ) -> Self {
+ qinfo!("Create a request stream_id={}", stream_id);
+ Self {
+ state: SendMessageState::Initialized {
+ headers,
+ data: None,
+ fin: false,
+ },
+ stream_id,
+ conn_events,
+ }
+ }
+
+ pub fn set_message(&mut self, headers: &[Header], data: Option<&[u8]>) -> Res<()> {
+ if !matches!(self.state, SendMessageState::Uninitialized) {
+ return Err(Error::AlreadyInitialized);
+ }
+
+ self.state = SendMessageState::Initialized {
+ headers: headers.to_vec(),
+ data: data.map(|d| d.to_vec()),
+ fin: true,
+ };
+ Ok(())
+ }
+
+ pub fn send_body(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
+ qtrace!(
+ [self],
+ "send_body: state={:?} len={}",
+ self.state,
+ buf.len()
+ );
+ match self.state {
+ SendMessageState::Uninitialized
+ | SendMessageState::Initialized { .. }
+ | SendMessageState::SendingInitialMessage { .. } => Ok(0),
+ SendMessageState::SendingData => {
+ let available = conn
+ .stream_avail_send_space(self.stream_id)
+ .map_err(|e| Error::map_stream_send_errors(&e))?;
+ if available <= 2 {
+ return Ok(0);
+ }
+ let to_send;
+ if available <= MAX_DATA_HEADER_SIZE_2_LIMIT {
+ // 63 + 3
+ to_send = min(min(buf.len(), available - 2), MAX_DATA_HEADER_SIZE_2);
+ } else if available <= MAX_DATA_HEADER_SIZE_3_LIMIT {
+ // 16383 + 5
+ to_send = min(min(buf.len(), available - 3), MAX_DATA_HEADER_SIZE_3);
+ } else if available <= MAX_DATA_HEADER_SIZE_5 {
+ // 1073741823 + 9
+ to_send = min(min(buf.len(), available - 5), MAX_DATA_HEADER_SIZE_5_LIMIT);
+ } else {
+ to_send = min(buf.len(), available - 9);
+ }
+
+ qinfo!(
+ [self],
+ "send_request_body: available={} to_send={}.",
+ available,
+ to_send
+ );
+
+ let data_frame = HFrame::Data {
+ len: to_send as u64,
+ };
+ let mut enc = Encoder::default();
+ data_frame.encode(&mut enc);
+ let sent_fh = conn
+ .stream_send(self.stream_id, &enc)
+ .map_err(|e| Error::map_stream_send_errors(&e))?;
+ debug_assert_eq!(sent_fh, enc.len());
+
+ let sent = conn
+ .stream_send(self.stream_id, &buf[..to_send])
+ .map_err(|e| Error::map_stream_send_errors(&e))?;
+ qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, to_send);
+ Ok(sent)
+ }
+ SendMessageState::Closed => Err(Error::AlreadyClosed),
+ }
+ }
+
+ pub fn done(&self) -> bool {
+ self.state.done()
+ }
+
+ pub fn stream_writable(&self) {
+ if self.state.is_state_sending_data() {
+ self.conn_events.data_writable(self.stream_id);
+ }
+ }
+
+ /// # Errors
+ /// `ClosedCriticalStream` if the encoder stream is closed.
+ /// `InternalError` if an unexpected error occurred.
+ fn ensure_encoded(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
+ if let SendMessageState::Initialized { headers, data, fin } = &self.state {
+ qdebug!([self], "Encoding headers");
+ let header_block = encoder.encode_header_block(conn, &headers, self.stream_id)?;
+ let hframe = HFrame::Headers {
+ header_block: header_block.to_vec(),
+ };
+ let mut d = Encoder::default();
+ hframe.encode(&mut d);
+ if let Some(buf) = data {
+ qdebug!([self], "Encoding data");
+ let d_frame = HFrame::Data {
+ len: buf.len() as u64,
+ };
+ d_frame.encode(&mut d);
+ d.encode(&buf);
+ }
+
+ self.state = SendMessageState::SendingInitialMessage {
+ buf: d.into(),
+ fin: *fin,
+ };
+ }
+ Ok(())
+ }
+
+ /// # Errors
+ /// `ClosedCriticalStream` if the encoder stream is closed.
+ /// `InternalError` if an unexpected error occurred.
+ /// `InvalidStreamId` if the stream does not exist,
+ /// `AlreadyClosed` if the stream has already been closed.
+ /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output`
+ /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.)
+ pub fn send(&mut self, conn: &mut Connection, encoder: &mut QPackEncoder) -> Res<()> {
+ self.ensure_encoded(conn, encoder)?;
+
+ let label = if ::log::log_enabled!(::log::Level::Debug) {
+ format!("{}", self)
+ } else {
+ String::new()
+ };
+
+ if let SendMessageState::SendingInitialMessage { ref mut buf, fin } = self.state {
+ let sent =
+ Error::map_error(conn.stream_send(self.stream_id, &buf), Error::HttpInternal)?;
+ qlog::h3_data_moved_down(&mut conn.qlog_mut(), self.stream_id, sent);
+
+ qtrace!([label], "{} bytes sent", sent);
+
+ if sent == buf.len() {
+ if fin {
+ Error::map_error(conn.stream_close_send(self.stream_id), Error::HttpInternal)?;
+ self.state = SendMessageState::Closed;
+ qtrace!([label], "done sending request");
+ } else {
+ self.state = SendMessageState::SendingData;
+ self.conn_events.data_writable(self.stream_id);
+ qtrace!([label], "change to state SendingData");
+ }
+ } else {
+ let b = buf.split_off(sent);
+ *buf = b;
+ }
+ }
+ Ok(())
+ }
+
+ // SendMessage owns headers and sends them. It may also own data for the server side.
+ // This method returns if they're still being sent. Request body (if any) is sent by
+ // http client afterwards using `send_request_body` after receiving DataWritable event.
+ pub fn has_data_to_send(&self) -> bool {
+ matches!(self.state, SendMessageState::Initialized {..} | SendMessageState::SendingInitialMessage { .. } )
+ }
+
+ pub fn close(&mut self, conn: &mut Connection) -> Res<()> {
+ match self.state {
+ SendMessageState::SendingInitialMessage { ref mut fin, .. }
+ | SendMessageState::Initialized { ref mut fin, .. } => {
+ *fin = true;
+ }
+ _ => {
+ self.state = SendMessageState::Closed;
+ conn.stream_close_send(self.stream_id)?;
+ }
+ }
+
+ self.conn_events.remove_send_side_event(self.stream_id);
+ Ok(())
+ }
+
+ pub fn stop_sending(&mut self, app_err: AppError) {
+ if !self.state.is_sending_closed() {
+ self.conn_events.remove_send_side_event(self.stream_id);
+ self.conn_events.stop_sending(self.stream_id, app_err);
+ }
+ }
+}
+
+impl ::std::fmt::Display for SendMessage {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "SendMesage {}", self.stream_id)
+ }
+}