diff options
Diffstat (limited to 'third_party/rust/neqo-http3/src/connection_client.rs')
-rw-r--r-- | third_party/rust/neqo-http3/src/connection_client.rs | 7137 |
1 files changed, 7137 insertions, 0 deletions
diff --git a/third_party/rust/neqo-http3/src/connection_client.rs b/third_party/rust/neqo-http3/src/connection_client.rs new file mode 100644 index 0000000000..709a1e0d6b --- /dev/null +++ b/third_party/rust/neqo-http3/src/connection_client.rs @@ -0,0 +1,7137 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use crate::client_events::{Http3ClientEvent, Http3ClientEvents}; +use crate::connection::{Http3Connection, Http3State, RequestDescription}; +use crate::frames::HFrame; +use crate::push_controller::{PushController, RecvPushEvents}; +use crate::recv_message::{RecvMessage, RecvMessageInfo}; +use crate::request_target::AsRequestTarget; +use crate::settings::HSettings; +use crate::{ + Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler, ReceiveOutput, +}; +use neqo_common::{ + event::Provider as EventProvider, hex, hex_with_len, qdebug, qinfo, qlog::NeqoQlog, qtrace, + Datagram, Decoder, Encoder, Header, MessageType, Role, +}; +use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo}; +use neqo_qpack::Stats as QpackStats; +use neqo_transport::{ + AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator, DatagramTracking, + Output, Stats as TransportStats, StreamId, StreamType, Version, ZeroRttState, +}; +use std::cell::RefCell; +use std::convert::TryFrom; +use std::fmt::Debug; +use std::fmt::Display; +use std::mem; +use std::net::SocketAddr; +use std::rc::Rc; +use std::time::Instant; + +use crate::{Error, Res}; + +// This is used for filtering send_streams and recv_Streams with a stream_ids greater than or equal a given id. +// Only the same type (bidirectional or unidirectionsl) streams are filtered. +fn id_gte<U>(base: StreamId) -> impl FnMut((&StreamId, &U)) -> Option<StreamId> + 'static +where + U: ?Sized, +{ + move |(id, _)| { + if *id >= base && !(id.is_bidi() ^ base.is_bidi()) { + Some(*id) + } else { + None + } + } +} + +fn alpn_from_quic_version(version: Version) -> &'static str { + match version { + Version::Version2 | Version::Version1 => "h3", + Version::Draft29 => "h3-29", + Version::Draft30 => "h3-30", + Version::Draft31 => "h3-31", + Version::Draft32 => "h3-32", + } +} + +/// # The HTTP/3 client API +/// +/// This module implements the HTTP/3 client API. The main implementation of the protocol is in +/// [connection.rs](https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs) which +/// implements common behavior for the client-side and the server-side. `Http3Client` structure +/// implements the public API and set of functions that differ between the client and the server. + +/// The API is used for: +/// - create and close an endpoint: +/// - [`new`](struct.Http3Client.html#method.new) +/// - [`new_with_conn`](struct.Http3Client.html#method.new_with_conn) +/// - [`close`](struct.Http3Client.html#method.close) +/// - configuring an endpoint: +/// - [`authenticated`](struct.Http3Client.html#method.authenticated) +/// - [`enable_ech`](struct.Http3Client.html#method.enable_ech) +/// - [`enable_resumption`](struct.Http3Client.html#method.enable_resumption) +/// - [`initiate_key_update`](struct.Http3Client.html#method.initiate_key_update) +/// - [`set_qlog`](struct.Http3Client.html#method.set_qlog) +/// - retrieving information about a connection: +/// - [`peer_certificate`](struct.Http3Client.html#method.peer_certificate) +/// - [`qpack_decoder_stats`](struct.Http3Client.html#method.qpack_decoder_stats) +/// - [`qpack_encoder_stats`](struct.Http3Client.html#method.qpack_encoder_stats) +/// - [`transport_stats`](struct.Http3Client.html#method.transport_stats) +/// - [`state`](struct.Http3Client.html#method.state) +/// - [`take_resumption_token`](struct.Http3Client.html#method.take_resumption_token) +/// - [`tls_inf`](struct.Http3Client.html#method.tls_info) +/// - driving HTTP/3 session: +/// - [`process_output`](struct.Http3Client.html#method.process_output) +/// - [`process_input`](struct.Http3Client.html#method.process_input) +/// - [`process`](struct.Http3Client.html#method.process) +/// - create requests, send/receive data, and cancel requests: +/// - [`fetch`](struct.Http3Client.html#method.fetch) +/// - [`send_data`](struct.Http3Client.html#method.send_data) +/// - [`read_dara`](struct.Http3Client.html#method.read_data) +/// - [`stream_close_send`](struct.Http3Client.html#method.stream_close_send) +/// - [`cancel_fetch`](struct.Http3Client.html#method.cancel_fetch) +/// - [`stream_reset_send`](struct.Http3Client.html#method.stream_reset_send) +/// - [`stream_stop_sending`](struct.Http3Client.html#method.stream_stop_sending) +/// - [`set_stream_max_data`](struct.Http3Client.html#method.set_stream_max_data) +/// - priority feature: +/// - [`priority_update`](struct.Http3Client.html#method.priority_update) +/// - `WebTransport` feature: +/// - [`webtransport_create_session`](struct.Http3Client.html#method.webtransport_create_session) +/// - [`webtransport_close_session`](struct.Http3Client.html#method.webtransport_close_session) +/// - [`webtransport_create_stream`](struct.Http3Client.html#method.webtransport_create_sstream) +/// - [`webtransport_enabled`](struct.Http3Client.html#method.webtransport_enabled) +/// +/// ## Examples +/// +/// ### Fetching a resource +/// +/// ```ignore +/// let mut client = Http3Client::new(...); +/// +/// // Perform a handshake +/// ... +/// +/// let req = client +/// .fetch( +/// Instant::now(), +/// "GET", +/// &("https", "something.com", "/"), +/// &[Header::new("example1", "value1"), Header::new("example1", "value2")], +/// Priority::default(), +/// ) +/// .unwrap(); +/// +/// client.stream_close_send(req).unwrap(); +/// +/// loop { +/// // exchange packets +/// ... +/// +/// while let Some(event) = client.next_event() { +/// match event { +/// Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin } => { +/// println!("New response headers received for stream {:?} [fin={?}, interim={:?}]: {:?}", +/// stream_id, +/// fin, +/// interim, +/// headers, +/// ); +/// } +/// Http3ClientEvent::DataReadable { stream_id } => { +/// println!("New data available on stream {}", stream_id); +/// let mut buf = [0; 100]; +/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); +/// println!("Read {:?} bytes from stream {:?} [fin={?}]", +/// amount, +/// stream_id, +/// fin, +/// ); +/// } +/// _ => { +/// println!("Unhandled event {:?}", event); +/// } +/// } +/// } +/// } +///``` +/// +/// ### Creating a `WebTransport` session +/// +/// ```ignore +/// let mut client = Http3Client::new(...); +/// +/// // Perform a handshake +/// ... +/// +/// // Create a session +/// let wt_session_id = client +/// .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) +/// .unwrap(); +/// +/// loop { +/// // exchange packets +/// ... +/// +/// while let Some(event) = client.next_event() { +/// match event { +/// Http3ClientEvent::WebTransport(WebTransportEvent::Session{ +/// stream_id, +/// status, +/// .. +/// }) => { +/// println!("The response from the server: WebTransport session ID {:?} status={:?}", +/// stream_id, +/// status, +/// ); +/// } +/// _ => { +/// println!("Unhandled event {:?}", event); +/// } +/// } +/// } +/// } +/// +///``` +/// +/// ### `WebTransport`: create a stream, send and receive data on the stream +/// +/// ```ignore +/// const BUF_CLIENT: &[u8] = &[0; 10]; +/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above. +/// +/// // create a stream +/// let wt_stream_id = client +/// .webtransport_create_stream(wt_session_id, StreamType::BiDi) +/// .unwrap(); +/// +/// // send data +/// let data_sent = client.send_data(wt_stream_id, BUF_CLIENT).unwrap(); +/// assert_eq!(data_sent, BUF_CLIENT.len()); +/// +/// // close stream for sending +/// client.stream_close_send(wt_stream_id).unwrap(); +/// +/// // wait for data from the server +/// loop { +/// // exchange packets +/// ... +/// +/// while let Some(event) = client.next_event() { +/// match event { +/// Http3ClientEvent::DataReadable{ stream_id } => { +/// println!("Data receivedd form the server on WebTransport stream ID {:?}", +/// stream_id, +/// ); +/// let mut buf = [0; 100]; +/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); +/// println!("Read {:?} bytes from stream {:?} [fin={?}]", +/// amount, +/// stream_id, +/// fin, +/// ); +/// } +/// _ => { +/// println!("Unhandled event {:?}", event); +/// } +/// } +/// } +/// } +/// ``` +/// +/// ### `WebTransport`: receive a new stream form the server +/// +/// ```ignore +/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above. +/// +/// // wait for a new stream from the server +/// loop { +/// // exchange packets +/// ... +/// +/// while let Some(event) = client.next_event() { +/// match event { +/// Http3ClientEvent::WebTransport(WebTransportEvent::NewStream { +/// stream_id, +/// session_id, +/// }) => { +/// println!("New stream received on session{:?}, stream id={:?} stream type={:?}", +/// sesson_id.stream_id(), +/// stream_id.stream_id(), +/// stream_id.stream_type() +/// ); +/// } +/// Http3ClientEvent::DataReadable{ stream_id } => { +/// println!("Data receivedd form the server on WebTransport stream ID {:?}", +/// stream_id, +/// ); +/// let mut buf = [0; 100]; +/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); +/// println!("Read {:?} bytes from stream {:?} [fin={:?}]", +/// amount, +/// stream_id, +/// fin, +/// ); +/// } +/// _ => { +/// println!("Unhandled event {:?}", event); +/// } +/// } +/// } +/// } +/// ``` +/// +pub struct Http3Client { + conn: Connection, + base_handler: Http3Connection, + events: Http3ClientEvents, + push_handler: Rc<RefCell<PushController>>, +} + +impl Display for Http3Client { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "Http3 client") + } +} + +impl Http3Client { + /// # Errors + /// Making a `neqo-transport::connection` may produce an error. This can only be a crypto error if + /// the crypto context can't be created or configured. + pub fn new( + server_name: impl Into<String>, + cid_manager: Rc<RefCell<dyn ConnectionIdGenerator>>, + local_addr: SocketAddr, + remote_addr: SocketAddr, + http3_parameters: Http3Parameters, + now: Instant, + ) -> Res<Self> { + Ok(Self::new_with_conn( + Connection::new_client( + server_name, + &[alpn_from_quic_version( + http3_parameters + .get_connection_parameters() + .get_versions() + .initial(), + )], + cid_manager, + local_addr, + remote_addr, + http3_parameters.get_connection_parameters().clone(), + now, + )?, + http3_parameters, + )) + } + + /// This is a similar function to `new`. In this case, `neqo-transport::connection` has been + /// already created. + /// + /// It is recommended to use `new` instead. + #[must_use] + pub fn new_with_conn(c: Connection, http3_parameters: Http3Parameters) -> Self { + let events = Http3ClientEvents::default(); + let webtransport = http3_parameters.get_webtransport(); + let push_streams = http3_parameters.get_max_concurrent_push_streams(); + let mut base_handler = Http3Connection::new(http3_parameters, Role::Client); + if webtransport { + base_handler.set_features_listener(events.clone()); + } + Self { + conn: c, + events: events.clone(), + push_handler: Rc::new(RefCell::new(PushController::new(push_streams, events))), + base_handler, + } + } + + #[must_use] + pub fn role(&self) -> Role { + self.conn.role() + } + + /// The function returns the current state of the connection. + #[must_use] + pub fn state(&self) -> Http3State { + self.base_handler.state() + } + + #[must_use] + pub fn tls_info(&self) -> Option<&SecretAgentInfo> { + self.conn.tls_info() + } + + /// Get the peer's certificate. + #[must_use] + pub fn peer_certificate(&self) -> Option<CertificateInfo> { + self.conn.peer_certificate() + } + + /// This called when peer certificates have been verified. + /// + /// `Http3ClientEvent::AuthenticationNeeded` event is emitted when peer’s certificates are + /// available and need to be verified. When the verification is completed this function is + /// called. To inform HTTP/3 session of the verification results. + pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) { + self.conn.authenticated(status, now); + } + + pub fn set_qlog(&mut self, qlog: NeqoQlog) { + self.conn.set_qlog(qlog); + } + + /// Enable encrypted client hello (ECH). + /// + /// # Errors + /// Fails when the configuration provided is bad. + pub fn enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> { + self.conn.client_enable_ech(ech_config_list)?; + Ok(()) + } + + /// Get the connection id, which is useful for disambiguating connections to + /// the same origin. + #[must_use] + pub fn connection_id(&self) -> &ConnectionId { + self.conn.odcid().expect("Client always has odcid") + } + + fn encode_resumption_token(&self, token: &ResumptionToken) -> Option<ResumptionToken> { + self.base_handler.get_settings().map(|settings| { + let mut enc = Encoder::default(); + settings.encode_frame_contents(&mut enc); + enc.encode(token.as_ref()); + ResumptionToken::new(enc.into(), token.expiration_time()) + }) + } + + /// The correct way to obtain a resumption token is to wait for the + /// `Http3ClientEvent::ResumptionToken` event. To emit the event we are waiting for a + /// resumtion token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN` + /// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a + /// problem for short-lived connections, where the connection is closed before any events are + /// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to + /// arrive. + /// + /// In addition to the token, HTTP/3 settings are encoded into the token before giving it to + /// the application(`encode_resumption_token`). When the resumption token is supplied to a new + /// connection the HTTP/3 setting will be decoded and used until the setting are received from + /// the server. + pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> { + self.conn + .take_resumption_token(now) + .and_then(|t| self.encode_resumption_token(&t)) + } + + /// This may be call if an application has a resumption token. This must be called before connection starts. + /// + /// The resumption token also contains encoded HTTP/3 settings. The settings will be decoded + /// and used until the setting are received from the server. + /// + /// # Errors + /// An error is return if token cannot be decoded or a connection is is a wrong state. + /// # Panics + /// On closing if the base handler can't handle it (debug only). + pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> { + if self.base_handler.state != Http3State::Initializing { + return Err(Error::InvalidState); + } + let mut dec = Decoder::from(token.as_ref()); + let settings_slice = match dec.decode_vvec() { + Some(v) => v, + None => return Err(Error::InvalidResumptionToken), + }; + qtrace!([self], " settings {}", hex_with_len(settings_slice)); + let mut dec_settings = Decoder::from(settings_slice); + let mut settings = HSettings::default(); + Error::map_error( + settings.decode_frame_contents(&mut dec_settings), + Error::InvalidResumptionToken, + )?; + let tok = dec.decode_remainder(); + qtrace!([self], " Transport token {}", hex(tok)); + self.conn.enable_resumption(now, tok)?; + if self.conn.state().closed() { + let state = self.conn.state().clone(); + let res = self + .base_handler + .handle_state_change(&mut self.conn, &state); + debug_assert_eq!(Ok(true), res); + return Err(Error::FatalError); + } + if self.conn.zero_rtt_state() == ZeroRttState::Sending { + self.base_handler + .set_0rtt_settings(&mut self.conn, settings)?; + self.events + .connection_state_change(self.base_handler.state()); + self.push_handler + .borrow_mut() + .maybe_send_max_push_id_frame(&mut self.base_handler); + } + Ok(()) + } + + /// This is call to close a connection. + pub fn close<S>(&mut self, now: Instant, error: AppError, msg: S) + where + S: AsRef<str> + Display, + { + qinfo!([self], "Close the connection error={} msg={}.", error, msg); + if !matches!( + self.base_handler.state, + Http3State::Closing(_) | Http3State::Closed(_) + ) { + self.push_handler.borrow_mut().clear(); + self.conn.close(now, error, msg); + self.base_handler.close(error); + self.events + .connection_state_change(self.base_handler.state()); + } + } + + /// Attempt to force a key update. + /// # Errors + /// If the connection isn't confirmed, or there is an outstanding key update, this + /// returns `Err(Error::TransportError(neqo_transport::Error::KeyUpdateBlocked))`. + pub fn initiate_key_update(&mut self) -> Res<()> { + self.conn.initiate_key_update()?; + Ok(()) + } + + // API: Request/response + + /// The function fetches a resource using `method`, `target` and `headers`. A response body + /// may be added by calling `send_data`. `stream_close_send` must be sent to finish the request + /// even if request data are not sent. + /// # Errors + /// If a new stream cannot be created an error will be return. + /// # Panics + /// `SendMessage` implements `http_stream` so it will not panic. + pub fn fetch<'x, 't: 'x, T>( + &mut self, + now: Instant, + method: &'t str, + target: &'t T, + headers: &'t [Header], + priority: Priority, + ) -> Res<StreamId> + where + T: AsRequestTarget<'x> + ?Sized + Debug, + { + let output = self.base_handler.fetch( + &mut self.conn, + Box::new(self.events.clone()), + Box::new(self.events.clone()), + Some(Rc::clone(&self.push_handler)), + &RequestDescription { + method, + connect_type: None, + target, + headers, + priority, + }, + ); + if let Err(e) = &output { + if e.connection_error() { + self.close(now, e.code(), ""); + } + } + output + } + + /// Send an [`PRIORITY_UPDATE`-frame][1] on next `Http3Client::process_output()` call. + /// Returns if the priority got changed. + /// # Errors + /// `InvalidStreamId` if the stream does not exist + /// + /// [1]: https://datatracker.ietf.org/doc/html/draft-kazuho-httpbis-priority-04#section-5.2 + pub fn priority_update(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> { + self.base_handler.queue_update_priority(stream_id, priority) + } + + /// An application may cancel a stream(request). + /// Both sides, the receiviing and sending side, sending and receiving side, will be closed. + /// # Errors + /// An error will be return if a stream does not exist. + pub fn cancel_fetch(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { + qinfo!([self], "reset_stream {} error={}.", stream_id, error); + self.base_handler + .cancel_fetch(stream_id, error, &mut self.conn) + } + + /// This is call when application is done sending a request. + /// # Errors + /// An error will be return if stream does not exist. + pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> { + qinfo!([self], "Close sending side stream={}.", stream_id); + self.base_handler + .stream_close_send(&mut self.conn, stream_id) + } + + /// # Errors + /// An error will be return if a stream does not exist. + pub fn stream_reset_send(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { + qinfo!([self], "stream_reset_send {} error={}.", stream_id, error); + self.base_handler + .stream_reset_send(&mut self.conn, stream_id, error) + } + + /// # Errors + /// An error will be return if a stream does not exist. + pub fn stream_stop_sending(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { + qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error); + self.base_handler + .stream_stop_sending(&mut self.conn, stream_id, error) + } + + /// This function is used for regular HTTP requests and `WebTransport` streams. + /// In the case of regular HTTP requests, the request body is supplied using this function, and + /// headers are supplied through the `fetch` function. + /// + /// # Errors + /// `InvalidStreamId` if the stream does not exist, + /// `AlreadyClosed` if the stream has already been closed. + /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output` + /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.) + /// `InvalidInput` if an empty buffer has been supplied. + pub fn send_data(&mut self, stream_id: StreamId, buf: &[u8]) -> Res<usize> { + qinfo!( + [self], + "send_data from stream {} sending {} bytes.", + stream_id, + buf.len() + ); + self.base_handler + .send_streams + .get_mut(&stream_id) + .ok_or(Error::InvalidStreamId)? + .send_data(&mut self.conn, buf) + } + + /// Response data are read directly into a buffer supplied as a parameter of this function to avoid copying + /// data. + /// # Errors + /// It returns an error if a stream does not exist or an error happen while reading a stream, e.g. + /// early close, protocol error, etc. + pub fn read_data( + &mut self, + now: Instant, + stream_id: StreamId, + buf: &mut [u8], + ) -> Res<(usize, bool)> { + qinfo!([self], "read_data from stream {}.", stream_id); + let res = self.base_handler.read_data(&mut self.conn, stream_id, buf); + if let Err(e) = &res { + if e.connection_error() { + self.close(now, e.code(), ""); + } + } + res + } + + // API: Push streams + + /// Cancel a push + /// # Errors + /// `InvalidStreamId` if the stream does not exist. + pub fn cancel_push(&mut self, push_id: u64) -> Res<()> { + self.push_handler + .borrow_mut() + .cancel(push_id, &mut self.conn, &mut self.base_handler) + } + + /// Push response data are read directly into a buffer supplied as a parameter of this function + /// to avoid copying data. + /// # Errors + /// It returns an error if a stream does not exist(`InvalidStreamId`) or an error has happened while + /// reading a stream, e.g. early close, protocol error, etc. + pub fn push_read_data( + &mut self, + now: Instant, + push_id: u64, + buf: &mut [u8], + ) -> Res<(usize, bool)> { + let stream_id = self + .push_handler + .borrow_mut() + .get_active_stream_id(push_id) + .ok_or(Error::InvalidStreamId)?; + self.conn.stream_keep_alive(stream_id, true)?; + self.read_data(now, stream_id, buf) + } + + // API WebTransport + + /// # Errors + /// If `WebTransport` cannot be created, e.g. the `WebTransport` support is + /// not negotiated or the HTTP/3 connection is closed. + pub fn webtransport_create_session<'x, 't: 'x, T>( + &mut self, + now: Instant, + target: &'t T, + headers: &'t [Header], + ) -> Res<StreamId> + where + T: AsRequestTarget<'x> + ?Sized + Debug, + { + let output = self.base_handler.webtransport_create_session( + &mut self.conn, + Box::new(self.events.clone()), + target, + headers, + ); + + if let Err(e) = &output { + if e.connection_error() { + self.close(now, e.code(), ""); + } + } + output + } + + /// Close `WebTransport` cleanly + /// # Errors + /// `InvalidStreamId` if the stream does not exist, + /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output` + /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.) + /// `InvalidInput` if an empty buffer has been supplied. + pub fn webtransport_close_session( + &mut self, + session_id: StreamId, + error: u32, + message: &str, + ) -> Res<()> { + self.base_handler + .webtransport_close_session(&mut self.conn, session_id, error, message) + } + + /// # Errors + /// This may return an error if the particular session does not exist + /// or the connection is not in the active state. + pub fn webtransport_create_stream( + &mut self, + session_id: StreamId, + stream_type: StreamType, + ) -> Res<StreamId> { + self.base_handler.webtransport_create_stream_local( + &mut self.conn, + session_id, + stream_type, + Box::new(self.events.clone()), + Box::new(self.events.clone()), + ) + } + + /// Send `WebTransport` datagram. + /// # Errors + /// It may return `InvalidStreamId` if a stream does not exist anymore. + /// The function returns `TooMuchData` if the supply buffer is bigger than + /// the allowed remote datagram size. + pub fn webtransport_send_datagram( + &mut self, + session_id: StreamId, + buf: &[u8], + id: impl Into<DatagramTracking>, + ) -> Res<()> { + qtrace!("webtransport_send_datagram session:{:?}", session_id); + self.base_handler + .webtransport_send_datagram(session_id, &mut self.conn, buf, id) + } + + /// Returns the current max size of a datagram that can fit into a packet. + /// The value will change over time depending on the encoded size of the + /// packet number, ack frames, etc. + /// # Errors + /// The function returns `NotAvailable` if datagrams are not enabled. + /// # Panics + /// This cannot panic. The max varint length is 8. + pub fn webtransport_max_datagram_size(&self, session_id: StreamId) -> Res<u64> { + Ok(self.conn.max_datagram_size()? + - u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap()) + } + + /// This function combines `process_input` and `process_output` function. + pub fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output { + qtrace!([self], "Process."); + if let Some(d) = dgram { + self.process_input(d, now); + } + self.process_output(now) + } + + /// The function should be called when there is a new UDP packet available. The function will + /// handle the packet payload. + /// + /// First, the payload will be handled by the QUIC layer. Afterward, `process_http3` will be + /// called to handle new [`ConnectionEvent`][1]s. + /// + /// After this function is called `process_output` should be called to check whether new + /// packets need to be sent or if a timer needs to be updated. + /// + /// [1]: ../neqo_transport/enum.ConnectionEvent.html + pub fn process_input(&mut self, dgram: Datagram, now: Instant) { + qtrace!([self], "Process input."); + self.conn.process_input(dgram, now); + self.process_http3(now); + } + + /// This should not be used because it gives access to functionalities that may disrupt the + /// proper functioning of the HTTP/3 session. + /// Only used by `neqo-interop`. + pub fn conn(&mut self) -> &mut Connection { + &mut self.conn + } + + /// Process HTTP3 layer. + /// When `process_output`, `process_input`, or `process` is called we must call this function + /// as well. The functions calls `Http3Client::check_connection_events` to handle events from + /// the QUC layer and calls `Http3Connection::process_sending` to ensure that HTTP/3 layer + /// data, e.g. control frames, are sent. + fn process_http3(&mut self, now: Instant) { + qtrace!([self], "Process http3 internal."); + match self.base_handler.state() { + Http3State::ZeroRtt | Http3State::Connected | Http3State::GoingAway(..) => { + let res = self.check_connection_events(); + if self.check_result(now, &res) { + return; + } + self.push_handler + .borrow_mut() + .maybe_send_max_push_id_frame(&mut self.base_handler); + let res = self.base_handler.process_sending(&mut self.conn); + self.check_result(now, &res); + } + Http3State::Closed { .. } => {} + _ => { + let res = self.check_connection_events(); + let _ = self.check_result(now, &res); + } + } + } + + /// The function should be called to check if there is a new UDP packet to be sent. It should + /// be called after a new packet is received and processed and after a timer expires (QUIC + /// needs timers to handle events like PTO detection and timers are not implemented by the neqo + /// library, but instead must be driven by the application). + /// + /// `process_output` can return: + /// - a [`Output::Datagram(Datagram)`][1]: data that should be sent as a UDP payload, + /// - a [`Output::Callback(Duration)`][1]: the duration of a timer. `process_output` should be called at least after the time expires, + /// - [`Output::None`][1]: this is returned when `Nttp3Client` is done and can be destroyed. + /// + /// The application should call this function repeatedly until a timer value or None is + /// returned. After that, the application should call the function again if a new UDP packet is + /// received and processed or the timer value expires. + /// + /// The HTTP/3 neqo implementation drives the HTTP/3 and QUC layers, therefore this function + /// will call both layers: + /// - First it calls HTTP/3 layer processing (`process_http3`) to make sure the layer writes + /// data to QUIC layer or cancels streams if needed. + /// - Then QUIC layer processing is called - [`Connection::process_output`][3]. This produces a + /// packet or a timer value. It may also produce ned [`ConnectionEvent`][2]s, e.g. connection + /// state-change event. + /// - Therefore the HTTP/3 layer processing (`process_http3`) is called again. + /// + /// [1]: ../neqo_transport/enum.Output.html + /// [2]: ../neqo_transport/struct.ConnectionEvents.html + /// [3]: ../neqo_transport/struct.Connection.html#method.process_output + pub fn process_output(&mut self, now: Instant) -> Output { + qtrace!([self], "Process output."); + + // Maybe send() stuff on http3-managed streams + self.process_http3(now); + + let out = self.conn.process_output(now); + + // Update H3 for any transport state changes and events + self.process_http3(now); + + out + } + + /// This function takes the provided result and check for an error. + /// An error results in closing the connection. + fn check_result<ERR>(&mut self, now: Instant, res: &Res<ERR>) -> bool { + match &res { + Err(Error::HttpGoaway) => { + qinfo!([self], "Connection error: goaway stream_id increased."); + self.close( + now, + Error::HttpGeneralProtocol.code(), + "Connection error: goaway stream_id increased", + ); + true + } + Err(e) => { + qinfo!([self], "Connection error: {}.", e); + self.close(now, e.code(), &format!("{}", e)); + true + } + _ => false, + } + } + + /// This function checks [`ConnectionEvent`][2]s emitted by the QUIC layer, e.g. connection change + /// state events, new incoming stream data is available, a stream is was reset, etc. The HTTP/3 + /// layer needs to handle these events. Most of the events are handled by + /// [`Http3Connection`][1] by calling appropriate functions, e.g. `handle_state_change`, + /// `handle_stream_reset`, etc. [`Http3Connection`][1] handle functionalities that are common + /// for the client and server side. Some of the functionalities are specific to the client and + /// they are handled by `Http3Client`. For example, [`ConnectionEvent::RecvStreamReadable`][3] event + /// is handled by `Http3Client::handle_stream_readable`. The function calls + /// `Http3Connection::handle_stream_readable` and then hands the return value as appropriate + /// for the client-side. + /// + /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs + /// [2]: ../neqo_transport/enum.ConnectionEvent.html + /// [3]: ../neqo_transport/enum.ConnectionEvent.html#variant.RecvStreamReadable + fn check_connection_events(&mut self) -> Res<()> { + qtrace!([self], "Check connection events."); + while let Some(e) = self.conn.next_event() { + qdebug!([self], "check_connection_events - event {:?}.", e); + match e { + ConnectionEvent::NewStream { stream_id } => { + // During this event we only add a new stream to the Http3Connection stream list, + // with NewStreamHeadReader stream handler. + // This function will not read from the stream and try to decode the stream. + // RecvStreamReadable will be emitted after this event and reading, i.e. decoding + // of a stream will happen during that event. + self.base_handler.add_new_stream(stream_id); + } + ConnectionEvent::SendStreamWritable { stream_id } => { + if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) { + s.stream_writable(); + } + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + self.handle_stream_readable(stream_id)?; + } + ConnectionEvent::RecvStreamReset { + stream_id, + app_error, + } => self + .base_handler + .handle_stream_reset(stream_id, app_error, &mut self.conn)?, + ConnectionEvent::SendStreamStopSending { + stream_id, + app_error, + } => self.base_handler.handle_stream_stop_sending( + stream_id, + app_error, + &mut self.conn, + )?, + + ConnectionEvent::SendStreamCreatable { stream_type } => { + self.events.new_requests_creatable(stream_type); + } + ConnectionEvent::AuthenticationNeeded => self.events.authentication_needed(), + ConnectionEvent::EchFallbackAuthenticationNeeded { public_name } => { + self.events.ech_fallback_authentication_needed(public_name); + } + ConnectionEvent::StateChange(state) => { + if self + .base_handler + .handle_state_change(&mut self.conn, &state)? + { + self.events + .connection_state_change(self.base_handler.state()); + } + } + ConnectionEvent::ZeroRttRejected => { + self.base_handler.handle_zero_rtt_rejected()?; + self.events.zero_rtt_rejected(); + self.push_handler.borrow_mut().handle_zero_rtt_rejected(); + } + ConnectionEvent::ResumptionToken(token) => { + if let Some(t) = self.encode_resumption_token(&token) { + self.events.resumption_token(t); + } + } + ConnectionEvent::Datagram(dgram) => { + self.base_handler.handle_datagram(&dgram); + } + ConnectionEvent::SendStreamComplete { .. } + | ConnectionEvent::OutgoingDatagramOutcome { .. } + | ConnectionEvent::IncomingDatagramDropped => {} + } + } + Ok(()) + } + + /// This function handled new data available on a stream. It calls + /// `Http3Client::handle_stream_readable` and handles its response. Reading streams are mostly + /// handled by [`Http3Connection`][1] because most part of it is common for the client and + /// server. The following actions need to be handled by the client-specific code: + /// - `ReceiveOutput::NewStream(NewStreamType::Push(_))` - the server cannot receive a push + /// stream, + /// - `ReceiveOutput::NewStream(NewStreamType::Http)` - client cannot receive a + /// server-initiated HTTP request, + /// - `ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_))` - because + /// `Http3ClientEvents`is needed and events handler is specific to the client. + /// - `ReceiveOutput::ControlFrames(control_frames)` - some control frame handling differs + /// between the client and the server: + /// - `HFrame::CancelPush` - only the client-side may receive it, + /// - `HFrame::MaxPushId { .. }`, `HFrame::PriorityUpdateRequest { .. } ` and + /// `HFrame::PriorityUpdatePush` can only be receive on the server side, + /// - `HFrame::Goaway { stream_id }` needs specific handling by the client by the protocol + /// specification. + /// + /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs + fn handle_stream_readable(&mut self, stream_id: StreamId) -> Res<()> { + match self + .base_handler + .handle_stream_readable(&mut self.conn, stream_id)? + { + ReceiveOutput::NewStream(NewStreamType::Push(push_id)) => { + self.handle_new_push_stream(stream_id, push_id) + } + ReceiveOutput::NewStream(NewStreamType::Http) => Err(Error::HttpStreamCreation), + ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => { + self.base_handler.webtransport_create_stream_remote( + StreamId::from(session_id), + stream_id, + Box::new(self.events.clone()), + Box::new(self.events.clone()), + )?; + let res = self + .base_handler + .handle_stream_readable(&mut self.conn, stream_id)?; + debug_assert!(matches!(res, ReceiveOutput::NoOutput)); + Ok(()) + } + ReceiveOutput::ControlFrames(control_frames) => { + for f in control_frames { + match f { + HFrame::CancelPush { push_id } => self + .push_handler + .borrow_mut() + .handle_cancel_push(push_id, &mut self.conn, &mut self.base_handler), + HFrame::MaxPushId { .. } + | HFrame::PriorityUpdateRequest { .. } + | HFrame::PriorityUpdatePush { .. } => Err(Error::HttpFrameUnexpected), + HFrame::Goaway { stream_id } => self.handle_goaway(stream_id), + _ => { + unreachable!( + "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames." + ); + } + }?; + } + Ok(()) + } + _ => Ok(()), + } + } + + fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> { + if !self.push_handler.borrow().can_receive_push() { + return Err(Error::HttpId); + } + + // Add a new push stream to `PushController`. `add_new_push_stream` may return an error + // (this will be a connection error) or a bool. + // If false is returned that means that the stream should be reset because the push has + // been already canceled (CANCEL_PUSH frame or canceling push from the application). + if !self + .push_handler + .borrow_mut() + .add_new_push_stream(push_id, stream_id)? + { + // We are not interested in the result of stream_stop_sending, we are not interested + // in this stream. + mem::drop( + self.conn + .stream_stop_sending(stream_id, Error::HttpRequestCancelled.code()), + ); + return Ok(()); + } + + self.base_handler.add_recv_stream( + stream_id, + Box::new(RecvMessage::new( + &RecvMessageInfo { + message_type: MessageType::Response, + stream_type: Http3StreamType::Push, + stream_id, + header_frame_type_read: false, + }, + Rc::clone(&self.base_handler.qpack_decoder), + Box::new(RecvPushEvents::new(push_id, Rc::clone(&self.push_handler))), + None, + // TODO: think about the right prority for the push streams. + PriorityHandler::new(true, Priority::default()), + )), + ); + let res = self + .base_handler + .handle_stream_readable(&mut self.conn, stream_id)?; + debug_assert!(matches!(res, ReceiveOutput::NoOutput)); + Ok(()) + } + + fn handle_goaway(&mut self, goaway_stream_id: StreamId) -> Res<()> { + qinfo!([self], "handle_goaway {}", goaway_stream_id); + + if goaway_stream_id.is_uni() || goaway_stream_id.is_server_initiated() { + return Err(Error::HttpId); + } + + match self.base_handler.state { + Http3State::Connected => { + self.base_handler.state = Http3State::GoingAway(goaway_stream_id); + } + Http3State::GoingAway(ref mut stream_id) => { + if goaway_stream_id > *stream_id { + return Err(Error::HttpGoaway); + } + *stream_id = goaway_stream_id; + } + Http3State::Closing(..) | Http3State::Closed(..) => {} + _ => unreachable!("Should not receive Goaway frame in this state."), + } + + // Issue reset events for streams >= goaway stream id + let send_ids: Vec<StreamId> = self + .base_handler + .send_streams + .iter() + .filter_map(id_gte(goaway_stream_id)) + .collect(); + for id in send_ids { + // We do not care about streams that are going to be closed. + mem::drop(self.base_handler.handle_stream_stop_sending( + id, + Error::HttpRequestRejected.code(), + &mut self.conn, + )); + } + + let recv_ids: Vec<StreamId> = self + .base_handler + .recv_streams + .iter() + .filter_map(id_gte(goaway_stream_id)) + .collect(); + for id in recv_ids { + // We do not care about streams that are going to be closed. + mem::drop(self.base_handler.handle_stream_reset( + id, + Error::HttpRequestRejected.code(), + &mut self.conn, + )); + } + + self.events.goaway_received(); + + Ok(()) + } + + /// Increases `max_stream_data` for a `stream_id`. + /// # Errors + /// Returns `InvalidStreamId` if a stream does not exist or the receiving + /// side is closed. + pub fn set_stream_max_data(&mut self, stream_id: StreamId, max_data: u64) -> Res<()> { + self.conn.set_stream_max_data(stream_id, max_data)?; + Ok(()) + } + + #[must_use] + pub fn qpack_decoder_stats(&self) -> QpackStats { + self.base_handler.qpack_decoder.borrow().stats() + } + + #[must_use] + pub fn qpack_encoder_stats(&self) -> QpackStats { + self.base_handler.qpack_encoder.borrow().stats() + } + + #[must_use] + pub fn transport_stats(&self) -> TransportStats { + self.conn.stats() + } + + #[must_use] + pub fn webtransport_enabled(&self) -> bool { + self.base_handler.webtransport_enabled() + } +} + +impl EventProvider for Http3Client { + type Event = Http3ClientEvent; + + /// Return true if there are outstanding events. + fn has_events(&self) -> bool { + self.events.has_events() + } + + /// Get events that indicate state changes on the connection. This method + /// correctly handles cases where handling one event can obsolete + /// previously-queued events, or cause new events to be generated. + fn next_event(&mut self) -> Option<Self::Event> { + self.events.next_event() + } +} + +#[cfg(test)] +mod tests { + use super::{ + AuthenticationStatus, Connection, Error, HSettings, Header, Http3Client, Http3ClientEvent, + Http3Parameters, Http3State, Rc, RefCell, + }; + use crate::frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES}; + use crate::qpack_encoder_receiver::EncoderRecvStream; + use crate::settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS}; + use crate::{Http3Server, Priority, RecvStream}; + use neqo_common::{event::Provider, qtrace, Datagram, Decoder, Encoder}; + use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken}; + use neqo_qpack::{encoder::QPackEncoder, QpackSettings}; + use neqo_transport::{ + ConnectionError, ConnectionEvent, ConnectionParameters, Output, State, StreamId, + StreamType, Version, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE, + }; + use std::convert::TryFrom; + use std::mem; + use std::time::Duration; + use test_fixture::{ + addr, anti_replay, default_server_h3, fixture_init, new_server, now, + CountingConnectionIdGenerator, DEFAULT_ALPN_H3, DEFAULT_KEYS, DEFAULT_SERVER_NAME, + }; + + fn assert_closed(client: &Http3Client, expected: &Error) { + match client.state() { + Http3State::Closing(err) | Http3State::Closed(err) => { + assert_eq!(err, ConnectionError::Application(expected.code())); + } + _ => panic!("Wrong state {:?}", client.state()), + }; + } + + /// Create a http3 client with default configuration. + pub fn default_http3_client() -> Http3Client { + default_http3_client_param(100) + } + + pub fn default_http3_client_param(max_table_size: u64) -> Http3Client { + fixture_init(); + Http3Client::new( + DEFAULT_SERVER_NAME, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + addr(), + addr(), + Http3Parameters::default() + .connection_parameters( + // Disable compatible upgrade, which complicates tests. + ConnectionParameters::default() + .versions(Version::default(), vec![Version::default()]), + ) + .max_table_size_encoder(max_table_size) + .max_table_size_decoder(max_table_size) + .max_blocked_streams(100) + .max_concurrent_push_streams(5), + now(), + ) + .expect("create a default client") + } + + const CONTROL_STREAM_TYPE: &[u8] = &[0x0]; + + // Encoder stream data + const ENCODER_STREAM_DATA: &[u8] = &[0x2]; + const ENCODER_STREAM_CAP_INSTRUCTION: &[u8] = &[0x3f, 0x45]; + + // Encoder stream data with a change capacity instruction(0x3f, 0x45 = change capacity to 100) + // This data will be send when 0-RTT is used and we already have a max_table_capacity from + // resumed settings. + const ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION: &[u8] = &[0x2, 0x3f, 0x45]; + + const ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST: &[u8] = &[ + 0x2, 0x3f, 0x45, 0x67, 0xa7, 0xd4, 0xe5, 0x1c, 0x85, 0xb1, 0x1f, 0x86, 0xa7, 0xd7, 0x71, + 0xd1, 0x69, 0x7f, + ]; + + // Decoder stream data + const DECODER_STREAM_DATA: &[u8] = &[0x3]; + + const PUSH_STREAM_TYPE: &[u8] = &[0x1]; + + const CLIENT_SIDE_CONTROL_STREAM_ID: StreamId = StreamId::new(2); + const CLIENT_SIDE_ENCODER_STREAM_ID: StreamId = StreamId::new(6); + const CLIENT_SIDE_DECODER_STREAM_ID: StreamId = StreamId::new(10); + + struct TestServer { + settings: HFrame, + conn: Connection, + control_stream_id: Option<StreamId>, + encoder: Rc<RefCell<QPackEncoder>>, + encoder_receiver: EncoderRecvStream, + encoder_stream_id: Option<StreamId>, + decoder_stream_id: Option<StreamId>, + } + + impl TestServer { + pub fn new() -> Self { + Self::new_with_settings(&[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ]) + } + + pub fn new_with_settings(server_settings: &[HSetting]) -> Self { + fixture_init(); + let max_table_size = server_settings + .iter() + .find(|s| s.setting_type == HSettingType::MaxTableCapacity) + .map_or(100, |s| s.value); + let max_blocked_streams = u16::try_from( + server_settings + .iter() + .find(|s| s.setting_type == HSettingType::BlockedStreams) + .map_or(100, |s| s.value), + ) + .unwrap(); + let qpack = Rc::new(RefCell::new(QPackEncoder::new( + &QpackSettings { + max_table_size_encoder: max_table_size, + max_table_size_decoder: max_table_size, + max_blocked_streams, + }, + true, + ))); + Self { + settings: HFrame::Settings { + settings: HSettings::new(server_settings), + }, + conn: default_server_h3(), + control_stream_id: None, + encoder: Rc::clone(&qpack), + encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack), + encoder_stream_id: None, + decoder_stream_id: None, + } + } + + pub fn new_with_conn(conn: Connection) -> Self { + let qpack = Rc::new(RefCell::new(QPackEncoder::new( + &QpackSettings { + max_table_size_encoder: 128, + max_table_size_decoder: 128, + max_blocked_streams: 0, + }, + true, + ))); + Self { + settings: HFrame::Settings { + settings: HSettings::new(&[]), + }, + conn, + control_stream_id: None, + encoder: Rc::clone(&qpack), + encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack), + encoder_stream_id: None, + decoder_stream_id: None, + } + } + + pub fn create_qpack_streams(&mut self) { + // Create a QPACK encoder stream + self.encoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap()); + self.encoder + .borrow_mut() + .add_send_stream(self.encoder_stream_id.unwrap()); + self.encoder + .borrow_mut() + .send_encoder_updates(&mut self.conn) + .unwrap(); + + // Create decoder stream + self.decoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap()); + assert_eq!( + self.conn + .stream_send(self.decoder_stream_id.unwrap(), DECODER_STREAM_DATA) + .unwrap(), + 1 + ); + } + + pub fn create_control_stream(&mut self) { + // Create control stream + let control = self.conn.stream_create(StreamType::UniDi).unwrap(); + qtrace!(["TestServer"], "control stream: {}", control); + self.control_stream_id = Some(control); + // Send stream type on the control stream. + assert_eq!( + self.conn + .stream_send(self.control_stream_id.unwrap(), CONTROL_STREAM_TYPE) + .unwrap(), + 1 + ); + + // Encode a settings frame and send it. + let mut enc = Encoder::default(); + self.settings.encode(&mut enc); + assert_eq!( + self.conn + .stream_send(self.control_stream_id.unwrap(), enc.as_ref()) + .unwrap(), + enc.len() + ); + } + + pub fn check_client_control_qpack_streams_no_resumption(&mut self) { + self.check_client_control_qpack_streams( + ENCODER_STREAM_DATA, + EXPECTED_REQUEST_HEADER_FRAME, + false, + true, + ); + } + + pub fn check_control_qpack_request_streams_resumption( + &mut self, + expect_encoder_stream_data: &[u8], + expect_request_header: &[u8], + expect_request: bool, + ) { + self.check_client_control_qpack_streams( + expect_encoder_stream_data, + expect_request_header, + expect_request, + false, + ); + } + + // Check that server has received correct settings and qpack streams. + pub fn check_client_control_qpack_streams( + &mut self, + expect_encoder_stream_data: &[u8], + expect_request_header: &[u8], + expect_request: bool, + expect_connected: bool, + ) { + let mut connected = false; + let mut control_stream = false; + let mut qpack_decoder_stream = false; + let mut qpack_encoder_stream = false; + let mut request = false; + while let Some(e) = self.conn.next_event() { + match e { + ConnectionEvent::NewStream { stream_id } + | ConnectionEvent::SendStreamWritable { stream_id } => { + if expect_request { + assert!(matches!(stream_id.as_u64(), 2 | 6 | 10 | 0)); + } else { + assert!(matches!(stream_id.as_u64(), 2 | 6 | 10)); + } + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + if stream_id == CLIENT_SIDE_CONTROL_STREAM_ID { + self.check_control_stream(); + control_stream = true; + } else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID { + // the qpack encoder stream + self.read_and_check_stream_data( + stream_id, + expect_encoder_stream_data, + false, + ); + qpack_encoder_stream = true; + } else if stream_id == CLIENT_SIDE_DECODER_STREAM_ID { + // the qpack decoder stream + self.read_and_check_stream_data(stream_id, DECODER_STREAM_DATA, false); + qpack_decoder_stream = true; + } else if stream_id == 0 { + assert!(expect_request); + self.read_and_check_stream_data(stream_id, expect_request_header, true); + request = true; + } else { + panic!("unexpected event"); + } + } + ConnectionEvent::StateChange(State::Connected) => connected = true, + ConnectionEvent::StateChange(_) + | ConnectionEvent::SendStreamCreatable { .. } => {} + _ => panic!("unexpected event"), + } + } + assert_eq!(connected, expect_connected); + assert!(control_stream); + assert!(qpack_encoder_stream); + assert!(qpack_decoder_stream); + assert_eq!(request, expect_request); + } + + // Check that the control stream contains default values. + // Expect a SETTINGS frame, some grease, and a MAX_PUSH_ID frame. + // The default test configuration uses: + // - max_table_capacity = 100 + // - max_blocked_streams = 100 + // and a maximum of 5 push streams. + fn check_control_stream(&mut self) { + let mut buf = [0_u8; 100]; + let (amount, fin) = self + .conn + .stream_recv(CLIENT_SIDE_CONTROL_STREAM_ID, &mut buf) + .unwrap(); + let mut dec = Decoder::from(&buf[..amount]); + assert_eq!(dec.decode_varint().unwrap(), 0); // control stream type + assert_eq!(dec.decode_varint().unwrap(), 4); // SETTINGS + assert_eq!( + dec.decode_vvec().unwrap(), + &[1, 0x40, 0x64, 7, 0x40, 0x64, 0xab, 0x60, 0x37, 0x42, 0x00] + ); + + assert_eq!((dec.decode_varint().unwrap() - 0x21) % 0x1f, 0); // Grease + assert!(dec.decode_vvec().unwrap().len() < 8); + + assert_eq!(dec.decode_varint().unwrap(), 0xd); // MAX_PUSH_ID + assert_eq!(dec.decode_vvec().unwrap(), &[5]); + + assert_eq!(dec.remaining(), 0); + assert!(!fin); + } + + pub fn read_and_check_stream_data( + &mut self, + stream_id: StreamId, + expected_data: &[u8], + expected_fin: bool, + ) { + let mut buf = [0_u8; 100]; + let (amount, fin) = self.conn.stream_recv(stream_id, &mut buf).unwrap(); + assert_eq!(fin, expected_fin); + assert_eq!(amount, expected_data.len()); + assert_eq!(&buf[..amount], expected_data); + } + + pub fn encode_headers( + &mut self, + stream_id: StreamId, + headers: &[Header], + encoder: &mut Encoder, + ) { + let header_block = + self.encoder + .borrow_mut() + .encode_header_block(&mut self.conn, headers, stream_id); + let hframe = HFrame::Headers { + header_block: header_block.as_ref().to_vec(), + }; + hframe.encode(encoder); + } + } + + fn handshake_only(client: &mut Http3Client, server: &mut TestServer) -> Output { + assert_eq!(client.state(), Http3State::Initializing); + let out = client.process(None, now()); + assert_eq!(client.state(), Http3State::Initializing); + + assert_eq!(*server.conn.state(), State::Init); + let out = server.conn.process(out.dgram(), now()); + assert_eq!(*server.conn.state(), State::Handshaking); + + let out = client.process(out.dgram(), now()); + let out = server.conn.process(out.dgram(), now()); + assert!(out.as_dgram_ref().is_none()); + + let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); + assert!(client.events().any(authentication_needed)); + client.authenticated(AuthenticationStatus::Ok, now()); + out + } + + // Perform only Quic transport handshake. + fn connect_only_transport_with(client: &mut Http3Client, server: &mut TestServer) { + let out = handshake_only(client, server); + + let out = client.process(out.dgram(), now()); + let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected)); + assert!(client.events().any(connected)); + + assert_eq!(client.state(), Http3State::Connected); + server.conn.process_input(out.dgram().unwrap(), now()); + assert!(server.conn.state().connected()); + } + + // Perform only Quic transport handshake. + fn connect_only_transport() -> (Http3Client, TestServer) { + let mut client = default_http3_client(); + let mut server = TestServer::new(); + connect_only_transport_with(&mut client, &mut server); + (client, server) + } + + fn send_and_receive_client_settings(client: &mut Http3Client, server: &mut TestServer) { + // send and receive client settings + let dgram = client.process(None, now()).dgram(); + server.conn.process_input(dgram.unwrap(), now()); + server.check_client_control_qpack_streams_no_resumption(); + } + + // Perform Quic transport handshake and exchange Http3 settings. + fn connect_with(client: &mut Http3Client, server: &mut TestServer) { + connect_only_transport_with(client, server); + + send_and_receive_client_settings(client, server); + + server.create_control_stream(); + + server.create_qpack_streams(); + // Send the server's control and qpack streams data. + let dgram = server.conn.process(None, now()).dgram(); + client.process_input(dgram.unwrap(), now()); + + // assert no error occured. + assert_eq!(client.state(), Http3State::Connected); + } + + // Perform Quic transport handshake and exchange Http3 settings. + fn connect_with_connection_parameters( + server_conn_params: ConnectionParameters, + ) -> (Http3Client, TestServer) { + // connecting with default max_table_size + let mut client = default_http3_client_param(100); + let server = Connection::new_server( + test_fixture::DEFAULT_KEYS, + test_fixture::DEFAULT_ALPN_H3, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + server_conn_params, + ) + .unwrap(); + let mut server = TestServer::new_with_conn(server); + connect_with(&mut client, &mut server); + (client, server) + } + + // Perform Quic transport handshake and exchange Http3 settings. + fn connect() -> (Http3Client, TestServer) { + let mut client = default_http3_client(); + let mut server = TestServer::new(); + connect_with(&mut client, &mut server); + (client, server) + } + + // Fetch request fetch("GET", "https", "something.com", "/", headers). + fn make_request( + client: &mut Http3Client, + close_sending_side: bool, + headers: &[Header], + ) -> StreamId { + let request_stream_id = client + .fetch( + now(), + "GET", + "https://something.com/", + headers, + Priority::default(), + ) + .unwrap(); + if close_sending_side { + client.stream_close_send(request_stream_id).unwrap(); + } + request_stream_id + } + + // For fetch request fetch("GET", "https", "something.com", "/", &[]) + // the following request header frame will be sent: + const EXPECTED_REQUEST_HEADER_FRAME: &[u8] = &[ + 0x01, 0x10, 0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, + 0x43, 0xd3, 0xc1, + ]; + + // For fetch request fetch("GET", "https", "something.com", "/", &[(String::from("myheaders", "myvalue"))]) + // the following request header frame will be sent: + const EXPECTED_REQUEST_HEADER_FRAME_VERSION2: &[u8] = &[ + 0x01, 0x11, 0x02, 0x80, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, + 0x43, 0xd3, 0xc1, 0x10, + ]; + + const HTTP_HEADER_FRAME_0: &[u8] = &[0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30]; + + // The response header from HTTP_HEADER_FRAME (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30) are + // decoded into: + fn check_response_header_0(header: &[Header]) { + let expected_response_header_0 = &[ + Header::new(":status", "200"), + Header::new("content-length", "0"), + ]; + assert_eq!(header, expected_response_header_0); + } + + const HTTP_RESPONSE_1: &[u8] = &[ + // headers + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // the first data frame + 0x0, 0x3, 0x61, 0x62, 0x63, // the second data frame + 0x0, 0x4, 0x64, 0x65, 0x66, 0x67, + ]; + + const HTTP_RESPONSE_HEADER_ONLY_1: &[u8] = &[ + // headers + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, + ]; + const HTTP_RESPONSE_DATA_FRAME_1_ONLY_1: &[u8] = &[0x0, 0x3, 0x61, 0x62, 0x63]; + + const HTTP_RESPONSE_DATA_FRAME_2_ONLY_1: &[u8] = &[0x0, 0x4, 0x64, 0x65, 0x66, 0x67]; + + // The response header from HTTP_RESPONSE_1 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are + // decoded into: + fn check_response_header_1(header: &[Header]) { + let expected_response_header_1 = &[ + Header::new(":status", "200"), + Header::new("content-length", "7"), + ]; + assert_eq!(header, expected_response_header_1); + } + + const EXPECTED_RESPONSE_DATA_1: &[u8] = &[0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67]; + + const HTTP_RESPONSE_2: &[u8] = &[ + // headers + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33, // the data frame + 0x0, 0x3, 0x61, 0x62, 0x63, + ]; + + const HTTP_RESPONSE_HEADER_ONLY_2: &[u8] = &[ + // headers + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33, + ]; + + const HTTP_RESPONSE_DATA_FRAME_ONLY_2: &[u8] = &[ + // the data frame + 0x0, 0x3, 0x61, 0x62, 0x63, + ]; + + // The response header from HTTP_RESPONSE_2 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are + // decoded into: + fn check_response_header_2(header: &[Header]) { + let expected_response_header_2 = &[ + Header::new(":status", "200"), + Header::new("content-length", "3"), + ]; + assert_eq!(header, expected_response_header_2); + } + + // The data frame payload from HTTP_RESPONSE_2 is: + const EXPECTED_RESPONSE_DATA_2_FRAME_1: &[u8] = &[0x61, 0x62, 0x63]; + + fn make_request_and_exchange_pkts( + client: &mut Http3Client, + server: &mut TestServer, + close_sending_side: bool, + ) -> StreamId { + let request_stream_id = make_request(client, close_sending_side, &[]); + + let dgram = client.process(None, now()).dgram(); + server.conn.process_input(dgram.unwrap(), now()); + + // find the new request/response stream and send frame v on it. + while let Some(e) = server.conn.next_event() { + match e { + ConnectionEvent::NewStream { stream_id } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(stream_id.stream_type(), StreamType::BiDi); + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID { + server.read_and_check_stream_data( + stream_id, + ENCODER_STREAM_CAP_INSTRUCTION, + false, + ); + } else { + assert_eq!(stream_id, request_stream_id); + server.read_and_check_stream_data( + stream_id, + EXPECTED_REQUEST_HEADER_FRAME, + close_sending_side, + ); + } + } + _ => {} + } + } + let dgram = server.conn.process_output(now()).dgram(); + if let Some(d) = dgram { + client.process_input(d, now()); + } + request_stream_id + } + + fn connect_and_send_request(close_sending_side: bool) -> (Http3Client, TestServer, StreamId) { + let (mut client, mut server) = connect(); + let request_stream_id = + make_request_and_exchange_pkts(&mut client, &mut server, close_sending_side); + assert_eq!(request_stream_id, 0); + + (client, server, request_stream_id) + } + + fn server_send_response_and_exchange_packet( + client: &mut Http3Client, + server: &mut TestServer, + stream_id: StreamId, + response: impl AsRef<[u8]>, + close_stream: bool, + ) { + let _ = server + .conn + .stream_send(stream_id, response.as_ref()) + .unwrap(); + if close_stream { + server.conn.stream_close_send(stream_id).unwrap(); + } + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + } + + const PUSH_PROMISE_DATA: &[u8] = &[ + 0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, 0x43, 0xd3, + 0xc1, + ]; + + fn check_pushpromise_header(header: &[Header]) { + let expected_response_header_1 = &[ + Header::new(":method", "GET"), + Header::new(":scheme", "https"), + Header::new(":authority", "something.com"), + Header::new(":path", "/"), + ]; + assert_eq!(header, expected_response_header_1); + } + + // Send a push promise with push_id and request_stream_id. + fn send_push_promise(conn: &mut Connection, stream_id: StreamId, push_id: u64) { + let frame = HFrame::PushPromise { + push_id, + header_block: PUSH_PROMISE_DATA.to_vec(), + }; + let mut d = Encoder::default(); + frame.encode(&mut d); + let _ = conn.stream_send(stream_id, d.as_ref()).unwrap(); + } + + fn send_push_data_and_exchange_packets( + client: &mut Http3Client, + server: &mut TestServer, + push_id: u8, + close_push_stream: bool, + ) -> StreamId { + let push_stream_id = send_push_data(&mut server.conn, push_id, close_push_stream); + + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + + push_stream_id + } + + fn send_push_promise_and_exchange_packets( + client: &mut Http3Client, + server: &mut TestServer, + stream_id: StreamId, + push_id: u64, + ) { + send_push_promise(&mut server.conn, stream_id, push_id); + + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + } + + fn send_cancel_push_and_exchange_packets( + client: &mut Http3Client, + server: &mut TestServer, + push_id: u64, + ) { + let frame = HFrame::CancelPush { push_id }; + let mut d = Encoder::default(); + frame.encode(&mut d); + server + .conn + .stream_send(server.control_stream_id.unwrap(), d.as_ref()) + .unwrap(); + + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + } + + const PUSH_DATA: &[u8] = &[ + // headers + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34, // the data frame. + 0x0, 0x4, 0x61, 0x62, 0x63, 0x64, + ]; + + // The response header from PUSH_DATA (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34) are + // decoded into: + fn check_push_response_header(header: &[Header]) { + let expected_push_response_header = vec![ + Header::new(":status", "200"), + Header::new("content-length", "4"), + ]; + assert_eq!(header, &expected_push_response_header[..]); + } + + // The data frame payload from PUSH_DATA is: + const EXPECTED_PUSH_RESPONSE_DATA_FRAME: &[u8] = &[0x61, 0x62, 0x63, 0x64]; + + // Send push data on a push stream: + // 1) push_stream_type PUSH_STREAM_TYPE + // 2) push_id + // 3) PUSH_DATA that contains encoded headers and a data frame. + // This function can only handle small push_id numbers that fit in a varint of length 1 byte. + fn send_data_on_push( + conn: &mut Connection, + push_stream_id: StreamId, + push_id: u8, + data: impl AsRef<[u8]>, + close_push_stream: bool, + ) { + // send data + let _ = conn.stream_send(push_stream_id, PUSH_STREAM_TYPE).unwrap(); + let _ = conn.stream_send(push_stream_id, &[push_id]).unwrap(); + let _ = conn.stream_send(push_stream_id, data.as_ref()).unwrap(); + if close_push_stream { + conn.stream_close_send(push_stream_id).unwrap(); + } + } + + // Send push data on a push stream: + // 1) push_stream_type PUSH_STREAM_TYPE + // 2) push_id + // 3) PUSH_DATA that contains encoded headers and a data frame. + // This function can only handle small push_id numbers that fit in a varint of length 1 byte. + fn send_push_data(conn: &mut Connection, push_id: u8, close_push_stream: bool) -> StreamId { + send_push_with_data(conn, push_id, PUSH_DATA, close_push_stream) + } + + // Send push data on a push stream: + // 1) push_stream_type PUSH_STREAM_TYPE + // 2) push_id + // 3) and supplied push data. + // This function can only handle small push_id numbers that fit in a varint of length 1 byte. + fn send_push_with_data( + conn: &mut Connection, + push_id: u8, + data: &[u8], + close_push_stream: bool, + ) -> StreamId { + // create a push stream + let push_stream_id = conn.stream_create(StreamType::UniDi).unwrap(); + // send data + send_data_on_push(conn, push_stream_id, push_id, data, close_push_stream); + push_stream_id + } + + struct PushPromiseInfo { + pub push_id: u64, + pub ref_stream_id: StreamId, + } + + // Helper function: read response when a server sends: + // - HTTP_RESPONSE_2 on the request_stream_id stream, + // - a number of push promises described by a list of PushPromiseInfo. + // - and a push streams with push_id in the push_streams list. + // All push stream contain PUSH_DATA that decodes to headers (that can be checked by calling + // check_push_response_header) and EXPECTED_PUSH_RESPONSE_DATA_FRAME + fn read_response_and_push_events( + client: &mut Http3Client, + push_promises: &[PushPromiseInfo], + push_streams: &[u64], + response_stream_id: StreamId, + ) { + let mut num_push_promises = 0; + let mut num_push_stream_headers = 0; + let mut num_push_stream_data = 0; + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::PushPromise { + push_id, + request_stream_id, + headers, + } => { + assert!(push_promises + .iter() + .any(|p| p.push_id == push_id && p.ref_stream_id == request_stream_id)); + check_pushpromise_header(&headers[..]); + num_push_promises += 1; + } + Http3ClientEvent::PushHeaderReady { + push_id, + headers, + interim, + fin, + } => { + assert!(push_streams.contains(&push_id)); + check_push_response_header(&headers); + num_push_stream_headers += 1; + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::PushDataReadable { push_id } => { + assert!(push_streams.contains(&push_id)); + let mut buf = [0_u8; 100]; + let (amount, fin) = client.push_read_data(now(), push_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!(amount, EXPECTED_PUSH_RESPONSE_DATA_FRAME.len()); + assert_eq!(&buf[..amount], EXPECTED_PUSH_RESPONSE_DATA_FRAME); + num_push_stream_data += 1; + } + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, response_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, response_stream_id); + let mut buf = [0_u8; 100]; + let (amount, _) = client.read_data(now(), stream_id, &mut buf).unwrap(); + assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); + } + _ => {} + } + } + + assert_eq!(num_push_promises, push_promises.len()); + assert_eq!(num_push_stream_headers, push_streams.len()); + assert_eq!(num_push_stream_data, push_streams.len()); + } + + // Client: Test receiving a new control stream and a SETTINGS frame. + #[test] + fn test_client_connect_and_exchange_qpack_and_control_streams() { + mem::drop(connect()); + } + + // Client: Test that the connection will be closed if control stream + // has been closed. + #[test] + fn test_client_close_control_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_close_send(server.control_stream_id.unwrap()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the local control stream + // has been reset. + #[test] + fn test_client_reset_control_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_reset_send(server.control_stream_id.unwrap(), Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the server side encoder stream + // has been reset. + #[test] + fn test_client_reset_server_side_encoder_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_reset_send(server.encoder_stream_id.unwrap(), Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the server side decoder stream + // has been reset. + #[test] + fn test_client_reset_server_side_decoder_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_reset_send(server.decoder_stream_id.unwrap(), Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the local control stream + // has received a stop_sending. + #[test] + fn test_client_stop_sending_control_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_stop_sending(CLIENT_SIDE_CONTROL_STREAM_ID, Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the client side encoder stream + // has received a stop_sending. + #[test] + fn test_client_stop_sending_encoder_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_stop_sending(CLIENT_SIDE_ENCODER_STREAM_ID, Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: Test that the connection will be closed if the client side decoder stream + // has received a stop_sending. + #[test] + fn test_client_stop_sending_decoder_stream() { + let (mut client, mut server) = connect(); + server + .conn + .stream_stop_sending(CLIENT_SIDE_DECODER_STREAM_ID, Error::HttpNoError.code()) + .unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpClosedCriticalStream); + } + + // Client: test missing SETTINGS frame + // (the first frame sent is a garbage frame). + #[test] + fn test_client_missing_settings() { + let (mut client, mut server) = connect_only_transport(); + // Create server control stream. + let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); + // Send a HEADERS frame instead (which contains garbage). + let sent = server + .conn + .stream_send(control_stream, &[0x0, 0x1, 0x3, 0x0, 0x1, 0x2]); + assert_eq!(sent, Ok(6)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpMissingSettings); + } + + // Client: receiving SETTINGS frame twice causes connection close + // with error HTTP_UNEXPECTED_FRAME. + #[test] + fn test_client_receive_settings_twice() { + let (mut client, mut server) = connect(); + // send the second SETTINGS frame. + let sent = server.conn.stream_send( + server.control_stream_id.unwrap(), + &[0x4, 0x6, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64], + ); + assert_eq!(sent, Ok(8)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + fn test_wrong_frame_on_control_stream(v: &[u8]) { + let (mut client, mut server) = connect(); + + // send a frame that is not allowed on the control stream. + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), v) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + // send DATA frame on a cortrol stream + #[test] + fn test_data_frame_on_control_stream() { + test_wrong_frame_on_control_stream(&[0x0, 0x2, 0x1, 0x2]); + } + + // send HEADERS frame on a cortrol stream + #[test] + fn test_headers_frame_on_control_stream() { + test_wrong_frame_on_control_stream(&[0x1, 0x2, 0x1, 0x2]); + } + + // send PUSH_PROMISE frame on a cortrol stream + #[test] + fn test_push_promise_frame_on_control_stream() { + test_wrong_frame_on_control_stream(&[0x5, 0x2, 0x1, 0x2]); + } + + // send PRIORITY_UPDATE frame on a control stream to the client + #[test] + fn test_priority_update_request_on_control_stream() { + test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); + } + + #[test] + fn test_priority_update_push_on_control_stream() { + test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); + } + + fn test_wrong_frame_on_push_stream(v: &[u8]) { + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + send_push_promise(&mut server.conn, request_stream_id, 0); + // Create a push stream + let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); + + // Send the push stream type byte, push_id and frame v. + let _ = server + .conn + .stream_send(push_stream_id, &[0x01, 0x0]) + .unwrap(); + let _ = server.conn.stream_send(push_stream_id, v).unwrap(); + + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + #[test] + fn test_cancel_push_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x3, 0x1, 0x5]); + } + + #[test] + fn test_settings_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]); + } + + #[test] + fn test_push_promise_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x5, 0x2, 0x1, 0x2]); + } + + #[test] + fn test_priority_update_request_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); + } + + #[test] + fn test_priority_update_push_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); + } + + #[test] + fn test_goaway_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x7, 0x1, 0x5]); + } + + #[test] + fn test_max_push_id_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0xd, 0x1, 0x5]); + } + + // send DATA frame before a header frame + #[test] + fn test_data_frame_on_push_stream() { + test_wrong_frame_on_push_stream(&[0x0, 0x2, 0x1, 0x2]); + } + + // Client: receive unknown stream type + // This function also tests getting stream id that does not fit into a single byte. + #[test] + fn test_client_received_unknown_stream() { + let (mut client, mut server) = connect(); + + // create a stream with unknown type. + let new_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); + let _ = server + .conn + .stream_send(new_stream_id, &[0x41, 0x19, 0x4, 0x4, 0x6, 0x0, 0x8, 0x0]) + .unwrap(); + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + + // check for stop-sending with Error::HttpStreamCreation. + let mut stop_sending_event_found = false; + while let Some(e) = server.conn.next_event() { + if let ConnectionEvent::SendStreamStopSending { + stream_id, + app_error, + } = e + { + stop_sending_event_found = true; + assert_eq!(stream_id, new_stream_id); + assert_eq!(app_error, Error::HttpStreamCreation.code()); + } + } + assert!(stop_sending_event_found); + assert_eq!(client.state(), Http3State::Connected); + } + + // Test wrong frame on req/rec stream + fn test_wrong_frame_on_request_stream(v: &[u8]) { + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + let _ = server.conn.stream_send(request_stream_id, v).unwrap(); + + // Generate packet with the above bad h3 input + let out = server.conn.process(None, now()); + // Process bad input and close the connection. + mem::drop(client.process(out.dgram(), now())); + + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + #[test] + fn test_cancel_push_frame_on_request_stream() { + test_wrong_frame_on_request_stream(&[0x3, 0x1, 0x5]); + } + + #[test] + fn test_settings_frame_on_request_stream() { + test_wrong_frame_on_request_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]); + } + + #[test] + fn test_goaway_frame_on_request_stream() { + test_wrong_frame_on_request_stream(&[0x7, 0x1, 0x5]); + } + + #[test] + fn test_max_push_id_frame_on_request_stream() { + test_wrong_frame_on_request_stream(&[0xd, 0x1, 0x5]); + } + + #[test] + fn test_priority_update_request_on_request_stream() { + test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); + } + + #[test] + fn test_priority_update_push_on_request_stream() { + test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); + } + + // Test reading of a slowly streamed frame. bytes are received one by one + #[test] + fn test_frame_reading() { + let (mut client, mut server) = connect_only_transport(); + + // create a control stream. + let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); + + // send the stream type + let mut sent = server.conn.stream_send(control_stream, &[0x0]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // start sending SETTINGS frame + sent = server.conn.stream_send(control_stream, &[0x4]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x4]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x6]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x0]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x8]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x0]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_eq!(client.state(), Http3State::Connected); + + // Now test PushPromise + sent = server.conn.stream_send(control_stream, &[0x5]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x5]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x4]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x61]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x62]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x63]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + sent = server.conn.stream_send(control_stream, &[0x64]); + assert_eq!(sent, Ok(1)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // PUSH_PROMISE on a control stream will cause an error + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + #[test] + fn fetch_basic() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // send response - 200 Content-Length: 7 + // with content: 'abcdefg'. + // The content will be send in 2 DATA frames. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + true, + ); + + let http_events = client.events().collect::<Vec<_>>(); + assert_eq!(http_events.len(), 2); + for e in http_events { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_1(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!(amount, EXPECTED_RESPONSE_DATA_1.len()); + assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_1); + } + _ => {} + } + } + + // after this stream will be removed from hcoon. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + /// Force both endpoints into an idle state. + /// Do this by opening unidirectional streams at both endpoints and sending + /// a partial unidirectional stream type (which the receiver has to buffer), + /// then delivering packets out of order. + /// This forces the receiver to create an acknowledgment, which will allow + /// the peer to become idle. + fn force_idle(client: &mut Http3Client, server: &mut TestServer) { + // Send a partial unidirectional stream ID. + // Note that this can't close the stream as that causes the receiver + // to send `MAX_STREAMS`, which would prevent it from becoming idle. + fn dgram(c: &mut Connection) -> Datagram { + let stream = c.stream_create(StreamType::UniDi).unwrap(); + let _ = c.stream_send(stream, &[0xc0]).unwrap(); + c.process_output(now()).dgram().unwrap() + } + + let d1 = dgram(&mut client.conn); + let d2 = dgram(&mut client.conn); + server.conn.process_input(d2, now()); + server.conn.process_input(d1, now()); + let d3 = dgram(&mut server.conn); + let d4 = dgram(&mut server.conn); + client.process_input(d4, now()); + client.process_input(d3, now()); + let ack = client.process_output(now()).dgram(); + server.conn.process_input(ack.unwrap(), now()); + } + + /// The client should keep a connection alive if it has unanswered requests. + #[test] + fn fetch_keep_alive() { + let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); + force_idle(&mut client, &mut server); + + let idle_timeout = ConnectionParameters::default().get_idle_timeout(); + assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); + } + + // Helper function: read response when a server sends HTTP_RESPONSE_2. + fn read_response( + client: &mut Http3Client, + server: &mut Connection, + request_stream_id: StreamId, + ) { + let out = server.process(None, now()); + client.process(out.dgram(), now()); + + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); + } + _ => {} + } + } + + // after this stream will be removed from client. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + // Data sent with a request: + const REQUEST_BODY: &[u8] = &[0x64, 0x65, 0x66]; + // Corresponding data frame that server will receive. + const EXPECTED_REQUEST_BODY_FRAME: &[u8] = &[0x0, 0x3, 0x64, 0x65, 0x66]; + + // Send a request with the request body. + #[test] + fn fetch_with_data() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Get DataWritable for the request stream so that we can write the request body. + let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); + assert!(client.events().any(data_writable)); + let sent = client.send_data(request_stream_id, REQUEST_BODY).unwrap(); + assert_eq!(sent, REQUEST_BODY.len()); + client.stream_close_send(request_stream_id).unwrap(); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + // find the new request/response stream and send response on it. + while let Some(e) = server.conn.next_event() { + match e { + ConnectionEvent::NewStream { stream_id } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(stream_id.stream_type(), StreamType::BiDi); + } + ConnectionEvent::RecvStreamReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + + // Read request body. + let mut buf = [0_u8; 100]; + let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!(amount, EXPECTED_REQUEST_BODY_FRAME.len()); + assert_eq!(&buf[..amount], EXPECTED_REQUEST_BODY_FRAME); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); + server.conn.stream_close_send(stream_id).unwrap(); + } + _ => {} + } + } + + read_response(&mut client, &mut server.conn, request_stream_id); + } + + // send a request with request body containing request_body. We expect to receive expected_data_frame_header. + fn fetch_with_data_length_xbytes(request_body: &[u8], expected_data_frame_header: &[u8]) { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Get DataWritable for the request stream so that we can write the request body. + let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); + assert!(client.events().any(data_writable)); + let sent = client.send_data(request_stream_id, request_body); + assert_eq!(sent, Ok(request_body.len())); + + // Close stream. + client.stream_close_send(request_stream_id).unwrap(); + + // We need to loop a bit until all data has been sent. + let mut out = client.process(None, now()); + for _i in 0..20 { + out = server.conn.process(out.dgram(), now()); + out = client.process(out.dgram(), now()); + } + + // check request body is received. + // Then send a response. + while let Some(e) = server.conn.next_event() { + if let ConnectionEvent::RecvStreamReadable { stream_id } = e { + if stream_id == request_stream_id { + // Read the DATA frame. + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; + let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!( + amount, + request_body.len() + expected_data_frame_header.len() + ); + + // Check the DATA frame header + assert_eq!( + &buf[..expected_data_frame_header.len()], + expected_data_frame_header + ); + + // Check data. + assert_eq!(&buf[expected_data_frame_header.len()..amount], request_body); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); + server.conn.stream_close_send(stream_id).unwrap(); + } + } + } + + read_response(&mut client, &mut server.conn, request_stream_id); + } + + // send a request with 63 bytes. The DATA frame length field will still have 1 byte. + #[test] + fn fetch_with_data_length_63bytes() { + fetch_with_data_length_xbytes(&[0_u8; 63], &[0x0, 0x3f]); + } + + // send a request with 64 bytes. The DATA frame length field will need 2 byte. + #[test] + fn fetch_with_data_length_64bytes() { + fetch_with_data_length_xbytes(&[0_u8; 64], &[0x0, 0x40, 0x40]); + } + + // send a request with 16383 bytes. The DATA frame length field will still have 2 byte. + #[test] + fn fetch_with_data_length_16383bytes() { + fetch_with_data_length_xbytes(&[0_u8; 16383], &[0x0, 0x7f, 0xff]); + } + + // send a request with 16384 bytes. The DATA frame length field will need 4 byte. + #[test] + fn fetch_with_data_length_16384bytes() { + fetch_with_data_length_xbytes(&[0_u8; 16384], &[0x0, 0x80, 0x0, 0x40, 0x0]); + } + + // Send 2 data frames so that the second one cannot fit into the send_buf and it is only + // partialy sent. We check that the sent data is correct. + #[allow(clippy::useless_vec)] + fn fetch_with_two_data_frames( + first_frame: &[u8], + expected_first_data_frame_header: &[u8], + expected_second_data_frame_header: &[u8], + expected_second_data_frame: &[u8], + ) { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Get DataWritable for the request stream so that we can write the request body. + let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); + assert!(client.events().any(data_writable)); + + // Send the first frame. + let sent = client.send_data(request_stream_id, first_frame); + assert_eq!(sent, Ok(first_frame.len())); + + // The second frame cannot fit. + let sent = client.send_data(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]); + assert_eq!(sent, Ok(expected_second_data_frame.len())); + + // Close stream. + client.stream_close_send(request_stream_id).unwrap(); + + let mut out = client.process(None, now()); + // We need to loop a bit until all data has been sent. Once for every 1K + // of data. + for _i in 0..SEND_BUFFER_SIZE / 1000 { + out = server.conn.process(out.dgram(), now()); + out = client.process(out.dgram(), now()); + } + + // check received frames and send a response. + while let Some(e) = server.conn.next_event() { + if let ConnectionEvent::RecvStreamReadable { stream_id } = e { + if stream_id == request_stream_id { + // Read DATA frames. + let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; + let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!( + amount, + expected_first_data_frame_header.len() + + first_frame.len() + + expected_second_data_frame_header.len() + + expected_second_data_frame.len() + ); + + // Check the first DATA frame header + let end = expected_first_data_frame_header.len(); + assert_eq!(&buf[..end], expected_first_data_frame_header); + + // Check the first frame data. + let start = end; + let end = end + first_frame.len(); + assert_eq!(&buf[start..end], first_frame); + + // Check the second DATA frame header + let start2 = end; + let end2 = end + expected_second_data_frame_header.len(); + assert_eq!(&buf[start2..end2], expected_second_data_frame_header); + + // Check the second frame data. + let start3 = end2; + let end3 = end2 + expected_second_data_frame.len(); + assert_eq!(&buf[start3..end3], expected_second_data_frame); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); + server.conn.stream_close_send(stream_id).unwrap(); + } + } + } + + read_response(&mut client, &mut server.conn, request_stream_id); + } + + fn alloc_buffer(size: usize) -> (Vec<u8>, Vec<u8>) { + let data_frame = HFrame::Data { len: size as u64 }; + let mut enc = Encoder::default(); + data_frame.encode(&mut enc); + + (vec![0_u8; size], enc.as_ref().to_vec()) + } + + // Send 2 frames. For the second one we can only send 63 bytes. + // After the first frame there is exactly 63+2 bytes left in the send buffer. + #[test] + fn fetch_two_data_frame_second_63bytes() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); + } + + // Send 2 frames. For the second one we can only send 63 bytes. + // After the first frame there is exactly 63+3 bytes left in the send buffer, + // but we can only send 63 bytes. + #[test] + fn fetch_two_data_frame_second_63bytes_place_for_66() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); + } + + // Send 2 frames. For the second one we can only send 64 bytes. + // After the first frame there is exactly 64+3 bytes left in the send buffer, + // but we can only send 64 bytes. + #[test] + fn fetch_two_data_frame_second_64bytes_place_for_67() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]); + } + + // Send 2 frames. For the second one we can only send 16383 bytes. + // After the first frame there is exactly 16383+3 bytes left in the send buffer. + #[test] + fn fetch_two_data_frame_second_16383bytes() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); + } + + // Send 2 frames. For the second one we can only send 16383 bytes. + // After the first frame there is exactly 16383+4 bytes left in the send buffer, but we can only send 16383 bytes. + #[test] + fn fetch_two_data_frame_second_16383bytes_place_for_16387() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); + } + + // Send 2 frames. For the second one we can only send 16383 bytes. + // After the first frame there is exactly 16383+5 bytes left in the send buffer, but we can only send 16383 bytes. + #[test] + fn fetch_two_data_frame_second_16383bytes_place_for_16388() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); + } + + // Send 2 frames. For the second one we can send 16384 bytes. + // After the first frame there is exactly 16384+5 bytes left in the send buffer, but we can send 16384 bytes. + #[test] + fn fetch_two_data_frame_second_16384bytes_place_for_16389() { + let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412); + fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]); + } + + // Test receiving STOP_SENDING with the HttpNoError error code. + #[test] + fn test_stop_sending_early_response() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Stop sending with early_response. + assert_eq!( + Ok(()), + server + .conn + .stream_stop_sending(request_stream_id, Error::HttpNoError.code()) + ); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + let mut stop_sending = false; + let mut response_headers = false; + let mut response_body = false; + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { stream_id, error } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpNoError.code()); + // assert that we cannot send any more request data. + assert_eq!( + Err(Error::InvalidStreamId), + client.send_data(request_stream_id, &[0_u8; 10]) + ); + stop_sending = true; + } + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + response_headers = true; + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); + assert!(fin); + assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); + response_body = true; + } + _ => {} + } + } + assert!(response_headers); + assert!(response_body); + assert!(stop_sending); + + // after this stream will be removed from client. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + // Server sends stop sending and reset. + #[test] + fn test_stop_sending_other_error_with_reset() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Stop sending with RequestRejected. + assert_eq!( + Ok(()), + server + .conn + .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code()) + ); + // also reset with RequestRejected. + assert_eq!( + Ok(()), + server + .conn + .stream_reset_send(request_stream_id, Error::HttpRequestRejected.code()) + ); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut reset = false; + let mut stop_sending = false; + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { stream_id, error } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestRejected.code()); + stop_sending = true; + } + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestRejected.code()); + assert!(!local); + reset = true; + } + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { + panic!("We should not get any headers or data"); + } + _ => {} + } + } + + assert!(reset); + assert!(stop_sending); + + // after this stream will be removed from client. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + // Server sends stop sending with RequestRejected, but it does not send reset. + #[test] + fn test_stop_sending_other_error_wo_reset() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Stop sending with RequestRejected. + assert_eq!( + Ok(()), + server + .conn + .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code()) + ); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut stop_sending = false; + + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { stream_id, error } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestRejected.code()); + stop_sending = true; + } + Http3ClientEvent::Reset { .. } => { + panic!("We should not get StopSending."); + } + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { + panic!("We should not get any headers or data"); + } + _ => {} + } + } + + assert!(stop_sending); + + // after this we can still read from a stream. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_ok()); + + client.close(now(), 0, ""); + } + + // Server sends stop sending and reset. We have some events for that stream already + // in client.events. The events will be removed. + #[test] + fn test_stop_sending_and_reset_other_error_with_events() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + false, + ); + // At this moment we have some new events, i.e. a HeadersReady event + + // Send a stop sending and reset. + assert_eq!( + Ok(()), + server + .conn + .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code()) + ); + assert_eq!( + Ok(()), + server + .conn + .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) + ); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut reset = false; + + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { stream_id, error } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestCancelled.code()); + } + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestCancelled.code()); + assert!(!local); + reset = true; + } + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { + panic!("We should not get any headers or data"); + } + _ => {} + } + } + + assert!(reset); + + // after this stream will be removed from client. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + // Server sends stop sending with code that is not HttpNoError. + // We have some events for that stream already in the client.events. + // The events will be removed. + #[test] + fn test_stop_sending_other_error_with_events() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // send response - 200 Content-Length: 3 + // with content: 'abc'. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + false, + ); + // At this moment we have some new event, i.e. a HeadersReady event + + // Send a stop sending. + assert_eq!( + Ok(()), + server + .conn + .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code()) + ); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut stop_sending = false; + let mut header_ready = false; + + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { stream_id, error } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestCancelled.code()); + stop_sending = true; + } + Http3ClientEvent::Reset { .. } => { + panic!("We should not get StopSending."); + } + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { + header_ready = true; + } + _ => {} + } + } + + assert!(stop_sending); + assert!(header_ready); + + // after this, we can sill read data from a sttream. + let mut buf = [0_u8; 100]; + let (amount, fin) = client + .read_data(now(), request_stream_id, &mut buf) + .unwrap(); + assert!(!fin); + assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); + + client.close(now(), 0, ""); + } + + // Server sends a reset. We will close sending side as well. + #[test] + fn test_reset_wo_stop_sending() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Send a reset. + assert_eq!( + Ok(()), + server + .conn + .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) + ); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut reset = false; + + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::StopSending { .. } => { + panic!("We should not get StopSending."); + } + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert_eq!(stream_id, request_stream_id); + assert_eq!(error, Error::HttpRequestCancelled.code()); + assert!(!local); + reset = true; + } + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { + panic!("We should not get any headers or data"); + } + _ => {} + } + } + + assert!(reset); + + // after this stream will be removed from client. We will check this by trying to read + // from the stream and that should fail. + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), request_stream_id, &mut buf); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + client.close(now(), 0, ""); + } + + fn test_incomplet_frame(buf: &[u8], error: &Error) { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + buf, + true, + ); + + while let Some(e) = client.next_event() { + if let Http3ClientEvent::DataReadable { stream_id } = e { + assert_eq!(stream_id, request_stream_id); + let mut buf_res = [0_u8; 100]; + let res = client.read_data(now(), stream_id, &mut buf_res); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::HttpFrame); + } + } + assert_closed(&client, error); + } + + // Incomplete DATA frame + #[test] + fn test_incomplet_data_frame() { + test_incomplet_frame(&HTTP_RESPONSE_2[..12], &Error::HttpFrame); + } + + // Incomplete HEADERS frame + #[test] + fn test_incomplet_headers_frame() { + test_incomplet_frame(&HTTP_RESPONSE_2[..7], &Error::HttpFrame); + } + + #[test] + fn test_incomplet_unknown_frame() { + test_incomplet_frame(&[0x21], &Error::HttpFrame); + } + + // test goaway + #[test] + fn test_goaway() { + let (mut client, mut server) = connect(); + let request_stream_id_1 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_1, 0); + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + let request_stream_id_3 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_3, 8); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) + .unwrap(); + + // find the new request/response stream and send frame v on it. + while let Some(e) = server.conn.next_event() { + if let ConnectionEvent::RecvStreamReadable { stream_id } = e { + let mut buf = [0_u8; 100]; + let _ = server.conn.stream_recv(stream_id, &mut buf).unwrap(); + if (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2) { + // send response - 200 Content-Length: 7 + // with content: 'abcdefg'. + // The content will be send in 2 DATA frames. + let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_1).unwrap(); + server.conn.stream_close_send(stream_id).unwrap(); + } + } + } + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let mut stream_reset = false; + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { headers, fin, .. } => { + check_response_header_1(&headers); + assert!(!fin); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert!( + (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2) + ); + let mut buf = [0_u8; 100]; + assert_eq!( + (EXPECTED_RESPONSE_DATA_1.len(), true), + client.read_data(now(), stream_id, &mut buf).unwrap() + ); + } + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert_eq!(stream_id, request_stream_id_3); + assert_eq!(error, Error::HttpRequestRejected.code()); + assert!(!local); + stream_reset = true; + } + _ => {} + } + } + + assert!(stream_reset); + assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8))); + + // Check that a new request cannot be made. + assert_eq!( + client.fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default() + ), + Err(Error::AlreadyClosed) + ); + + client.close(now(), 0, ""); + } + + #[test] + fn multiple_goaways() { + let (mut client, mut server) = connect(); + let request_stream_id_1 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_1, 0); + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + let request_stream_id_3 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_3, 8); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + // First send a Goaway frame with an higher number + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Check that there is one reset for stream_id 8 + let mut stream_reset_1 = 0; + while let Some(e) = client.next_event() { + if let Http3ClientEvent::Reset { + stream_id, + error, + local, + } = e + { + assert_eq!(stream_id, request_stream_id_3); + assert_eq!(error, Error::HttpRequestRejected.code()); + assert!(!local); + stream_reset_1 += 1; + } + } + + assert_eq!(stream_reset_1, 1); + assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8))); + + // Server sends another GOAWAY frame + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4]) + .unwrap(); + + // Send response for stream 0 + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id_1, + HTTP_RESPONSE_1, + true, + ); + + let mut stream_reset_2 = 0; + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { headers, fin, .. } => { + check_response_header_1(&headers); + assert!(!fin); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert!(stream_id == request_stream_id_1); + let mut buf = [0_u8; 100]; + assert_eq!( + (EXPECTED_RESPONSE_DATA_1.len(), true), + client.read_data(now(), stream_id, &mut buf).unwrap() + ); + } + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert_eq!(stream_id, request_stream_id_2); + assert_eq!(error, Error::HttpRequestRejected.code()); + assert!(!local); + stream_reset_2 += 1; + } + _ => {} + } + } + + assert_eq!(stream_reset_2, 1); + assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4))); + } + + #[test] + fn multiple_goaways_stream_id_increased() { + let (mut client, mut server) = connect(); + let request_stream_id_1 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_1, 0); + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + let request_stream_id_3 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_3, 8); + + // First send a Goaway frame with a smaller number + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4]) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4))); + + // Now send a Goaway frame with an higher number + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_closed(&client, &Error::HttpGeneralProtocol); + } + + #[test] + fn goaway_wrong_stream_id() { + let (mut client, mut server) = connect(); + + let _ = server + .conn + .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x9]) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_closed(&client, &Error::HttpId); + } + + // Close stream before headers. + #[test] + fn test_stream_fin_wo_headers() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + // send fin before sending any data. + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Recv HeaderReady wo headers with fin. + let e = client.events().next().unwrap(); + assert_eq!( + e, + Http3ClientEvent::Reset { + stream_id: request_stream_id, + error: Error::HttpGeneralProtocolStream.code(), + local: true, + } + ); + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + // Close stream imemediately after headers. + #[test] + fn test_stream_fin_after_headers() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_HEADER_ONLY_2, + true, + ); + + // Recv HeaderReady with headers and fin. + let e = client.events().next().unwrap(); + if let Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } = e + { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(fin); + assert!(!interim); + } else { + panic!("wrong event type"); + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + // Send headers, read headers and than close stream. + // We should get HeaderReady and a DataReadable + #[test] + fn test_stream_fin_after_headers_are_read_wo_data_frame() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + // Send some good data wo fin + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_HEADER_ONLY_2, + false, + ); + + // Recv headers wo fin + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { .. } => { + panic!("We should not receive a DataGeadable event!"); + } + _ => {} + }; + } + + // ok NOW send fin + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Recv DataReadable wo data with fin + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { .. } => { + panic!("We should not get another HeaderReady!"); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), stream_id, &mut buf); + let (len, fin) = res.expect("should read"); + assert_eq!(0, len); + assert!(fin); + } + _ => {} + }; + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + // Send headers and an empty data frame, then close the stream. + #[test] + fn test_stream_fin_after_headers_and_a_empty_data_frame() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send headers. + let _ = server + .conn + .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) + .unwrap(); + // Send an empty data frame. + let _ = server + .conn + .stream_send(request_stream_id, &[0x00, 0x00]) + .unwrap(); + // ok NOW send fin + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Recv HeaderReady with fin. + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + assert_eq!(Ok((0, true)), client.read_data(now(), stream_id, &mut buf)); + } + _ => {} + }; + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), request_stream_id, &mut buf), + Err(Error::InvalidStreamId) + ); + } + + // Send headers and an empty data frame. Read headers and then close the stream. + // We should get a HeaderReady without fin and a DataReadable wo data and with fin. + #[test] + fn test_stream_fin_after_headers_an_empty_data_frame_are_read() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + // Send some good data wo fin + // Send headers. + let _ = server + .conn + .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) + .unwrap(); + // Send an empty data frame. + let _ = server + .conn + .stream_send(request_stream_id, &[0x00, 0x00]) + .unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Recv headers wo fin + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { .. } => { + panic!("We should not receive a DataGeadable event!"); + } + _ => {} + }; + } + + // ok NOW send fin + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Recv no data, but do get fin + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { .. } => { + panic!("We should not get another HeaderReady!"); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), stream_id, &mut buf); + let (len, fin) = res.expect("should read"); + assert_eq!(0, len); + assert!(fin); + } + _ => {} + }; + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + #[test] + fn test_stream_fin_after_a_data_frame() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + // Send some good data wo fin + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + false, + ); + + // Recv some good data wo fin + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } => { + assert_eq!(stream_id, request_stream_id); + check_response_header_2(&headers); + assert!(!fin); + assert!(!interim); + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let res = client.read_data(now(), stream_id, &mut buf); + let (len, fin) = res.expect("should have data"); + assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1); + assert!(!fin); + } + _ => {} + }; + } + + // ok NOW send fin + server.conn.stream_close_send(request_stream_id).unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // fin wo data should generate DataReadable + let e = client.events().next().unwrap(); + if let Http3ClientEvent::DataReadable { stream_id } = e { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0; 100]; + let res = client.read_data(now(), stream_id, &mut buf); + let (len, fin) = res.expect("should read"); + assert_eq!(0, len); + assert!(fin); + } else { + panic!("wrong event type"); + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + #[test] + fn test_multiple_data_frames() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send two data frames with fin + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + true, + ); + + // Read first frame + match client.events().nth(1).unwrap() { + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + assert_eq!( + (EXPECTED_RESPONSE_DATA_1.len(), true), + client.read_data(now(), stream_id, &mut buf).unwrap() + ); + } + x => { + panic!("event {:?}", x); + } + } + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + #[test] + fn test_receive_grease_before_response() { + // Construct an unknown frame. + const UNKNOWN_FRAME_LEN: usize = 832; + + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4); + enc.encode_varint(1028_u64); // Arbitrary type. + enc.encode_varint(UNKNOWN_FRAME_LEN as u64); + let mut buf: Vec<_> = enc.into(); + buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0); + let _ = server.conn.stream_send(request_stream_id, &buf).unwrap(); + + // Send a headers and a data frame with fin + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + // Read first frame + match client.events().nth(1).unwrap() { + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, request_stream_id); + let mut buf = [0_u8; 100]; + let (len, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); + assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); + assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1); + assert!(fin); + } + x => { + panic!("event {:?}", x); + } + } + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + #[test] + fn test_read_frames_header_blocked() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. + let encoder_inst_pkt = server.conn.process(None, now()); + + // Send response + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { len: 3 }; + d_frame.encode(&mut d); + d.encode(&[0x61, 0x62, 0x63]); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + true, + ); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!client.events().any(header_ready_event)); + + // Let client receive the encoder instructions. + mem::drop(client.process(encoder_inst_pkt.dgram(), now())); + + let out = server.conn.process(None, now()); + mem::drop(client.process(out.dgram(), now())); + mem::drop(client.process(None, now())); + + let mut recv_header = false; + let mut recv_data = false; + // Now the stream is unblocked and both headers and data will be consumed. + while let Some(e) = client.next_event() { + match e { + Http3ClientEvent::HeaderReady { stream_id, .. } => { + assert_eq!(stream_id, request_stream_id); + recv_header = true; + } + Http3ClientEvent::DataReadable { stream_id } => { + recv_data = true; + assert_eq!(stream_id, request_stream_id); + } + x => { + panic!("event {:?}", x); + } + } + } + assert!(recv_header && recv_data); + } + + #[test] + fn test_read_frames_header_blocked_with_fin_after_headers() { + let (mut hconn, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut hconn, &mut server); + + let sent_headers = vec![ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "0"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &sent_headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. + let encoder_inst_pkt = server.conn.process(None, now()); + + let mut d = Encoder::default(); + hframe.encode(&mut d); + + server_send_response_and_exchange_packet( + &mut hconn, + &mut server, + request_stream_id, + &d, + true, + ); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!hconn.events().any(header_ready_event)); + + // Let client receive the encoder instructions. + let _out = hconn.process(encoder_inst_pkt.dgram(), now()); + + let mut recv_header = false; + // Now the stream is unblocked. After headers we will receive a fin. + while let Some(e) = hconn.next_event() { + if let Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } = e + { + assert_eq!(stream_id, request_stream_id); + assert_eq!(headers.as_ref(), sent_headers); + assert!(fin); + assert!(!interim); + recv_header = true; + } else { + panic!("event {:?}", e); + } + } + assert!(recv_header); + } + + fn exchange_token(client: &mut Http3Client, server: &mut Connection) -> ResumptionToken { + server.send_ticket(now(), &[]).expect("can send ticket"); + let out = server.process_output(now()); + assert!(out.as_dgram_ref().is_some()); + client.process_input(out.dgram().unwrap(), now()); + // We do not have a token so we need to wait for a resumption token timer to trigger. + client.process_output(now() + Duration::from_millis(250)); + assert_eq!(client.state(), Http3State::Connected); + client + .events() + .find_map(|e| { + if let Http3ClientEvent::ResumptionToken(token) = e { + Some(token) + } else { + None + } + }) + .unwrap() + } + + fn start_with_0rtt() -> (Http3Client, TestServer) { + let (mut client, mut server) = connect(); + let token = exchange_token(&mut client, &mut server.conn); + + let mut client = default_http3_client(); + + let server = TestServer::new(); + + assert_eq!(client.state(), Http3State::Initializing); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + + assert_eq!(client.state(), Http3State::ZeroRtt); + let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); + assert!(client.events().any(zerortt_event)); + + (client, server) + } + + #[test] + fn zero_rtt_negotiated() { + let (mut client, mut server) = start_with_0rtt(); + + let out = client.process(None, now()); + + assert_eq!(client.state(), Http3State::ZeroRtt); + assert_eq!(*server.conn.state(), State::Init); + let out = server.conn.process(out.dgram(), now()); + + // Check that control and qpack streams are received and a + // SETTINGS frame has been received. + // Also qpack encoder stream will send "change capacity" instruction because it has + // the peer settings already. + server.check_control_qpack_request_streams_resumption( + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + EXPECTED_REQUEST_HEADER_FRAME, + false, + ); + + assert_eq!(*server.conn.state(), State::Handshaking); + let out = client.process(out.dgram(), now()); + assert_eq!(client.state(), Http3State::Connected); + + mem::drop(server.conn.process(out.dgram(), now())); + assert!(server.conn.state().connected()); + + assert!(client.tls_info().unwrap().resumed()); + assert!(server.conn.tls_info().unwrap().resumed()); + } + + #[test] + fn zero_rtt_send_request() { + let (mut client, mut server) = start_with_0rtt(); + + let request_stream_id = + make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]); + assert_eq!(request_stream_id, 0); + + let out = client.process(None, now()); + + assert_eq!(client.state(), Http3State::ZeroRtt); + assert_eq!(*server.conn.state(), State::Init); + let out = server.conn.process(out.dgram(), now()); + + // Check that control and qpack streams are received and a + // SETTINGS frame has been received. + // Also qpack encoder stream will send "change capacity" instruction because it has + // the peer settings already. + server.check_control_qpack_request_streams_resumption( + ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST, + EXPECTED_REQUEST_HEADER_FRAME_VERSION2, + true, + ); + + assert_eq!(*server.conn.state(), State::Handshaking); + let out = client.process(out.dgram(), now()); + assert_eq!(client.state(), Http3State::Connected); + let out = server.conn.process(out.dgram(), now()); + assert!(server.conn.state().connected()); + let out = client.process(out.dgram(), now()); + assert!(out.as_dgram_ref().is_none()); + + // After the server has been connected, send a response. + let res = server.conn.stream_send(request_stream_id, HTTP_RESPONSE_2); + assert_eq!(res, Ok(HTTP_RESPONSE_2.len())); + server.conn.stream_close_send(request_stream_id).unwrap(); + + read_response(&mut client, &mut server.conn, request_stream_id); + + assert!(client.tls_info().unwrap().resumed()); + assert!(server.conn.tls_info().unwrap().resumed()); + } + + #[test] + fn zero_rtt_before_resumption_token() { + let mut client = default_http3_client(); + assert!(client + .fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default() + ) + .is_err()); + } + + #[test] + fn zero_rtt_send_reject() { + let (mut client, mut server) = connect(); + let token = exchange_token(&mut client, &mut server.conn); + + let mut client = default_http3_client(); + let mut server = Connection::new_server( + test_fixture::DEFAULT_KEYS, + test_fixture::DEFAULT_ALPN_H3, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + ConnectionParameters::default(), + ) + .unwrap(); + // Using a freshly initialized anti-replay context + // should result in the server rejecting 0-RTT. + let ar = AntiReplay::new(now(), test_fixture::ANTI_REPLAY_WINDOW, 1, 3) + .expect("setup anti-replay"); + server + .server_enable_0rtt(&ar, AllowZeroRtt {}) + .expect("enable 0-RTT"); + + assert_eq!(client.state(), Http3State::Initializing); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); + assert!(client.events().any(zerortt_event)); + + // Send ClientHello. + let client_hs = client.process(None, now()); + assert!(client_hs.as_dgram_ref().is_some()); + + // Create a request + let request_stream_id = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id, 0); + + let client_0rtt = client.process(None, now()); + assert!(client_0rtt.as_dgram_ref().is_some()); + + let server_hs = server.process(client_hs.dgram(), now()); + assert!(server_hs.as_dgram_ref().is_some()); // Should produce ServerHello etc... + let server_ignored = server.process(client_0rtt.dgram(), now()); + assert!(server_ignored.as_dgram_ref().is_none()); + + // The server shouldn't receive that 0-RTT data. + let recvd_stream_evt = |e| matches!(e, ConnectionEvent::NewStream { .. }); + assert!(!server.events().any(recvd_stream_evt)); + + // Client should get a rejection. + let client_out = client.process(server_hs.dgram(), now()); + assert!(client_out.as_dgram_ref().is_some()); + let recvd_0rtt_reject = |e| e == Http3ClientEvent::ZeroRttRejected; + assert!(client.events().any(recvd_0rtt_reject)); + + // ...and the client stream should be gone. + let res = client.stream_close_send(request_stream_id); + assert!(res.is_err()); + assert_eq!(res.unwrap_err(), Error::InvalidStreamId); + + // Client will send Setting frame and open new qpack streams. + mem::drop(server.process(client_out.dgram(), now())); + TestServer::new_with_conn(server).check_client_control_qpack_streams_no_resumption(); + + // Check that we can send a request and that the stream_id starts again from 0. + assert_eq!(make_request(&mut client, false, &[]), 0); + } + + // Connect to a server, get token and reconnect using 0-rtt. Seerver sends new Settings. + fn zero_rtt_change_settings( + original_settings: &[HSetting], + resumption_settings: &[HSetting], + expected_client_state: &Http3State, + expected_encoder_stream_data: &[u8], + ) { + let mut client = default_http3_client(); + let mut server = TestServer::new_with_settings(original_settings); + // Connect and get a token + connect_with(&mut client, &mut server); + let token = exchange_token(&mut client, &mut server.conn); + + let mut client = default_http3_client(); + let mut server = TestServer::new_with_settings(resumption_settings); + assert_eq!(client.state(), Http3State::Initializing); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + assert_eq!(client.state(), Http3State::ZeroRtt); + let out = client.process(None, now()); + + assert_eq!(client.state(), Http3State::ZeroRtt); + assert_eq!(*server.conn.state(), State::Init); + let out = server.conn.process(out.dgram(), now()); + + // Check that control and qpack streams anda SETTINGS frame are received. + // Also qpack encoder stream will send "change capacity" instruction because it has + // the peer settings already. + server.check_control_qpack_request_streams_resumption( + expected_encoder_stream_data, + EXPECTED_REQUEST_HEADER_FRAME, + false, + ); + + assert_eq!(*server.conn.state(), State::Handshaking); + let out = client.process(out.dgram(), now()); + assert_eq!(client.state(), Http3State::Connected); + + mem::drop(server.conn.process(out.dgram(), now())); + assert!(server.conn.state().connected()); + + assert!(client.tls_info().unwrap().resumed()); + assert!(server.conn.tls_info().unwrap().resumed()); + + // Send new settings. + let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); + let mut enc = Encoder::default(); + server.settings.encode(&mut enc); + let mut sent = server.conn.stream_send(control_stream, CONTROL_STREAM_TYPE); + assert_eq!(sent.unwrap(), CONTROL_STREAM_TYPE.len()); + sent = server.conn.stream_send(control_stream, enc.as_ref()); + assert_eq!(sent.unwrap(), enc.len()); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + assert_eq!(&client.state(), expected_client_state); + assert!(server.conn.state().connected()); + } + + #[test] + fn zero_rtt_new_server_setting_are_the_same() { + // Send a new server settings that are the same as the old one. + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_omit_max_table() { + // Send a new server settings without MaxTableCapacity + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_omit_blocked_streams() { + // Send a new server settings without BlockedStreams + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_omit_header_list_size() { + // Send a new server settings without MaxHeaderListSize + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_max_table_size_bigger() { + // Send a new server settings MaxTableCapacity=200 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 200), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(514)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_max_table_size_smaller() { + // Send a new server settings MaxTableCapacity=50 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 50), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_blocked_streams_bigger() { + // Send a new server settings withBlockedStreams=200 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 200), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_blocked_streams_smaller() { + // Send a new server settings withBlockedStreams=50 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 50), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_max_header_size_bigger() { + // Send a new server settings with MaxHeaderListSize=20000 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 20000), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_new_server_setting_max_headers_size_smaller() { + // Send the new server settings with MaxHeaderListSize=5000 + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 5000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_max_table_size_first_omitted() { + // send server original settings without MaxTableCapacity + // send new server setting with MaxTableCapacity + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA, + ); + } + + #[test] + fn zero_rtt_blocked_streams_first_omitted() { + // Send server original settings without BlockedStreams + // Send the new server settings with BlockedStreams + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Connected, + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn zero_rtt_max_header_size_first_omitted() { + // Send server settings without MaxHeaderListSize + // Send new settings with MaxHeaderListSize. + zero_rtt_change_settings( + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 10000), + ], + &[ + HSetting::new(HSettingType::MaxTableCapacity, 100), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ], + &Http3State::Closing(ConnectionError::Application(265)), + ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, + ); + } + + #[test] + fn test_trailers_with_fin_after_headers() { + // Make a new connection. + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send HEADER frame. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + false, + ); + + // Check response headers. + let mut response_headers = false; + while let Some(e) = client.next_event() { + if let Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } = e + { + assert_eq!(stream_id, request_stream_id); + check_response_header_0(&headers); + assert!(!fin); + assert!(!interim); + response_headers = true; + } + } + assert!(response_headers); + + // Send trailers + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + true, + ); + + let events: Vec<Http3ClientEvent> = client.events().collect(); + + // We already had HeaderReady + let header_ready: fn(&Http3ClientEvent) -> _ = + |e| matches!(*e, Http3ClientEvent::HeaderReady { .. }); + assert!(!events.iter().any(header_ready)); + + // Check that we have a DataReady event. Reading from the stream will return fin=true. + let data_readable: fn(&Http3ClientEvent) -> _ = + |e| matches!(*e, Http3ClientEvent::DataReadable { .. }); + assert!(events.iter().any(data_readable)); + let mut buf = [0_u8; 100]; + let (len, fin) = client + .read_data(now(), request_stream_id, &mut buf) + .unwrap(); + assert_eq!(0, len); + assert!(fin); + } + + #[test] + fn test_trailers_with_later_fin_after_headers() { + // Make a new connection. + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send HEADER frame. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + false, + ); + + // Check response headers. + let mut response_headers = false; + while let Some(e) = client.next_event() { + if let Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } = e + { + assert_eq!(stream_id, request_stream_id); + check_response_header_0(&headers); + assert!(!fin); + assert!(!interim); + response_headers = true; + } + } + assert!(response_headers); + + // Send trailers + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + false, + ); + + // Check that we do not have a DataReady event. + let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); + assert!(!client.events().any(data_readable)); + + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + let events: Vec<Http3ClientEvent> = client.events().collect(); + + // We already had HeaderReady + let header_ready: fn(&Http3ClientEvent) -> _ = + |e| matches!(*e, Http3ClientEvent::HeaderReady { .. }); + assert!(!events.iter().any(header_ready)); + + // Check that we have a DataReady event. Reading from the stream will return fin=true. + let data_readable_fn: fn(&Http3ClientEvent) -> _ = + |e| matches!(*e, Http3ClientEvent::DataReadable { .. }); + assert!(events.iter().any(data_readable_fn)); + let mut buf = [0_u8; 100]; + let (len, fin) = client + .read_data(now(), request_stream_id, &mut buf) + .unwrap(); + assert_eq!(0, len); + assert!(fin); + } + + #[test] + fn test_data_after_trailers_after_headers() { + // Make a new connection. + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send HEADER frame. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + false, + ); + + // Check response headers. + let mut response_headers = false; + while let Some(e) = client.next_event() { + if let Http3ClientEvent::HeaderReady { + stream_id, + headers, + interim, + fin, + } = e + { + assert_eq!(stream_id, request_stream_id); + check_response_header_0(&headers); + assert!(!fin); + assert!(!interim); + response_headers = true; + } + } + assert!(response_headers); + + // Send trailers + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_HEADER_FRAME_0, + false, + ); + + // Check that we do not have a DataReady event. + let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); + assert!(!client.events().any(data_readable)); + + // Send Data frame. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + [0x0, 0x3, 0x61, 0x62, 0x63], // a data frame + false, + ); + + assert_closed(&client, &Error::HttpFrameUnexpected); + } + + #[test] + fn transport_stream_readable_event_after_all_data() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(false); + + // Send headers. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + false, + ); + + // Send an empty data frame and a fin + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + [0x0, 0x0], + true, + ); + + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Ok((3, true)) + ); + + client.process(None, now()); + } + + #[test] + fn no_data_ready_events_after_fin() { + // Connect exchange headers and send a request. Also check if the correct header frame has been sent. + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // send response - 200 Content-Length: 7 + // with content: 'abcdefg'. + // The content will be send in 2 DATA frames. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + true, + ); + + let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id); + assert!(client.events().any(data_readable_event)); + + let mut buf = [0_u8; 100]; + assert_eq!( + (EXPECTED_RESPONSE_DATA_1.len(), true), + client + .read_data(now(), request_stream_id, &mut buf) + .unwrap() + ); + + assert!(!client.events().any(data_readable_event)); + } + + #[test] + fn reading_small_chunks_of_data() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // send response - 200 Content-Length: 7 + // with content: 'abcdefg'. + // The content will be send in 2 DATA frames. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + true, + ); + + let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id); + assert!(client.events().any(data_readable_event)); + + let mut buf1 = [0_u8; 1]; + assert_eq!( + (1, false), + client + .read_data(now(), request_stream_id, &mut buf1) + .unwrap() + ); + assert!(!client.events().any(data_readable_event)); + + // Now read only until the end of the first frame. The firs framee has 3 bytes. + let mut buf2 = [0_u8; 2]; + assert_eq!( + (2, false), + client + .read_data(now(), request_stream_id, &mut buf2) + .unwrap() + ); + assert!(!client.events().any(data_readable_event)); + + // Read a half of the second frame. + assert_eq!( + (2, false), + client + .read_data(now(), request_stream_id, &mut buf2) + .unwrap() + ); + assert!(!client.events().any(data_readable_event)); + + // Read the rest. + // Read a half of the second frame. + assert_eq!( + (2, true), + client + .read_data(now(), request_stream_id, &mut buf2) + .unwrap() + ); + assert!(!client.events().any(data_readable_event)); + } + + #[test] + fn zero_length_data_at_end() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // send response - 200 Content-Length: 7 + // with content: 'abcdefg'. + // The content will be send in 2 DATA frames. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + false, + ); + // Send a zero-length frame at the end of the stream. + let _ = server.conn.stream_send(request_stream_id, &[0, 0]).unwrap(); + server.conn.stream_close_send(request_stream_id).unwrap(); + let dgram = server.conn.process_output(now()).dgram(); + client.process_input(dgram.unwrap(), now()); + + let data_readable_event = |e: &_| matches!(e, Http3ClientEvent::DataReadable { stream_id } if *stream_id == request_stream_id); + assert_eq!(client.events().filter(data_readable_event).count(), 1); + + let mut buf = [0_u8; 10]; + assert_eq!( + (7, true), + client + .read_data(now(), request_stream_id, &mut buf) + .unwrap() + ); + assert!(!client.events().any(|e| data_readable_event(&e))); + } + + #[test] + fn stream_blocked_no_remote_encoder_stream() { + let (mut client, mut server) = connect_only_transport(); + + send_and_receive_client_settings(&mut client, &mut server); + + server.create_control_stream(); + // Send the server's control stream data. + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + server.create_qpack_streams(); + let qpack_pkt1 = server.conn.process(None, now()); + // delay delivery of this packet. + + let request_stream_id = make_request(&mut client, true, &[]); + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions, + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + + // Send response + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { len: 3 }; + d_frame.encode(&mut d); + d.encode(&[0x61, 0x62, 0x63]); + let _ = server + .conn + .stream_send(request_stream_id, d.as_ref()) + .unwrap(); + server.conn.stream_close_send(request_stream_id).unwrap(); + + let out = server.conn.process(None, now()); + mem::drop(client.process(out.dgram(), now())); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!client.events().any(header_ready_event)); + + // Let client receive the encoder instructions. + mem::drop(client.process(qpack_pkt1.dgram(), now())); + + assert!(client.events().any(header_ready_event)); + } + + // Client: receive a push stream + #[test] + fn push_single() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, 0); + + // create a push stream. + let _ = send_push_data(&mut server.conn, 0, true); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }], + &[0], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + } + + /// We can't keep the connection alive on the basis of a push promise, + /// nor do we want to if the push promise is not interesting to the client. + /// We do the next best thing, which is keep any push stream alive if the + /// client reads from it. + #[test] + fn push_keep_alive() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + let idle_timeout = ConnectionParameters::default().get_idle_timeout(); + + // Promise a push and deliver, but don't close the stream. + send_push_promise(&mut server.conn, request_stream_id, 0); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }], + &[], // No push streams yet. + request_stream_id, + ); + + // The client will become idle here. + force_idle(&mut client, &mut server); + assert_eq!(client.process_output(now()).callback(), idle_timeout); + + // Reading push data will stop the client from being idle. + let _ = send_push_data(&mut server.conn, 0, false); + let dgram = server.conn.process_output(now()).dgram(); + client.process_input(dgram.unwrap(), now()); + + let mut buf = [0; 16]; + let (read, fin) = client.push_read_data(now(), 0, &mut buf).unwrap(); + assert!(read < buf.len()); + assert!(!fin); + + force_idle(&mut client, &mut server); + assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); + } + + #[test] + fn push_multiple() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, 1); + + // create a push stream. + let _ = send_push_data(&mut server.conn, 0, true); + + // create a second push stream. + let _ = send_push_data(&mut server.conn, 1, true); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 1, + ref_stream_id: request_stream_id, + }, + ], + &[0, 1], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + assert_eq!(client.cancel_push(1), Err(Error::InvalidStreamId)); + } + + #[test] + fn push_after_headers() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send response headers + let _ = server + .conn + .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) + .unwrap(); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, 0); + + // create a push stream. + let _ = send_push_data(&mut server.conn, 0, true); + + // Send response data + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_DATA_FRAME_ONLY_2, + true, + ); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }], + &[0], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + #[test] + fn push_after_response() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send response headers and data frames + let _ = server + .conn + .stream_send(request_stream_id, HTTP_RESPONSE_2) + .unwrap(); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, 0); + // create a push stream. + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }], + &[0], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + fn check_push_events(client: &mut Http3Client) -> bool { + let any_push_event = |e| { + matches!( + e, + Http3ClientEvent::PushPromise { .. } + | Http3ClientEvent::PushHeaderReady { .. } + | Http3ClientEvent::PushDataReadable { .. } + ) + }; + client.events().any(any_push_event) + } + + fn check_data_readable(client: &mut Http3Client) -> bool { + let any_data_event = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); + client.events().any(any_data_event) + } + + fn check_header_ready(client: &mut Http3Client) -> bool { + let any_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + client.events().any(any_event) + } + + fn check_header_ready_and_push_promise(client: &mut Http3Client) -> bool { + let any_event = |e| { + matches!( + e, + Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::PushPromise { .. } + ) + }; + client.events().any(any_event) + } + + #[test] + fn push_stream_before_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // create a push stream. + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Now send push_promise + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }], + &[0], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test receiving pushes out of order. + // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state. + // Start push_id 3 by receiving a push_promise and then a push stream with the push_id 3. + #[test] + fn push_out_of_order_1() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); + // Start a push stream with push_id 3. + send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); + + assert_eq!(client.state(), Http3State::Connected); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 3, + ref_stream_id: request_stream_id, + }, + ], + &[3], + request_stream_id, + ); + assert_eq!(client.state(), Http3State::Connected); + } + + // Test receiving pushes out of order. + // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state. + // Start push_id 3 by receiving a push stream with push_id 3 and then a push_promise. + #[test] + fn push_out_of_order_2() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + + send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 3, + ref_stream_id: request_stream_id, + }, + ], + &[3], + request_stream_id, + ); + assert_eq!(client.state(), Http3State::Connected); + } + + // Test receiving pushes out of order. + // Push_id 5 is received first and read so that it is removed from the list, + // therefore Push_id 3 will be in the PushState:Init state. + // Start push_id 3 by receiving a push stream with the push_id 3 and then a push_promise. + #[test] + fn push_out_of_order_3() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + assert_eq!(client.state(), Http3State::Connected); + + // Read push stream with push_id 5 to make it change to closed state. + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }], + &[5], + request_stream_id, + ); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); + send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 3, + ref_stream_id: request_stream_id, + }], + &[3], + request_stream_id, + ); + assert_eq!(client.state(), Http3State::Connected); + } + + // The next test is for receiving a second PushPromise when Push is in the PushPromise state. + #[test] + fn multiple_push_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + + // make a second request. + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id_2, + }, + ], + &[], + request_stream_id, + ); + assert_eq!(client.state(), Http3State::Connected); + } + + // The next test is for receiving a second PushPromise when Push is in the Active state. + #[test] + fn multiple_push_promise_active() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + + // make a second request. + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id_2, + }, + ], + &[5], + request_stream_id, + ); + assert_eq!(client.state(), Http3State::Connected); + } + + // The next test is for receiving a second PushPromise when the push is already closed. + // PushPromise will be ignored for the push streams that are consumed. + #[test] + fn multiple_push_promise_closed() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); + // Start a push stream with push_id 5. + send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 5, + ref_stream_id: request_stream_id, + }], + &[5], + request_stream_id, + ); + + // make a second request. + let request_stream_id_2 = make_request(&mut client, false, &[]); + assert_eq!(request_stream_id_2, 4); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); + + // Check that we do not have a Http3ClientEvent::PushPromise. + let push_event = |e| matches!(e, Http3ClientEvent::PushPromise { .. }); + assert!(!client.events().any(push_event)); + } + + // Test that max_push_id is enforced when a push promise frame is received. + #[test] + fn exceed_max_push_id_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send a push promise. max_push_id is set to 5, to trigger an error we send push_id=6. + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 6); + + assert_closed(&client, &Error::HttpId); + } + + // Test that max_push_id is enforced when a push stream is received. + #[test] + fn exceed_max_push_id_push_stream() { + // Connect and send a request + let (mut client, mut server) = connect(); + + // Send a push stream. max_push_id is set to 5, to trigger an error we send push_id=6. + send_push_data_and_exchange_packets(&mut client, &mut server, 6, true); + + assert_closed(&client, &Error::HttpId); + } + + // Test that max_push_id is enforced when a cancel push frame is received. + #[test] + fn exceed_max_push_id_cancel_push() { + // Connect and send a request + let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); + + // Send CANCEL_PUSH for push_id 6. + send_cancel_push_and_exchange_packets(&mut client, &mut server, 6); + + assert_closed(&client, &Error::HttpId); + } + + // Test that max_push_id is enforced when an app calls cancel_push. + #[test] + fn exceed_max_push_id_cancel_api() { + // Connect and send a request + let (mut client, _, _) = connect_and_send_request(true); + + assert_eq!(client.cancel_push(6), Err(Error::HttpId)); + assert_eq!(client.state(), Http3State::Connected); + } + + #[test] + fn test_max_push_id_frame_update_is_sent() { + const MAX_PUSH_ID_FRAME: &[u8] = &[0xd, 0x1, 0x8]; + + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send 3 push promises. + send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_promise(&mut server.conn, request_stream_id, 1); + send_push_promise(&mut server.conn, request_stream_id, 2); + + // create 3 push streams. + send_push_data(&mut server.conn, 0, true); + send_push_data(&mut server.conn, 1, true); + send_push_data_and_exchange_packets(&mut client, &mut server, 2, true); + + read_response_and_push_events( + &mut client, + &[ + PushPromiseInfo { + push_id: 0, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 1, + ref_stream_id: request_stream_id, + }, + PushPromiseInfo { + push_id: 2, + ref_stream_id: request_stream_id, + }, + ], + &[0, 1, 2], + request_stream_id, + ); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + + // Check max_push_id frame has been received + let control_stream_readable = + |e| matches!(e, ConnectionEvent::RecvStreamReadable{stream_id: x} if x == 2); + assert!(server.conn.events().any(control_stream_readable)); + let mut buf = [0_u8; 100]; + let (amount, fin) = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap(); + assert!(!fin); + + assert_eq!(amount, MAX_PUSH_ID_FRAME.len()); + assert_eq!(&buf[..3], MAX_PUSH_ID_FRAME); + + // Check that we can send push_id=8 now + send_push_promise(&mut server.conn, request_stream_id, 8); + send_push_data(&mut server.conn, 8, true); + + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + + assert_eq!(client.state(), Http3State::Connected); + + read_response_and_push_events( + &mut client, + &[PushPromiseInfo { + push_id: 8, + ref_stream_id: request_stream_id, + }], + &[8], + request_stream_id, + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test that 2 push streams with the same push_id are caught. + #[test] + fn duplicate_push_stream() { + // Connect and send a request + let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); + + // Start a push stream with push_id 0. + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + + // Send it again + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + + assert_closed(&client, &Error::HttpId); + } + + // Test that 2 push streams with the same push_id are caught. + #[test] + fn duplicate_push_stream_active() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise(&mut server.conn, request_stream_id, 0); + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + // Now the push_stream is in the PushState::Active state + + send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); + + assert_closed(&client, &Error::HttpId); + } + + fn assert_stop_sending_event( + server: &mut TestServer, + push_stream_id: StreamId, + expected_error: u64, + ) { + assert!(server.conn.events().any(|e| matches!( + e, + ConnectionEvent::SendStreamStopSending { + stream_id, + app_error, + } if stream_id == push_stream_id && app_error == expected_error + ))); + } + + // Test CANCEL_PUSH frame: after cancel push any new PUSH_PROMISE or push stream will be ignored. + #[test] + fn cancel_push_ignore_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + + send_push_promise(&mut server.conn, request_stream_id, 0); + // Start a push stream with push_id 0. + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + // Check that the push has been canceled by the client. + assert_stop_sending_event( + &mut server, + push_stream_id, + Error::HttpRequestCancelled.code(), + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test CANCEL_PUSH frame: after cancel push any already received PUSH_PROMISE or push stream + // events will be removed. + #[test] + fn cancel_push_removes_push_events() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise(&mut server.conn, request_stream_id, 0); + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + // Check that the push has been canceled by the client. + assert_stop_sending_event( + &mut server, + push_stream_id, + Error::HttpRequestCancelled.code(), + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test CANCEL_PUSH frame: after cancel push any already received push stream will be canceled. + #[test] + fn cancel_push_frame_after_push_stream() { + // Connect and send a request + let (mut client, mut server, _) = connect_and_send_request(true); + + // Start a push stream with push_id 0. + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + // Check that the push has been canceled by the client. + assert_stop_sending_event( + &mut server, + push_stream_id, + Error::HttpRequestCancelled.code(), + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test a push stream reset after a new PUSH_PROMISE or/and push stream. The events will be ignored. + #[test] + fn cancel_push_stream_after_push_promise_and_push_stream() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise(&mut server.conn, request_stream_id, 0); + // Start a push stream with push_id 0. + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + server + .conn + .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + let out = server.conn.process(None, now()).dgram(); + client.process(out, now()); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test that a PUSH_PROMISE will be ignored after a push stream reset. + #[test] + fn cancel_push_stream_before_push_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Start a push stream with push_id 0. + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + server + .conn + .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + let out = server.conn.process(None, now()).dgram(); + client.process(out, now()); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test that push_promise events will be removed after application calls cancel_push. + #[test] + fn app_cancel_push_after_push_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + + assert!(client.cancel_push(0).is_ok()); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test that push_promise and push data events will be removed after application calls cancel_push. + #[test] + fn app_cancel_push_after_push_promise_and_push_stream() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + assert!(client.cancel_push(0).is_ok()); + let out = client.process(None, now()).dgram(); + mem::drop(server.conn.process(out, now())); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + // Check that the push has been canceled by the client. + assert_stop_sending_event( + &mut server, + push_stream_id, + Error::HttpRequestCancelled.code(), + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + // Test that push_promise events will be ignored after application calls cancel_push. + #[test] + fn app_cancel_push_before_push_promise() { + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + let push_stream_id = + send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); + + assert!(client.cancel_push(0).is_ok()); + let out = client.process(None, now()).dgram(); + mem::drop(server.conn.process(out, now())); + + send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); + + // Assert that we do not have any push event. + assert!(!check_push_events(&mut client)); + + // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. + assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); + + // Check that the push has been canceled by the client. + assert_stop_sending_event( + &mut server, + push_stream_id, + Error::HttpRequestCancelled.code(), + ); + + assert_eq!(client.state(), Http3State::Connected); + } + + fn setup_server_side_encoder_param( + client: &mut Http3Client, + server: &mut TestServer, + max_blocked_streams: u64, + ) { + server + .encoder + .borrow_mut() + .set_max_capacity(max_blocked_streams) + .unwrap(); + server + .encoder + .borrow_mut() + .set_max_blocked_streams(100) + .unwrap(); + server + .encoder + .borrow_mut() + .send_encoder_updates(&mut server.conn) + .unwrap(); + let out = server.conn.process(None, now()); + mem::drop(client.process(out.dgram(), now())); + } + + fn setup_server_side_encoder(client: &mut Http3Client, server: &mut TestServer) { + setup_server_side_encoder_param(client, server, 100); + } + + fn send_push_promise_using_encoder( + client: &mut Http3Client, + server: &mut TestServer, + stream_id: StreamId, + push_id: u64, + ) -> Option<Datagram> { + send_push_promise_using_encoder_with_custom_headers( + client, + server, + stream_id, + push_id, + Header::new("my-header", "my-value"), + ) + } + + fn send_push_promise_using_encoder_with_custom_headers( + client: &mut Http3Client, + server: &mut TestServer, + stream_id: StreamId, + push_id: u64, + additional_header: Header, + ) -> Option<Datagram> { + let mut headers = vec![ + Header::new(":method", "GET"), + Header::new(":scheme", "https"), + Header::new(":authority", "something.com"), + Header::new(":path", "/"), + Header::new("content-length", "3"), + ]; + headers.push(additional_header); + + let encoded_headers = + server + .encoder + .borrow_mut() + .encode_header_block(&mut server.conn, &headers, stream_id); + let push_promise_frame = HFrame::PushPromise { + push_id, + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. + let encoder_inst_pkt = server.conn.process(None, now()).dgram(); + assert!(encoder_inst_pkt.is_some()); + + let mut d = Encoder::default(); + push_promise_frame.encode(&mut d); + server_send_response_and_exchange_packet(client, server, stream_id, &d, false); + + encoder_inst_pkt + } + + #[test] + fn push_promise_header_decoder_block() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let encoder_inst_pkt = + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt, now()); + + // PushPromise is blocked wathing for encoder instructions. + assert!(check_push_events(&mut client)); + } + + // If PushPromise is blocked, stream data can still be received. + #[test] + fn push_promise_blocked_but_stream_is_not_blocked() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + // Send response headers + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_HEADER_ONLY_1, + false, + ); + + let encoder_inst_pkt = + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + // Stream data can be still read + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_DATA_FRAME_1_ONLY_1, + false, + ); + + assert!(check_data_readable(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt, now()); + + // PushPromise is blocked wathing for encoder instructions. + assert!(check_push_events(&mut client)); + + // Stream data can be still read + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_DATA_FRAME_2_ONLY_1, + false, + ); + + assert!(check_data_readable(&mut client)); + } + + // The response Headers are not block if they do not refer the dynamic table. + #[test] + fn push_promise_does_not_block_headers() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let encoder_inst_pkt = + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + // Send response headers + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_HEADER_ONLY_1, + false, + ); + + assert!(check_header_ready(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt, now()); + + // PushPromise is blocked wathing for encoder instructions. + assert!(check_push_events(&mut client)); + } + + // The response Headers are blocked if they refer a dynamic table entry. + #[test] + fn push_promise_block_headers() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + // Insert an elemet into a dynamic table. + // insert "content-length: 1234 + server + .encoder + .borrow_mut() + .send_and_insert(&mut server.conn, b"content-length", b"1234") + .unwrap(); + let encoder_inst_pkt1 = server.conn.process(None, now()).dgram(); + let _out = client.process(encoder_inst_pkt1, now()); + + // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client. + let encoder_inst_pkt2 = + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + let response_headers = vec![ + Header::new(":status", "200"), + Header::new("content-length", "1234"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &response_headers, + request_stream_id, + ); + let header_hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + let mut d = Encoder::default(); + header_hframe.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // The response headers are blocked. + assert!(!check_header_ready(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt2, now()); + + // The response headers are blocked. + assert!(check_header_ready_and_push_promise(&mut client)); + } + + // In this test there are 2 push promises that are blocked and the response header is + // blocked as well. After a packet is received only the first push promises is unblocked. + #[test] + fn two_push_promises_and_header_block() { + let mut client = default_http3_client_param(200); + let mut server = TestServer::new_with_settings(&[ + HSetting::new(HSettingType::MaxTableCapacity, 200), + HSetting::new(HSettingType::BlockedStreams, 100), + HSetting::new(HSettingType::MaxHeaderListSize, 10000), + ]); + connect_only_transport_with(&mut client, &mut server); + server.create_control_stream(); + server.create_qpack_streams(); + setup_server_side_encoder_param(&mut client, &mut server, 200); + + let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, true); + + // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client. + let encoder_inst_pkt1 = send_push_promise_using_encoder_with_custom_headers( + &mut client, + &mut server, + request_stream_id, + 0, + Header::new("myn1", "myv1"), + ); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + let encoder_inst_pkt2 = send_push_promise_using_encoder_with_custom_headers( + &mut client, + &mut server, + request_stream_id, + 1, + Header::new("myn2", "myv2"), + ); + + // PushPromise is blocked wathing for encoder instructions. + assert!(!check_push_events(&mut client)); + + let response_headers = vec![ + Header::new(":status", "200"), + Header::new("content-length", "1234"), + Header::new("myn3", "myv3"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &response_headers, + request_stream_id, + ); + let header_hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + let mut d = Encoder::default(); + header_hframe.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // The response headers are blocked. + assert!(!check_header_ready(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt1, now()); + + assert!(check_push_events(&mut client)); + + // Let client receive the encoder instructions. + let _out = client.process(encoder_inst_pkt2, now()); + + assert!(check_header_ready_and_push_promise(&mut client)); + } + + // The PushPromise blocked on header decoding will be canceled if the stream is closed. + #[test] + fn blocked_push_promises_canceled() { + const STREAM_CANCELED_ID_0: &[u8] = &[0x40]; + + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + mem::drop( + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) + .unwrap(), + ); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_1, + true, + ); + + // Read response that will make stream change to closed state. + assert!(check_header_ready(&mut client)); + let mut buf = [0_u8; 100]; + let _ = client + .read_data(now(), request_stream_id, &mut buf) + .unwrap(); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + // Check that encoder got stream_canceled instruction. + let mut inst = [0_u8; 100]; + let (amount, fin) = server + .conn + .stream_recv(CLIENT_SIDE_DECODER_STREAM_ID, &mut inst) + .unwrap(); + assert!(!fin); + assert_eq!(amount, STREAM_CANCELED_ID_0.len()); + assert_eq!(&inst[..amount], STREAM_CANCELED_ID_0); + } + + #[test] + fn data_readable_in_decoder_blocked_state() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "0"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Delay encoder instruction so that the stream will be blocked. + let encoder_insts = server.conn.process(None, now()); + + // Send response headers. + let mut d = Encoder::default(); + hframe.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Headers are blocked waiting fro the encoder instructions. + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!client.events().any(header_ready_event)); + + // Now send data frame. This will trigger DataRead event. + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { len: 0 }; + d_frame.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + true, + ); + + // Now read headers. + mem::drop(client.process(encoder_insts.dgram(), now())); + } + + #[test] + fn qpack_stream_reset() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + setup_server_side_encoder(&mut client, &mut server); + // Cancel request. + mem::drop(client.cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn)); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); + } + + fn send_headers_using_encoder( + client: &mut Http3Client, + server: &mut TestServer, + request_stream_id: StreamId, + headers: &[Header], + data: &[u8], + ) -> Option<Datagram> { + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + let out = server.conn.process(None, now()); + + // Send response + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { + len: u64::try_from(data.len()).unwrap(), + }; + d_frame.encode(&mut d); + d.encode(data); + server_send_response_and_exchange_packet(client, server, request_stream_id, &d, true); + + out.dgram() + } + + #[test] + fn qpack_stream_reset_recv() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + setup_server_side_encoder(&mut client, &mut server); + + // Cancel request. + server + .conn + .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + let out = server.conn.process(None, now()); + let out = client.process(out.dgram(), now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn)); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); + } + + #[test] + fn qpack_stream_reset_during_header_qpack_blocked() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + mem::drop( + send_headers_using_encoder( + &mut client, + &mut server, + request_stream_id, + &[ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ], + &[0x61, 0x62, 0x63], + ) + .unwrap(), + ); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!client.events().any(header_ready_event)); + + // Cancel request. + client + .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); + } + + #[test] + fn qpack_no_stream_cancelled_after_fin() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let encoder_instruct = send_headers_using_encoder( + &mut client, + &mut server, + request_stream_id, + &[ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ], + &[], + ); + + // Exchange encoder instructions + mem::drop(client.process(encoder_instruct, now())); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(client.events().any(header_ready_event)); + // After this the recv_stream is in ClosePending state + + // Cancel request. + client + .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + } + + #[test] + fn qpack_stream_reset_push_promise_header_decoder_block() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new("content-length", "3"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions. + let out = server.conn.process(None, now()); + mem::drop(client.process(out.dgram(), now())); + + // Send PushPromise that will be blocked waiting for decoder instructions. + mem::drop( + send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) + .unwrap(), + ); + + // Send response + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { len: 0 }; + d_frame.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + true, + ); + + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(client.events().any(header_ready_event)); + + // Cancel request. + client + .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); + } + + #[test] + fn qpack_stream_reset_dynamic_table_zero() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + // Cancel request. + client + .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) + .unwrap(); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + let out = client.process(None, now()); + mem::drop(server.conn.process(out.dgram(), now())); + mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); + assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); + } + + #[test] + fn multiple_streams_in_decoder_blocked_state() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "0"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Delay encoder instruction so that the stream will be blocked. + let encoder_insts = server.conn.process(None, now()); + + // Send response headers. + let mut d = Encoder::default(); + hframe.encode(&mut d); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + true, + ); + + // Headers are blocked waiting for the encoder instructions. + let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); + assert!(!client.events().any(header_ready_event)); + + // Make another request. + let request2 = make_request_and_exchange_pkts(&mut client, &mut server, true); + // Send response headers. + server_send_response_and_exchange_packet(&mut client, &mut server, request2, &d, true); + + // Headers on the second request are blocked as well are blocked + // waiting for the encoder instructions. + assert!(!client.events().any(header_ready_event)); + + // Now make the encoder instructions available. + mem::drop(client.process(encoder_insts.dgram(), now())); + + // Header blocks for both streams should be ready. + let mut count_responses = 0; + while let Some(e) = client.next_event() { + if let Http3ClientEvent::HeaderReady { stream_id, .. } = e { + assert!((stream_id == request_stream_id) || (stream_id == request2)); + count_responses += 1; + } + } + assert_eq!(count_responses, 2); + } + + #[test] + fn reserved_frames() { + for f in H3_RESERVED_FRAME_TYPES { + let mut enc = Encoder::default(); + enc.encode_varint(*f); + test_wrong_frame_on_control_stream(enc.as_ref()); + test_wrong_frame_on_push_stream(enc.as_ref()); + test_wrong_frame_on_request_stream(enc.as_ref()); + } + } + + #[test] + fn send_reserved_settings() { + for s in H3_RESERVED_SETTINGS { + let (mut client, mut server) = connect_only_transport(); + let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); + // Send the control stream type(0x0). + let _ = server + .conn + .stream_send(control_stream, CONTROL_STREAM_TYPE) + .unwrap(); + // Create a settings frame of length 2. + let mut enc = Encoder::default(); + enc.encode_varint(H3_FRAME_TYPE_SETTINGS); + enc.encode_varint(2_u64); + // The settings frame contains a reserved settings type and some value (0x1). + enc.encode_varint(*s); + enc.encode_varint(1_u64); + let sent = server.conn.stream_send(control_stream, enc.as_ref()); + assert_eq!(sent, Ok(4)); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpSettings); + } + } + + #[test] + fn response_w_1xx() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let mut d = Encoder::default(); + let headers1xx: &[Header] = &[Header::new(":status", "103")]; + server.encode_headers(request_stream_id, headers1xx, &mut d); + + let headers200: &[Header] = &[ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + server.encode_headers(request_stream_id, headers200, &mut d); + + // Send 1xx and 200 headers response. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Sending response data. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_DATA_FRAME_ONLY_2, + true, + ); + + let mut events = client.events().filter_map(|e| { + if let Http3ClientEvent::HeaderReady { + stream_id, + interim, + headers, + .. + } = e + { + Some((stream_id, interim, headers)) + } else { + None + } + }); + let (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap(); + assert_eq!( + (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()), + (request_stream_id, true, headers1xx) + ); + + let (stream_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap(); + assert_eq!( + (stream_id_200_rec, interim200_rec, headers200_rec.as_ref()), + (request_stream_id, false, headers200) + ); + assert!(events.next().is_none()); + } + + #[test] + fn response_wo_status() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let mut d = Encoder::default(); + let headers = vec![ + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + server.encode_headers(request_stream_id, &headers, &mut d); + + // Send response + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Stream has been reset because of the malformed headers. + let e = client.events().next().unwrap(); + assert_eq!( + e, + Http3ClientEvent::Reset { + stream_id: request_stream_id, + error: Error::InvalidHeader.code(), + local: true, + } + ); + + let out = client.process(None, now()).dgram(); + mem::drop(server.conn.process(out, now())); + + // Check that server has received a reset. + let stop_sending_event = |e| { + matches!(e, ConnectionEvent::SendStreamStopSending { + stream_id, + app_error + } if stream_id == request_stream_id && app_error == Error::InvalidHeader.code()) + }; + assert!(server.conn.events().any(stop_sending_event)); + + // Stream should now be closed and gone + let mut buf = [0_u8; 100]; + assert_eq!( + client.read_data(now(), StreamId::new(0), &mut buf), + Err(Error::InvalidStreamId) + ); + } + + // Client: receive a push stream + #[test] + fn push_single_with_1xx() { + const FIRST_PUSH_ID: u64 = 0; + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID); + // Create a push stream + let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); + + let mut d = Encoder::default(); + let headers1xx: &[Header] = &[Header::new(":status", "100")]; + server.encode_headers(push_stream_id, headers1xx, &mut d); + + let headers200: &[Header] = &[ + Header::new(":status", "200"), + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + server.encode_headers(push_stream_id, headers200, &mut d); + + // create a push stream. + send_data_on_push( + &mut server.conn, + push_stream_id, + u8::try_from(FIRST_PUSH_ID).unwrap(), + &d, + true, + ); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + let mut events = client.events().filter_map(|e| { + if let Http3ClientEvent::PushHeaderReady { + push_id, + interim, + headers, + .. + } = e + { + Some((push_id, interim, headers)) + } else { + None + } + }); + + let (push_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap(); + assert_eq!( + (push_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()), + (FIRST_PUSH_ID, true, headers1xx) + ); + + let (push_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap(); + assert_eq!( + (push_id_200_rec, interim200_rec, headers200_rec.as_ref()), + (FIRST_PUSH_ID, false, headers200) + ); + assert!(events.next().is_none()); + } + + // Client: receive a push stream + #[test] + fn push_single_wo_status() { + const FIRST_PUSH_ID: u64 = 0; + // Connect and send a request + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + // Send a push promise. + send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID); + // Create a push stream + let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); + + let mut d = Encoder::default(); + let headers = vec![ + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + server.encode_headers(request_stream_id, &headers, &mut d); + + send_data_on_push( + &mut server.conn, + push_stream_id, + u8::try_from(FIRST_PUSH_ID).unwrap(), + &d, + false, + ); + + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + HTTP_RESPONSE_2, + true, + ); + + // Stream has been reset because of thei malformed headers. + let push_reset_event = |e| { + matches!(e, Http3ClientEvent::PushReset { + push_id, + error, + } if push_id == FIRST_PUSH_ID && error == Error::InvalidHeader.code()) + }; + + assert!(client.events().any(push_reset_event)); + + let out = client.process(None, now()).dgram(); + mem::drop(server.conn.process(out, now())); + + // Check that server has received a reset. + let stop_sending_event = |e| { + matches!(e, ConnectionEvent::SendStreamStopSending { + stream_id, + app_error + } if stream_id == push_stream_id && app_error == Error::InvalidHeader.code()) + }; + assert!(server.conn.events().any(stop_sending_event)); + } + + fn handshake_client_error(client: &mut Http3Client, server: &mut TestServer, error: &Error) { + let out = handshake_only(client, server); + client.process(out.dgram(), now()); + assert_closed(client, error); + } + + /// Client fails to create a control stream, since server does not allow it. + #[test] + fn client_control_stream_create_failed() { + let mut client = default_http3_client(); + let mut server = TestServer::new_with_conn(new_server( + DEFAULT_ALPN_H3, + ConnectionParameters::default().max_streams(StreamType::UniDi, 0), + )); + handshake_client_error(&mut client, &mut server, &Error::StreamLimitError); + } + + /// 2 streams isn't enough for control and QPACK streams. + #[test] + fn client_qpack_stream_create_failed() { + let mut client = default_http3_client(); + let mut server = TestServer::new_with_conn(new_server( + DEFAULT_ALPN_H3, + ConnectionParameters::default().max_streams(StreamType::UniDi, 2), + )); + handshake_client_error(&mut client, &mut server, &Error::StreamLimitError); + } + + fn do_malformed_response_test(headers: &[Header]) { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let mut d = Encoder::default(); + server.encode_headers(request_stream_id, headers, &mut d); + + // Send response + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Stream has been reset because of the malformed headers. + let e = client.events().next().unwrap(); + assert_eq!( + e, + Http3ClientEvent::Reset { + stream_id: request_stream_id, + error: Error::InvalidHeader.code(), + local: true, + } + ); + } + + #[test] + fn malformed_response_pseudo_header_after_regular_header() { + do_malformed_response_test(&[ + Header::new("content-type", "text/plain"), + Header::new(":status", "100"), + ]); + } + + #[test] + fn malformed_response_undefined_pseudo_header() { + do_malformed_response_test(&[Header::new(":status", "200"), Header::new(":cheese", "200")]); + } + + #[test] + fn malformed_response_duplicate_pseudo_header() { + do_malformed_response_test(&[ + Header::new(":status", "200"), + Header::new(":status", "100"), + Header::new("content-type", "text/plain"), + ]); + } + + #[test] + fn malformed_response_uppercase_header() { + do_malformed_response_test(&[ + Header::new(":status", "200"), + Header::new("content-Type", "text/plain"), + ]); + } + + #[test] + fn malformed_response_excluded_header() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let mut d = Encoder::default(); + server.encode_headers( + request_stream_id, + &[ + Header::new(":status", "200"), + Header::new("content-type", "text/plain"), + Header::new("connection", "close"), + ], + &mut d, + ); + + // Send response + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Stream has been reset because of the malformed headers. + let e = client.events().next().unwrap(); + assert_eq!( + e, + Http3ClientEvent::HeaderReady { + stream_id: request_stream_id, + headers: vec![ + Header::new(":status", "200"), + Header::new("content-type", "text/plain") + ], + interim: false, + fin: false, + } + ); + } + + #[test] + fn malformed_response_excluded_byte_in_header() { + do_malformed_response_test(&[ + Header::new(":status", "200"), + Header::new("content:type", "text/plain"), + ]); + } + + #[test] + fn malformed_response_request_header_in_response() { + do_malformed_response_test(&[ + Header::new(":status", "200"), + Header::new(":method", "GET"), + Header::new("content-type", "text/plain"), + ]); + } + + fn maybe_authenticate(conn: &mut Http3Client) { + let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); + if conn.events().any(authentication_needed) { + conn.authenticated(AuthenticationStatus::Ok, now()); + } + } + + const MAX_TABLE_SIZE: u64 = 65536; + const MAX_BLOCKED_STREAMS: u16 = 5; + + fn get_resumption_token(server: &mut Http3Server) -> ResumptionToken { + let mut client = default_http3_client_param(MAX_TABLE_SIZE); + + let mut datagram = None; + let is_done = |c: &Http3Client| matches!(c.state(), Http3State::Connected); + while !is_done(&mut client) { + maybe_authenticate(&mut client); + datagram = client.process(datagram, now()).dgram(); + datagram = server.process(datagram, now()).dgram(); + } + + // exchange qpack settings, server will send a token as well. + datagram = client.process(datagram, now()).dgram(); + datagram = server.process(datagram, now()).dgram(); + mem::drop(client.process(datagram, now()).dgram()); + + client + .events() + .find_map(|e| { + if let Http3ClientEvent::ResumptionToken(token) = e { + Some(token) + } else { + None + } + }) + .unwrap() + } + + // Test that decoder stream type is always sent before any other instruction also + // in case when 0RTT is used. + // A client will send a request that uses the dynamic table. This will trigger a header-ack + // from a server. We will use stats to check that a header-ack has been received. + #[test] + fn zerortt_request_use_dynamic_table() { + let mut server = Http3Server::new( + now(), + DEFAULT_KEYS, + DEFAULT_ALPN_H3, + anti_replay(), + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + Http3Parameters::default() + .max_table_size_encoder(MAX_TABLE_SIZE) + .max_table_size_decoder(MAX_TABLE_SIZE) + .max_blocked_streams(MAX_BLOCKED_STREAMS), + None, + ) + .unwrap(); + + let token = get_resumption_token(&mut server); + // Make a new connection. + let mut client = default_http3_client_param(MAX_TABLE_SIZE); + assert_eq!(client.state(), Http3State::Initializing); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + + assert_eq!(client.state(), Http3State::ZeroRtt); + let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); + assert!(client.events().any(zerortt_event)); + + // Make a request that uses the dynamic table. + let _ = make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]); + // Assert that the request has used dynamic table. That will trigger a header_ack. + assert_eq!(client.qpack_encoder_stats().dynamic_table_references, 1); + + // Exchange packets until header-ack is received. + // These many packet exchange is needed, to get a header-ack. + // TODO this may be optimize at Http3Server. + let out = client.process(None, now()).dgram(); + let out = server.process(out, now()).dgram(); + let out = client.process(out, now()).dgram(); + let out = server.process(out, now()).dgram(); + let out = client.process(out, now()).dgram(); + let out = server.process(out, now()).dgram(); + let out = client.process(out, now()).dgram(); + let out = server.process(out, now()).dgram(); + mem::drop(client.process(out, now())); + + // The header ack for the first request has been received. + assert_eq!(client.qpack_encoder_stats().header_acks_recv, 1); + } + + fn manipulate_conrol_stream(client: &mut Http3Client, stream_id: StreamId) { + assert_eq!( + client + .cancel_fetch(stream_id, Error::HttpNoError.code()) + .unwrap_err(), + Error::InvalidStreamId + ); + assert_eq!( + client.stream_close_send(stream_id).unwrap_err(), + Error::InvalidStreamId + ); + let mut buf = [0; 2]; + assert_eq!( + client.send_data(stream_id, &buf).unwrap_err(), + Error::InvalidStreamId + ); + assert_eq!( + client.read_data(now(), stream_id, &mut buf).unwrap_err(), + Error::InvalidStreamId + ); + } + + #[test] + fn manipulate_conrol_streams() { + let (mut client, server, request_stream_id) = connect_and_send_request(false); + manipulate_conrol_stream(&mut client, CLIENT_SIDE_CONTROL_STREAM_ID); + manipulate_conrol_stream(&mut client, CLIENT_SIDE_ENCODER_STREAM_ID); + manipulate_conrol_stream(&mut client, CLIENT_SIDE_DECODER_STREAM_ID); + manipulate_conrol_stream(&mut client, server.control_stream_id.unwrap()); + manipulate_conrol_stream(&mut client, server.encoder_stream_id.unwrap()); + manipulate_conrol_stream(&mut client, server.decoder_stream_id.unwrap()); + client + .cancel_fetch(request_stream_id, Error::HttpNoError.code()) + .unwrap(); + } + + // Client: receive a push stream + #[test] + fn incomple_push_stream() { + let (mut client, mut server) = connect(); + + // Create a push stream + let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); + let _ = server + .conn + .stream_send(push_stream_id, PUSH_STREAM_TYPE) + .unwrap(); + let _ = server.conn.stream_send(push_stream_id, &[0]).unwrap(); + server.conn.stream_close_send(push_stream_id).unwrap(); + let out = server.conn.process(None, now()); + client.process(out.dgram(), now()); + assert_closed(&client, &Error::HttpGeneralProtocol); + } + + #[test] + fn priority_update_during_full_buffer() { + // set a lower MAX_DATA on the server side to restrict the data the client can send + let (mut client, mut server) = + connect_with_connection_parameters(ConnectionParameters::default().max_data(1200)); + + let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, false); + let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); + assert!(client.events().any(data_writable)); + // Send a lot of data to reach the flow control limit + client.send_data(request_stream_id, &[0; 2000]).unwrap(); + + // now queue a priority_update packet for that stream + assert!(client + .priority_update(request_stream_id, Priority::new(6, false)) + .unwrap()); + + let md_before = server.conn.stats().frame_tx.max_data; + + // sending the http request and most most of the request data + let out = client.process(None, now()).dgram(); + let out = server.conn.process(out, now()).dgram(); + + // the server responses with an ack, but the max_data didn't change + assert_eq!(md_before, server.conn.stats().frame_tx.max_data); + + let out = client.process(out, now()).dgram(); + let out = server.conn.process(out, now()).dgram(); + + // the server increased the max_data during the second read if that isn't the case + // in the future and therefore this asserts fails, the request data on stream 0 could be read + // to cause a max_update frame + assert_eq!(md_before + 1, server.conn.stats().frame_tx.max_data); + + // make sure that the server didn't receive a priority_update on client control stream (stream_id 2) yet + let mut buf = [0; 32]; + assert_eq!( + server.conn.stream_recv(StreamId::new(2), &mut buf), + Ok((0, false)) + ); + + // the client now sends the priority update + let out = client.process(out, now()).dgram(); + server.conn.process_input(out.unwrap(), now()); + + // check that the priority_update arrived at the client control stream + let num_read = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap(); + assert_eq!(b"\x80\x0f\x07\x00\x04\x00\x75\x3d\x36", &buf[0..num_read.0]); + } + + #[test] + fn error_request_stream() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let headers = vec![ + Header::new(":status", "200"), + Header::new(":method", "GET"), // <- invalid + Header::new("my-header", "my-header"), + Header::new("content-length", "3"), + ]; + let encoded_headers = server.encoder.borrow_mut().encode_header_block( + &mut server.conn, + &headers, + request_stream_id, + ); + let hframe = HFrame::Headers { + header_block: encoded_headers.to_vec(), + }; + + // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. + let encoder_inst_pkt = server.conn.process(None, now()); + + // Send response + let mut d = Encoder::default(); + hframe.encode(&mut d); + let d_frame = HFrame::Data { len: 3 }; + d_frame.encode(&mut d); + d.encode(b"abc"); + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + true, + ); + + // Let client receive the encoder instructions. + client.process_input(encoder_inst_pkt.dgram().unwrap(), now()); + + let reset_event = |e| matches!(e, Http3ClientEvent::Reset { stream_id, .. } if stream_id == request_stream_id); + assert!(client.events().any(reset_event)); + } + + #[test] + fn response_w_101() { + let (mut client, mut server, request_stream_id) = connect_and_send_request(true); + + setup_server_side_encoder(&mut client, &mut server); + + let mut d = Encoder::default(); + let headers1xx = &[Header::new(":status", "101")]; + server.encode_headers(request_stream_id, headers1xx, &mut d); + + // Send 101 response. + server_send_response_and_exchange_packet( + &mut client, + &mut server, + request_stream_id, + &d, + false, + ); + + // Stream has been reset because of the 101 response. + let e = client.events().next().unwrap(); + assert_eq!( + e, + Http3ClientEvent::Reset { + stream_id: request_stream_id, + error: Error::InvalidHeader.code(), + local: true, + } + ); + } +} |