// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use crate::client_events::{Http3ClientEvent, Http3ClientEvents}; use crate::connection::{Http3Connection, Http3State, RequestDescription}; use crate::frames::HFrame; use crate::push_controller::{PushController, RecvPushEvents}; use crate::recv_message::{RecvMessage, RecvMessageInfo}; use crate::request_target::AsRequestTarget; use crate::settings::HSettings; use crate::{ Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler, ReceiveOutput, }; use neqo_common::{ event::Provider as EventProvider, hex, hex_with_len, qdebug, qinfo, qlog::NeqoQlog, qtrace, Datagram, Decoder, Encoder, Header, MessageType, Role, }; use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo}; use neqo_qpack::Stats as QpackStats; use neqo_transport::{ AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator, DatagramTracking, Output, Stats as TransportStats, StreamId, StreamType, Version, ZeroRttState, }; use std::cell::RefCell; use std::convert::TryFrom; use std::fmt::Debug; use std::fmt::Display; use std::mem; use std::net::SocketAddr; use std::rc::Rc; use std::time::Instant; use crate::{Error, Res}; // This is used for filtering send_streams and recv_Streams with a stream_ids greater than or equal a given id. // Only the same type (bidirectional or unidirectionsl) streams are filtered. fn id_gte(base: StreamId) -> impl FnMut((&StreamId, &U)) -> Option + 'static where U: ?Sized, { move |(id, _)| { if *id >= base && !(id.is_bidi() ^ base.is_bidi()) { Some(*id) } else { None } } } fn alpn_from_quic_version(version: Version) -> &'static str { match version { Version::Version2 | Version::Version1 => "h3", Version::Draft29 => "h3-29", Version::Draft30 => "h3-30", Version::Draft31 => "h3-31", Version::Draft32 => "h3-32", } } /// # The HTTP/3 client API /// /// This module implements the HTTP/3 client API. The main implementation of the protocol is in /// [connection.rs](https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs) which /// implements common behavior for the client-side and the server-side. `Http3Client` structure /// implements the public API and set of functions that differ between the client and the server. /// The API is used for: /// - create and close an endpoint: /// - [`new`](struct.Http3Client.html#method.new) /// - [`new_with_conn`](struct.Http3Client.html#method.new_with_conn) /// - [`close`](struct.Http3Client.html#method.close) /// - configuring an endpoint: /// - [`authenticated`](struct.Http3Client.html#method.authenticated) /// - [`enable_ech`](struct.Http3Client.html#method.enable_ech) /// - [`enable_resumption`](struct.Http3Client.html#method.enable_resumption) /// - [`initiate_key_update`](struct.Http3Client.html#method.initiate_key_update) /// - [`set_qlog`](struct.Http3Client.html#method.set_qlog) /// - retrieving information about a connection: /// - [`peer_certificate`](struct.Http3Client.html#method.peer_certificate) /// - [`qpack_decoder_stats`](struct.Http3Client.html#method.qpack_decoder_stats) /// - [`qpack_encoder_stats`](struct.Http3Client.html#method.qpack_encoder_stats) /// - [`transport_stats`](struct.Http3Client.html#method.transport_stats) /// - [`state`](struct.Http3Client.html#method.state) /// - [`take_resumption_token`](struct.Http3Client.html#method.take_resumption_token) /// - [`tls_inf`](struct.Http3Client.html#method.tls_info) /// - driving HTTP/3 session: /// - [`process_output`](struct.Http3Client.html#method.process_output) /// - [`process_input`](struct.Http3Client.html#method.process_input) /// - [`process`](struct.Http3Client.html#method.process) /// - create requests, send/receive data, and cancel requests: /// - [`fetch`](struct.Http3Client.html#method.fetch) /// - [`send_data`](struct.Http3Client.html#method.send_data) /// - [`read_dara`](struct.Http3Client.html#method.read_data) /// - [`stream_close_send`](struct.Http3Client.html#method.stream_close_send) /// - [`cancel_fetch`](struct.Http3Client.html#method.cancel_fetch) /// - [`stream_reset_send`](struct.Http3Client.html#method.stream_reset_send) /// - [`stream_stop_sending`](struct.Http3Client.html#method.stream_stop_sending) /// - [`set_stream_max_data`](struct.Http3Client.html#method.set_stream_max_data) /// - priority feature: /// - [`priority_update`](struct.Http3Client.html#method.priority_update) /// - `WebTransport` feature: /// - [`webtransport_create_session`](struct.Http3Client.html#method.webtransport_create_session) /// - [`webtransport_close_session`](struct.Http3Client.html#method.webtransport_close_session) /// - [`webtransport_create_stream`](struct.Http3Client.html#method.webtransport_create_sstream) /// - [`webtransport_enabled`](struct.Http3Client.html#method.webtransport_enabled) /// /// ## Examples /// /// ### Fetching a resource /// /// ```ignore /// let mut client = Http3Client::new(...); /// /// // Perform a handshake /// ... /// /// let req = client /// .fetch( /// Instant::now(), /// "GET", /// &("https", "something.com", "/"), /// &[Header::new("example1", "value1"), Header::new("example1", "value2")], /// Priority::default(), /// ) /// .unwrap(); /// /// client.stream_close_send(req).unwrap(); /// /// loop { /// // exchange packets /// ... /// /// while let Some(event) = client.next_event() { /// match event { /// Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin } => { /// println!("New response headers received for stream {:?} [fin={?}, interim={:?}]: {:?}", /// stream_id, /// fin, /// interim, /// headers, /// ); /// } /// Http3ClientEvent::DataReadable { stream_id } => { /// println!("New data available on stream {}", stream_id); /// let mut buf = [0; 100]; /// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); /// println!("Read {:?} bytes from stream {:?} [fin={?}]", /// amount, /// stream_id, /// fin, /// ); /// } /// _ => { /// println!("Unhandled event {:?}", event); /// } /// } /// } /// } ///``` /// /// ### Creating a `WebTransport` session /// /// ```ignore /// let mut client = Http3Client::new(...); /// /// // Perform a handshake /// ... /// /// // Create a session /// let wt_session_id = client /// .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) /// .unwrap(); /// /// loop { /// // exchange packets /// ... /// /// while let Some(event) = client.next_event() { /// match event { /// Http3ClientEvent::WebTransport(WebTransportEvent::Session{ /// stream_id, /// status, /// .. /// }) => { /// println!("The response from the server: WebTransport session ID {:?} status={:?}", /// stream_id, /// status, /// ); /// } /// _ => { /// println!("Unhandled event {:?}", event); /// } /// } /// } /// } /// ///``` /// /// ### `WebTransport`: create a stream, send and receive data on the stream /// /// ```ignore /// const BUF_CLIENT: &[u8] = &[0; 10]; /// // wt_session_id is the session ID of a newly created WebTransport session, see the example above. /// /// // create a stream /// let wt_stream_id = client /// .webtransport_create_stream(wt_session_id, StreamType::BiDi) /// .unwrap(); /// /// // send data /// let data_sent = client.send_data(wt_stream_id, BUF_CLIENT).unwrap(); /// assert_eq!(data_sent, BUF_CLIENT.len()); /// /// // close stream for sending /// client.stream_close_send(wt_stream_id).unwrap(); /// /// // wait for data from the server /// loop { /// // exchange packets /// ... /// /// while let Some(event) = client.next_event() { /// match event { /// Http3ClientEvent::DataReadable{ stream_id } => { /// println!("Data receivedd form the server on WebTransport stream ID {:?}", /// stream_id, /// ); /// let mut buf = [0; 100]; /// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); /// println!("Read {:?} bytes from stream {:?} [fin={?}]", /// amount, /// stream_id, /// fin, /// ); /// } /// _ => { /// println!("Unhandled event {:?}", event); /// } /// } /// } /// } /// ``` /// /// ### `WebTransport`: receive a new stream form the server /// /// ```ignore /// // wt_session_id is the session ID of a newly created WebTransport session, see the example above. /// /// // wait for a new stream from the server /// loop { /// // exchange packets /// ... /// /// while let Some(event) = client.next_event() { /// match event { /// Http3ClientEvent::WebTransport(WebTransportEvent::NewStream { /// stream_id, /// session_id, /// }) => { /// println!("New stream received on session{:?}, stream id={:?} stream type={:?}", /// sesson_id.stream_id(), /// stream_id.stream_id(), /// stream_id.stream_type() /// ); /// } /// Http3ClientEvent::DataReadable{ stream_id } => { /// println!("Data receivedd form the server on WebTransport stream ID {:?}", /// stream_id, /// ); /// let mut buf = [0; 100]; /// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); /// println!("Read {:?} bytes from stream {:?} [fin={:?}]", /// amount, /// stream_id, /// fin, /// ); /// } /// _ => { /// println!("Unhandled event {:?}", event); /// } /// } /// } /// } /// ``` /// pub struct Http3Client { conn: Connection, base_handler: Http3Connection, events: Http3ClientEvents, push_handler: Rc>, } impl Display for Http3Client { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { write!(f, "Http3 client") } } impl Http3Client { /// # Errors /// Making a `neqo-transport::connection` may produce an error. This can only be a crypto error if /// the crypto context can't be created or configured. pub fn new( server_name: impl Into, cid_manager: Rc>, local_addr: SocketAddr, remote_addr: SocketAddr, http3_parameters: Http3Parameters, now: Instant, ) -> Res { Ok(Self::new_with_conn( Connection::new_client( server_name, &[alpn_from_quic_version( http3_parameters .get_connection_parameters() .get_versions() .initial(), )], cid_manager, local_addr, remote_addr, http3_parameters.get_connection_parameters().clone(), now, )?, http3_parameters, )) } /// This is a similar function to `new`. In this case, `neqo-transport::connection` has been /// already created. /// /// It is recommended to use `new` instead. #[must_use] pub fn new_with_conn(c: Connection, http3_parameters: Http3Parameters) -> Self { let events = Http3ClientEvents::default(); let webtransport = http3_parameters.get_webtransport(); let push_streams = http3_parameters.get_max_concurrent_push_streams(); let mut base_handler = Http3Connection::new(http3_parameters, Role::Client); if webtransport { base_handler.set_features_listener(events.clone()); } Self { conn: c, events: events.clone(), push_handler: Rc::new(RefCell::new(PushController::new(push_streams, events))), base_handler, } } #[must_use] pub fn role(&self) -> Role { self.conn.role() } /// The function returns the current state of the connection. #[must_use] pub fn state(&self) -> Http3State { self.base_handler.state() } #[must_use] pub fn tls_info(&self) -> Option<&SecretAgentInfo> { self.conn.tls_info() } /// Get the peer's certificate. #[must_use] pub fn peer_certificate(&self) -> Option { self.conn.peer_certificate() } /// This called when peer certificates have been verified. /// /// `Http3ClientEvent::AuthenticationNeeded` event is emitted when peer’s certificates are /// available and need to be verified. When the verification is completed this function is /// called. To inform HTTP/3 session of the verification results. pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) { self.conn.authenticated(status, now); } pub fn set_qlog(&mut self, qlog: NeqoQlog) { self.conn.set_qlog(qlog); } /// Enable encrypted client hello (ECH). /// /// # Errors /// Fails when the configuration provided is bad. pub fn enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> { self.conn.client_enable_ech(ech_config_list)?; Ok(()) } /// Get the connection id, which is useful for disambiguating connections to /// the same origin. #[must_use] pub fn connection_id(&self) -> &ConnectionId { self.conn.odcid().expect("Client always has odcid") } fn encode_resumption_token(&self, token: &ResumptionToken) -> Option { self.base_handler.get_settings().map(|settings| { let mut enc = Encoder::default(); settings.encode_frame_contents(&mut enc); enc.encode(token.as_ref()); ResumptionToken::new(enc.into(), token.expiration_time()) }) } /// The correct way to obtain a resumption token is to wait for the /// `Http3ClientEvent::ResumptionToken` event. To emit the event we are waiting for a /// resumtion token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN` /// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a /// problem for short-lived connections, where the connection is closed before any events are /// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to /// arrive. /// /// In addition to the token, HTTP/3 settings are encoded into the token before giving it to /// the application(`encode_resumption_token`). When the resumption token is supplied to a new /// connection the HTTP/3 setting will be decoded and used until the setting are received from /// the server. pub fn take_resumption_token(&mut self, now: Instant) -> Option { self.conn .take_resumption_token(now) .and_then(|t| self.encode_resumption_token(&t)) } /// This may be call if an application has a resumption token. This must be called before connection starts. /// /// The resumption token also contains encoded HTTP/3 settings. The settings will be decoded /// and used until the setting are received from the server. /// /// # Errors /// An error is return if token cannot be decoded or a connection is is a wrong state. /// # Panics /// On closing if the base handler can't handle it (debug only). pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> { if self.base_handler.state != Http3State::Initializing { return Err(Error::InvalidState); } let mut dec = Decoder::from(token.as_ref()); let settings_slice = match dec.decode_vvec() { Some(v) => v, None => return Err(Error::InvalidResumptionToken), }; qtrace!([self], " settings {}", hex_with_len(settings_slice)); let mut dec_settings = Decoder::from(settings_slice); let mut settings = HSettings::default(); Error::map_error( settings.decode_frame_contents(&mut dec_settings), Error::InvalidResumptionToken, )?; let tok = dec.decode_remainder(); qtrace!([self], " Transport token {}", hex(tok)); self.conn.enable_resumption(now, tok)?; if self.conn.state().closed() { let state = self.conn.state().clone(); let res = self .base_handler .handle_state_change(&mut self.conn, &state); debug_assert_eq!(Ok(true), res); return Err(Error::FatalError); } if self.conn.zero_rtt_state() == ZeroRttState::Sending { self.base_handler .set_0rtt_settings(&mut self.conn, settings)?; self.events .connection_state_change(self.base_handler.state()); self.push_handler .borrow_mut() .maybe_send_max_push_id_frame(&mut self.base_handler); } Ok(()) } /// This is call to close a connection. pub fn close(&mut self, now: Instant, error: AppError, msg: S) where S: AsRef + Display, { qinfo!([self], "Close the connection error={} msg={}.", error, msg); if !matches!( self.base_handler.state, Http3State::Closing(_) | Http3State::Closed(_) ) { self.push_handler.borrow_mut().clear(); self.conn.close(now, error, msg); self.base_handler.close(error); self.events .connection_state_change(self.base_handler.state()); } } /// Attempt to force a key update. /// # Errors /// If the connection isn't confirmed, or there is an outstanding key update, this /// returns `Err(Error::TransportError(neqo_transport::Error::KeyUpdateBlocked))`. pub fn initiate_key_update(&mut self) -> Res<()> { self.conn.initiate_key_update()?; Ok(()) } // API: Request/response /// The function fetches a resource using `method`, `target` and `headers`. A response body /// may be added by calling `send_data`. `stream_close_send` must be sent to finish the request /// even if request data are not sent. /// # Errors /// If a new stream cannot be created an error will be return. /// # Panics /// `SendMessage` implements `http_stream` so it will not panic. pub fn fetch<'x, 't: 'x, T>( &mut self, now: Instant, method: &'t str, target: &'t T, headers: &'t [Header], priority: Priority, ) -> Res where T: AsRequestTarget<'x> + ?Sized + Debug, { let output = self.base_handler.fetch( &mut self.conn, Box::new(self.events.clone()), Box::new(self.events.clone()), Some(Rc::clone(&self.push_handler)), &RequestDescription { method, connect_type: None, target, headers, priority, }, ); if let Err(e) = &output { if e.connection_error() { self.close(now, e.code(), ""); } } output } /// Send an [`PRIORITY_UPDATE`-frame][1] on next `Http3Client::process_output()` call. /// Returns if the priority got changed. /// # Errors /// `InvalidStreamId` if the stream does not exist /// /// [1]: https://datatracker.ietf.org/doc/html/draft-kazuho-httpbis-priority-04#section-5.2 pub fn priority_update(&mut self, stream_id: StreamId, priority: Priority) -> Res { self.base_handler.queue_update_priority(stream_id, priority) } /// An application may cancel a stream(request). /// Both sides, the receiviing and sending side, sending and receiving side, will be closed. /// # Errors /// An error will be return if a stream does not exist. pub fn cancel_fetch(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { qinfo!([self], "reset_stream {} error={}.", stream_id, error); self.base_handler .cancel_fetch(stream_id, error, &mut self.conn) } /// This is call when application is done sending a request. /// # Errors /// An error will be return if stream does not exist. pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> { qinfo!([self], "Close sending side stream={}.", stream_id); self.base_handler .stream_close_send(&mut self.conn, stream_id) } /// # Errors /// An error will be return if a stream does not exist. pub fn stream_reset_send(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { qinfo!([self], "stream_reset_send {} error={}.", stream_id, error); self.base_handler .stream_reset_send(&mut self.conn, stream_id, error) } /// # Errors /// An error will be return if a stream does not exist. pub fn stream_stop_sending(&mut self, stream_id: StreamId, error: AppError) -> Res<()> { qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error); self.base_handler .stream_stop_sending(&mut self.conn, stream_id, error) } /// This function is used for regular HTTP requests and `WebTransport` streams. /// In the case of regular HTTP requests, the request body is supplied using this function, and /// headers are supplied through the `fetch` function. /// /// # Errors /// `InvalidStreamId` if the stream does not exist, /// `AlreadyClosed` if the stream has already been closed. /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output` /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.) /// `InvalidInput` if an empty buffer has been supplied. pub fn send_data(&mut self, stream_id: StreamId, buf: &[u8]) -> Res { qinfo!( [self], "send_data from stream {} sending {} bytes.", stream_id, buf.len() ); self.base_handler .send_streams .get_mut(&stream_id) .ok_or(Error::InvalidStreamId)? .send_data(&mut self.conn, buf) } /// Response data are read directly into a buffer supplied as a parameter of this function to avoid copying /// data. /// # Errors /// It returns an error if a stream does not exist or an error happen while reading a stream, e.g. /// early close, protocol error, etc. pub fn read_data( &mut self, now: Instant, stream_id: StreamId, buf: &mut [u8], ) -> Res<(usize, bool)> { qinfo!([self], "read_data from stream {}.", stream_id); let res = self.base_handler.read_data(&mut self.conn, stream_id, buf); if let Err(e) = &res { if e.connection_error() { self.close(now, e.code(), ""); } } res } // API: Push streams /// Cancel a push /// # Errors /// `InvalidStreamId` if the stream does not exist. pub fn cancel_push(&mut self, push_id: u64) -> Res<()> { self.push_handler .borrow_mut() .cancel(push_id, &mut self.conn, &mut self.base_handler) } /// Push response data are read directly into a buffer supplied as a parameter of this function /// to avoid copying data. /// # Errors /// It returns an error if a stream does not exist(`InvalidStreamId`) or an error has happened while /// reading a stream, e.g. early close, protocol error, etc. pub fn push_read_data( &mut self, now: Instant, push_id: u64, buf: &mut [u8], ) -> Res<(usize, bool)> { let stream_id = self .push_handler .borrow_mut() .get_active_stream_id(push_id) .ok_or(Error::InvalidStreamId)?; self.conn.stream_keep_alive(stream_id, true)?; self.read_data(now, stream_id, buf) } // API WebTransport /// # Errors /// If `WebTransport` cannot be created, e.g. the `WebTransport` support is /// not negotiated or the HTTP/3 connection is closed. pub fn webtransport_create_session<'x, 't: 'x, T>( &mut self, now: Instant, target: &'t T, headers: &'t [Header], ) -> Res where T: AsRequestTarget<'x> + ?Sized + Debug, { let output = self.base_handler.webtransport_create_session( &mut self.conn, Box::new(self.events.clone()), target, headers, ); if let Err(e) = &output { if e.connection_error() { self.close(now, e.code(), ""); } } output } /// Close `WebTransport` cleanly /// # Errors /// `InvalidStreamId` if the stream does not exist, /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if `process_output` /// has not been called when needed, and HTTP3 layer has not picked up the info that the stream has been closed.) /// `InvalidInput` if an empty buffer has been supplied. pub fn webtransport_close_session( &mut self, session_id: StreamId, error: u32, message: &str, ) -> Res<()> { self.base_handler .webtransport_close_session(&mut self.conn, session_id, error, message) } /// # Errors /// This may return an error if the particular session does not exist /// or the connection is not in the active state. pub fn webtransport_create_stream( &mut self, session_id: StreamId, stream_type: StreamType, ) -> Res { self.base_handler.webtransport_create_stream_local( &mut self.conn, session_id, stream_type, Box::new(self.events.clone()), Box::new(self.events.clone()), ) } /// Send `WebTransport` datagram. /// # Errors /// It may return `InvalidStreamId` if a stream does not exist anymore. /// The function returns `TooMuchData` if the supply buffer is bigger than /// the allowed remote datagram size. pub fn webtransport_send_datagram( &mut self, session_id: StreamId, buf: &[u8], id: impl Into, ) -> Res<()> { qtrace!("webtransport_send_datagram session:{:?}", session_id); self.base_handler .webtransport_send_datagram(session_id, &mut self.conn, buf, id) } /// Returns the current max size of a datagram that can fit into a packet. /// The value will change over time depending on the encoded size of the /// packet number, ack frames, etc. /// # Errors /// The function returns `NotAvailable` if datagrams are not enabled. /// # Panics /// This cannot panic. The max varint length is 8. pub fn webtransport_max_datagram_size(&self, session_id: StreamId) -> Res { Ok(self.conn.max_datagram_size()? - u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap()) } /// This function combines `process_input` and `process_output` function. pub fn process(&mut self, dgram: Option, now: Instant) -> Output { qtrace!([self], "Process."); if let Some(d) = dgram { self.process_input(d, now); } self.process_output(now) } /// The function should be called when there is a new UDP packet available. The function will /// handle the packet payload. /// /// First, the payload will be handled by the QUIC layer. Afterward, `process_http3` will be /// called to handle new [`ConnectionEvent`][1]s. /// /// After this function is called `process_output` should be called to check whether new /// packets need to be sent or if a timer needs to be updated. /// /// [1]: ../neqo_transport/enum.ConnectionEvent.html pub fn process_input(&mut self, dgram: Datagram, now: Instant) { qtrace!([self], "Process input."); self.conn.process_input(dgram, now); self.process_http3(now); } /// This should not be used because it gives access to functionalities that may disrupt the /// proper functioning of the HTTP/3 session. /// Only used by `neqo-interop`. pub fn conn(&mut self) -> &mut Connection { &mut self.conn } /// Process HTTP3 layer. /// When `process_output`, `process_input`, or `process` is called we must call this function /// as well. The functions calls `Http3Client::check_connection_events` to handle events from /// the QUC layer and calls `Http3Connection::process_sending` to ensure that HTTP/3 layer /// data, e.g. control frames, are sent. fn process_http3(&mut self, now: Instant) { qtrace!([self], "Process http3 internal."); match self.base_handler.state() { Http3State::ZeroRtt | Http3State::Connected | Http3State::GoingAway(..) => { let res = self.check_connection_events(); if self.check_result(now, &res) { return; } self.push_handler .borrow_mut() .maybe_send_max_push_id_frame(&mut self.base_handler); let res = self.base_handler.process_sending(&mut self.conn); self.check_result(now, &res); } Http3State::Closed { .. } => {} _ => { let res = self.check_connection_events(); let _ = self.check_result(now, &res); } } } /// The function should be called to check if there is a new UDP packet to be sent. It should /// be called after a new packet is received and processed and after a timer expires (QUIC /// needs timers to handle events like PTO detection and timers are not implemented by the neqo /// library, but instead must be driven by the application). /// /// `process_output` can return: /// - a [`Output::Datagram(Datagram)`][1]: data that should be sent as a UDP payload, /// - a [`Output::Callback(Duration)`][1]: the duration of a timer. `process_output` should be called at least after the time expires, /// - [`Output::None`][1]: this is returned when `Nttp3Client` is done and can be destroyed. /// /// The application should call this function repeatedly until a timer value or None is /// returned. After that, the application should call the function again if a new UDP packet is /// received and processed or the timer value expires. /// /// The HTTP/3 neqo implementation drives the HTTP/3 and QUC layers, therefore this function /// will call both layers: /// - First it calls HTTP/3 layer processing (`process_http3`) to make sure the layer writes /// data to QUIC layer or cancels streams if needed. /// - Then QUIC layer processing is called - [`Connection::process_output`][3]. This produces a /// packet or a timer value. It may also produce ned [`ConnectionEvent`][2]s, e.g. connection /// state-change event. /// - Therefore the HTTP/3 layer processing (`process_http3`) is called again. /// /// [1]: ../neqo_transport/enum.Output.html /// [2]: ../neqo_transport/struct.ConnectionEvents.html /// [3]: ../neqo_transport/struct.Connection.html#method.process_output pub fn process_output(&mut self, now: Instant) -> Output { qtrace!([self], "Process output."); // Maybe send() stuff on http3-managed streams self.process_http3(now); let out = self.conn.process_output(now); // Update H3 for any transport state changes and events self.process_http3(now); out } /// This function takes the provided result and check for an error. /// An error results in closing the connection. fn check_result(&mut self, now: Instant, res: &Res) -> bool { match &res { Err(Error::HttpGoaway) => { qinfo!([self], "Connection error: goaway stream_id increased."); self.close( now, Error::HttpGeneralProtocol.code(), "Connection error: goaway stream_id increased", ); true } Err(e) => { qinfo!([self], "Connection error: {}.", e); self.close(now, e.code(), &format!("{}", e)); true } _ => false, } } /// This function checks [`ConnectionEvent`][2]s emitted by the QUIC layer, e.g. connection change /// state events, new incoming stream data is available, a stream is was reset, etc. The HTTP/3 /// layer needs to handle these events. Most of the events are handled by /// [`Http3Connection`][1] by calling appropriate functions, e.g. `handle_state_change`, /// `handle_stream_reset`, etc. [`Http3Connection`][1] handle functionalities that are common /// for the client and server side. Some of the functionalities are specific to the client and /// they are handled by `Http3Client`. For example, [`ConnectionEvent::RecvStreamReadable`][3] event /// is handled by `Http3Client::handle_stream_readable`. The function calls /// `Http3Connection::handle_stream_readable` and then hands the return value as appropriate /// for the client-side. /// /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs /// [2]: ../neqo_transport/enum.ConnectionEvent.html /// [3]: ../neqo_transport/enum.ConnectionEvent.html#variant.RecvStreamReadable fn check_connection_events(&mut self) -> Res<()> { qtrace!([self], "Check connection events."); while let Some(e) = self.conn.next_event() { qdebug!([self], "check_connection_events - event {:?}.", e); match e { ConnectionEvent::NewStream { stream_id } => { // During this event we only add a new stream to the Http3Connection stream list, // with NewStreamHeadReader stream handler. // This function will not read from the stream and try to decode the stream. // RecvStreamReadable will be emitted after this event and reading, i.e. decoding // of a stream will happen during that event. self.base_handler.add_new_stream(stream_id); } ConnectionEvent::SendStreamWritable { stream_id } => { if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) { s.stream_writable(); } } ConnectionEvent::RecvStreamReadable { stream_id } => { self.handle_stream_readable(stream_id)?; } ConnectionEvent::RecvStreamReset { stream_id, app_error, } => self .base_handler .handle_stream_reset(stream_id, app_error, &mut self.conn)?, ConnectionEvent::SendStreamStopSending { stream_id, app_error, } => self.base_handler.handle_stream_stop_sending( stream_id, app_error, &mut self.conn, )?, ConnectionEvent::SendStreamCreatable { stream_type } => { self.events.new_requests_creatable(stream_type); } ConnectionEvent::AuthenticationNeeded => self.events.authentication_needed(), ConnectionEvent::EchFallbackAuthenticationNeeded { public_name } => { self.events.ech_fallback_authentication_needed(public_name); } ConnectionEvent::StateChange(state) => { if self .base_handler .handle_state_change(&mut self.conn, &state)? { self.events .connection_state_change(self.base_handler.state()); } } ConnectionEvent::ZeroRttRejected => { self.base_handler.handle_zero_rtt_rejected()?; self.events.zero_rtt_rejected(); self.push_handler.borrow_mut().handle_zero_rtt_rejected(); } ConnectionEvent::ResumptionToken(token) => { if let Some(t) = self.encode_resumption_token(&token) { self.events.resumption_token(t); } } ConnectionEvent::Datagram(dgram) => { self.base_handler.handle_datagram(&dgram); } ConnectionEvent::SendStreamComplete { .. } | ConnectionEvent::OutgoingDatagramOutcome { .. } | ConnectionEvent::IncomingDatagramDropped => {} } } Ok(()) } /// This function handled new data available on a stream. It calls /// `Http3Client::handle_stream_readable` and handles its response. Reading streams are mostly /// handled by [`Http3Connection`][1] because most part of it is common for the client and /// server. The following actions need to be handled by the client-specific code: /// - `ReceiveOutput::NewStream(NewStreamType::Push(_))` - the server cannot receive a push /// stream, /// - `ReceiveOutput::NewStream(NewStreamType::Http)` - client cannot receive a /// server-initiated HTTP request, /// - `ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_))` - because /// `Http3ClientEvents`is needed and events handler is specific to the client. /// - `ReceiveOutput::ControlFrames(control_frames)` - some control frame handling differs /// between the client and the server: /// - `HFrame::CancelPush` - only the client-side may receive it, /// - `HFrame::MaxPushId { .. }`, `HFrame::PriorityUpdateRequest { .. } ` and /// `HFrame::PriorityUpdatePush` can only be receive on the server side, /// - `HFrame::Goaway { stream_id }` needs specific handling by the client by the protocol /// specification. /// /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs fn handle_stream_readable(&mut self, stream_id: StreamId) -> Res<()> { match self .base_handler .handle_stream_readable(&mut self.conn, stream_id)? { ReceiveOutput::NewStream(NewStreamType::Push(push_id)) => { self.handle_new_push_stream(stream_id, push_id) } ReceiveOutput::NewStream(NewStreamType::Http) => Err(Error::HttpStreamCreation), ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => { self.base_handler.webtransport_create_stream_remote( StreamId::from(session_id), stream_id, Box::new(self.events.clone()), Box::new(self.events.clone()), )?; let res = self .base_handler .handle_stream_readable(&mut self.conn, stream_id)?; debug_assert!(matches!(res, ReceiveOutput::NoOutput)); Ok(()) } ReceiveOutput::ControlFrames(control_frames) => { for f in control_frames { match f { HFrame::CancelPush { push_id } => self .push_handler .borrow_mut() .handle_cancel_push(push_id, &mut self.conn, &mut self.base_handler), HFrame::MaxPushId { .. } | HFrame::PriorityUpdateRequest { .. } | HFrame::PriorityUpdatePush { .. } => Err(Error::HttpFrameUnexpected), HFrame::Goaway { stream_id } => self.handle_goaway(stream_id), _ => { unreachable!( "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames." ); } }?; } Ok(()) } _ => Ok(()), } } fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> { if !self.push_handler.borrow().can_receive_push() { return Err(Error::HttpId); } // Add a new push stream to `PushController`. `add_new_push_stream` may return an error // (this will be a connection error) or a bool. // If false is returned that means that the stream should be reset because the push has // been already canceled (CANCEL_PUSH frame or canceling push from the application). if !self .push_handler .borrow_mut() .add_new_push_stream(push_id, stream_id)? { // We are not interested in the result of stream_stop_sending, we are not interested // in this stream. mem::drop( self.conn .stream_stop_sending(stream_id, Error::HttpRequestCancelled.code()), ); return Ok(()); } self.base_handler.add_recv_stream( stream_id, Box::new(RecvMessage::new( &RecvMessageInfo { message_type: MessageType::Response, stream_type: Http3StreamType::Push, stream_id, header_frame_type_read: false, }, Rc::clone(&self.base_handler.qpack_decoder), Box::new(RecvPushEvents::new(push_id, Rc::clone(&self.push_handler))), None, // TODO: think about the right prority for the push streams. PriorityHandler::new(true, Priority::default()), )), ); let res = self .base_handler .handle_stream_readable(&mut self.conn, stream_id)?; debug_assert!(matches!(res, ReceiveOutput::NoOutput)); Ok(()) } fn handle_goaway(&mut self, goaway_stream_id: StreamId) -> Res<()> { qinfo!([self], "handle_goaway {}", goaway_stream_id); if goaway_stream_id.is_uni() || goaway_stream_id.is_server_initiated() { return Err(Error::HttpId); } match self.base_handler.state { Http3State::Connected => { self.base_handler.state = Http3State::GoingAway(goaway_stream_id); } Http3State::GoingAway(ref mut stream_id) => { if goaway_stream_id > *stream_id { return Err(Error::HttpGoaway); } *stream_id = goaway_stream_id; } Http3State::Closing(..) | Http3State::Closed(..) => {} _ => unreachable!("Should not receive Goaway frame in this state."), } // Issue reset events for streams >= goaway stream id let send_ids: Vec = self .base_handler .send_streams .iter() .filter_map(id_gte(goaway_stream_id)) .collect(); for id in send_ids { // We do not care about streams that are going to be closed. mem::drop(self.base_handler.handle_stream_stop_sending( id, Error::HttpRequestRejected.code(), &mut self.conn, )); } let recv_ids: Vec = self .base_handler .recv_streams .iter() .filter_map(id_gte(goaway_stream_id)) .collect(); for id in recv_ids { // We do not care about streams that are going to be closed. mem::drop(self.base_handler.handle_stream_reset( id, Error::HttpRequestRejected.code(), &mut self.conn, )); } self.events.goaway_received(); Ok(()) } /// Increases `max_stream_data` for a `stream_id`. /// # Errors /// Returns `InvalidStreamId` if a stream does not exist or the receiving /// side is closed. pub fn set_stream_max_data(&mut self, stream_id: StreamId, max_data: u64) -> Res<()> { self.conn.set_stream_max_data(stream_id, max_data)?; Ok(()) } #[must_use] pub fn qpack_decoder_stats(&self) -> QpackStats { self.base_handler.qpack_decoder.borrow().stats() } #[must_use] pub fn qpack_encoder_stats(&self) -> QpackStats { self.base_handler.qpack_encoder.borrow().stats() } #[must_use] pub fn transport_stats(&self) -> TransportStats { self.conn.stats() } #[must_use] pub fn webtransport_enabled(&self) -> bool { self.base_handler.webtransport_enabled() } } impl EventProvider for Http3Client { type Event = Http3ClientEvent; /// Return true if there are outstanding events. fn has_events(&self) -> bool { self.events.has_events() } /// Get events that indicate state changes on the connection. This method /// correctly handles cases where handling one event can obsolete /// previously-queued events, or cause new events to be generated. fn next_event(&mut self) -> Option { self.events.next_event() } } #[cfg(test)] mod tests { use super::{ AuthenticationStatus, Connection, Error, HSettings, Header, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Rc, RefCell, }; use crate::frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES}; use crate::qpack_encoder_receiver::EncoderRecvStream; use crate::settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS}; use crate::{Http3Server, Priority, RecvStream}; use neqo_common::{event::Provider, qtrace, Datagram, Decoder, Encoder}; use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken}; use neqo_qpack::{encoder::QPackEncoder, QpackSettings}; use neqo_transport::{ ConnectionError, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType, Version, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE, }; use std::convert::TryFrom; use std::mem; use std::time::Duration; use test_fixture::{ addr, anti_replay, default_server_h3, fixture_init, new_server, now, CountingConnectionIdGenerator, DEFAULT_ALPN_H3, DEFAULT_KEYS, DEFAULT_SERVER_NAME, }; fn assert_closed(client: &Http3Client, expected: &Error) { match client.state() { Http3State::Closing(err) | Http3State::Closed(err) => { assert_eq!(err, ConnectionError::Application(expected.code())); } _ => panic!("Wrong state {:?}", client.state()), }; } /// Create a http3 client with default configuration. pub fn default_http3_client() -> Http3Client { default_http3_client_param(100) } pub fn default_http3_client_param(max_table_size: u64) -> Http3Client { fixture_init(); Http3Client::new( DEFAULT_SERVER_NAME, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), addr(), addr(), Http3Parameters::default() .connection_parameters( // Disable compatible upgrade, which complicates tests. ConnectionParameters::default() .versions(Version::default(), vec![Version::default()]), ) .max_table_size_encoder(max_table_size) .max_table_size_decoder(max_table_size) .max_blocked_streams(100) .max_concurrent_push_streams(5), now(), ) .expect("create a default client") } const CONTROL_STREAM_TYPE: &[u8] = &[0x0]; // Encoder stream data const ENCODER_STREAM_DATA: &[u8] = &[0x2]; const ENCODER_STREAM_CAP_INSTRUCTION: &[u8] = &[0x3f, 0x45]; // Encoder stream data with a change capacity instruction(0x3f, 0x45 = change capacity to 100) // This data will be send when 0-RTT is used and we already have a max_table_capacity from // resumed settings. const ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION: &[u8] = &[0x2, 0x3f, 0x45]; const ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST: &[u8] = &[ 0x2, 0x3f, 0x45, 0x67, 0xa7, 0xd4, 0xe5, 0x1c, 0x85, 0xb1, 0x1f, 0x86, 0xa7, 0xd7, 0x71, 0xd1, 0x69, 0x7f, ]; // Decoder stream data const DECODER_STREAM_DATA: &[u8] = &[0x3]; const PUSH_STREAM_TYPE: &[u8] = &[0x1]; const CLIENT_SIDE_CONTROL_STREAM_ID: StreamId = StreamId::new(2); const CLIENT_SIDE_ENCODER_STREAM_ID: StreamId = StreamId::new(6); const CLIENT_SIDE_DECODER_STREAM_ID: StreamId = StreamId::new(10); struct TestServer { settings: HFrame, conn: Connection, control_stream_id: Option, encoder: Rc>, encoder_receiver: EncoderRecvStream, encoder_stream_id: Option, decoder_stream_id: Option, } impl TestServer { pub fn new() -> Self { Self::new_with_settings(&[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ]) } pub fn new_with_settings(server_settings: &[HSetting]) -> Self { fixture_init(); let max_table_size = server_settings .iter() .find(|s| s.setting_type == HSettingType::MaxTableCapacity) .map_or(100, |s| s.value); let max_blocked_streams = u16::try_from( server_settings .iter() .find(|s| s.setting_type == HSettingType::BlockedStreams) .map_or(100, |s| s.value), ) .unwrap(); let qpack = Rc::new(RefCell::new(QPackEncoder::new( &QpackSettings { max_table_size_encoder: max_table_size, max_table_size_decoder: max_table_size, max_blocked_streams, }, true, ))); Self { settings: HFrame::Settings { settings: HSettings::new(server_settings), }, conn: default_server_h3(), control_stream_id: None, encoder: Rc::clone(&qpack), encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack), encoder_stream_id: None, decoder_stream_id: None, } } pub fn new_with_conn(conn: Connection) -> Self { let qpack = Rc::new(RefCell::new(QPackEncoder::new( &QpackSettings { max_table_size_encoder: 128, max_table_size_decoder: 128, max_blocked_streams: 0, }, true, ))); Self { settings: HFrame::Settings { settings: HSettings::new(&[]), }, conn, control_stream_id: None, encoder: Rc::clone(&qpack), encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack), encoder_stream_id: None, decoder_stream_id: None, } } pub fn create_qpack_streams(&mut self) { // Create a QPACK encoder stream self.encoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap()); self.encoder .borrow_mut() .add_send_stream(self.encoder_stream_id.unwrap()); self.encoder .borrow_mut() .send_encoder_updates(&mut self.conn) .unwrap(); // Create decoder stream self.decoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap()); assert_eq!( self.conn .stream_send(self.decoder_stream_id.unwrap(), DECODER_STREAM_DATA) .unwrap(), 1 ); } pub fn create_control_stream(&mut self) { // Create control stream let control = self.conn.stream_create(StreamType::UniDi).unwrap(); qtrace!(["TestServer"], "control stream: {}", control); self.control_stream_id = Some(control); // Send stream type on the control stream. assert_eq!( self.conn .stream_send(self.control_stream_id.unwrap(), CONTROL_STREAM_TYPE) .unwrap(), 1 ); // Encode a settings frame and send it. let mut enc = Encoder::default(); self.settings.encode(&mut enc); assert_eq!( self.conn .stream_send(self.control_stream_id.unwrap(), enc.as_ref()) .unwrap(), enc.len() ); } pub fn check_client_control_qpack_streams_no_resumption(&mut self) { self.check_client_control_qpack_streams( ENCODER_STREAM_DATA, EXPECTED_REQUEST_HEADER_FRAME, false, true, ); } pub fn check_control_qpack_request_streams_resumption( &mut self, expect_encoder_stream_data: &[u8], expect_request_header: &[u8], expect_request: bool, ) { self.check_client_control_qpack_streams( expect_encoder_stream_data, expect_request_header, expect_request, false, ); } // Check that server has received correct settings and qpack streams. pub fn check_client_control_qpack_streams( &mut self, expect_encoder_stream_data: &[u8], expect_request_header: &[u8], expect_request: bool, expect_connected: bool, ) { let mut connected = false; let mut control_stream = false; let mut qpack_decoder_stream = false; let mut qpack_encoder_stream = false; let mut request = false; while let Some(e) = self.conn.next_event() { match e { ConnectionEvent::NewStream { stream_id } | ConnectionEvent::SendStreamWritable { stream_id } => { if expect_request { assert!(matches!(stream_id.as_u64(), 2 | 6 | 10 | 0)); } else { assert!(matches!(stream_id.as_u64(), 2 | 6 | 10)); } } ConnectionEvent::RecvStreamReadable { stream_id } => { if stream_id == CLIENT_SIDE_CONTROL_STREAM_ID { self.check_control_stream(); control_stream = true; } else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID { // the qpack encoder stream self.read_and_check_stream_data( stream_id, expect_encoder_stream_data, false, ); qpack_encoder_stream = true; } else if stream_id == CLIENT_SIDE_DECODER_STREAM_ID { // the qpack decoder stream self.read_and_check_stream_data(stream_id, DECODER_STREAM_DATA, false); qpack_decoder_stream = true; } else if stream_id == 0 { assert!(expect_request); self.read_and_check_stream_data(stream_id, expect_request_header, true); request = true; } else { panic!("unexpected event"); } } ConnectionEvent::StateChange(State::Connected) => connected = true, ConnectionEvent::StateChange(_) | ConnectionEvent::SendStreamCreatable { .. } => {} _ => panic!("unexpected event"), } } assert_eq!(connected, expect_connected); assert!(control_stream); assert!(qpack_encoder_stream); assert!(qpack_decoder_stream); assert_eq!(request, expect_request); } // Check that the control stream contains default values. // Expect a SETTINGS frame, some grease, and a MAX_PUSH_ID frame. // The default test configuration uses: // - max_table_capacity = 100 // - max_blocked_streams = 100 // and a maximum of 5 push streams. fn check_control_stream(&mut self) { let mut buf = [0_u8; 100]; let (amount, fin) = self .conn .stream_recv(CLIENT_SIDE_CONTROL_STREAM_ID, &mut buf) .unwrap(); let mut dec = Decoder::from(&buf[..amount]); assert_eq!(dec.decode_varint().unwrap(), 0); // control stream type assert_eq!(dec.decode_varint().unwrap(), 4); // SETTINGS assert_eq!( dec.decode_vvec().unwrap(), &[1, 0x40, 0x64, 7, 0x40, 0x64, 0xab, 0x60, 0x37, 0x42, 0x00] ); assert_eq!((dec.decode_varint().unwrap() - 0x21) % 0x1f, 0); // Grease assert!(dec.decode_vvec().unwrap().len() < 8); assert_eq!(dec.decode_varint().unwrap(), 0xd); // MAX_PUSH_ID assert_eq!(dec.decode_vvec().unwrap(), &[5]); assert_eq!(dec.remaining(), 0); assert!(!fin); } pub fn read_and_check_stream_data( &mut self, stream_id: StreamId, expected_data: &[u8], expected_fin: bool, ) { let mut buf = [0_u8; 100]; let (amount, fin) = self.conn.stream_recv(stream_id, &mut buf).unwrap(); assert_eq!(fin, expected_fin); assert_eq!(amount, expected_data.len()); assert_eq!(&buf[..amount], expected_data); } pub fn encode_headers( &mut self, stream_id: StreamId, headers: &[Header], encoder: &mut Encoder, ) { let header_block = self.encoder .borrow_mut() .encode_header_block(&mut self.conn, headers, stream_id); let hframe = HFrame::Headers { header_block: header_block.as_ref().to_vec(), }; hframe.encode(encoder); } } fn handshake_only(client: &mut Http3Client, server: &mut TestServer) -> Output { assert_eq!(client.state(), Http3State::Initializing); let out = client.process(None, now()); assert_eq!(client.state(), Http3State::Initializing); assert_eq!(*server.conn.state(), State::Init); let out = server.conn.process(out.dgram(), now()); assert_eq!(*server.conn.state(), State::Handshaking); let out = client.process(out.dgram(), now()); let out = server.conn.process(out.dgram(), now()); assert!(out.as_dgram_ref().is_none()); let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); assert!(client.events().any(authentication_needed)); client.authenticated(AuthenticationStatus::Ok, now()); out } // Perform only Quic transport handshake. fn connect_only_transport_with(client: &mut Http3Client, server: &mut TestServer) { let out = handshake_only(client, server); let out = client.process(out.dgram(), now()); let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected)); assert!(client.events().any(connected)); assert_eq!(client.state(), Http3State::Connected); server.conn.process_input(out.dgram().unwrap(), now()); assert!(server.conn.state().connected()); } // Perform only Quic transport handshake. fn connect_only_transport() -> (Http3Client, TestServer) { let mut client = default_http3_client(); let mut server = TestServer::new(); connect_only_transport_with(&mut client, &mut server); (client, server) } fn send_and_receive_client_settings(client: &mut Http3Client, server: &mut TestServer) { // send and receive client settings let dgram = client.process(None, now()).dgram(); server.conn.process_input(dgram.unwrap(), now()); server.check_client_control_qpack_streams_no_resumption(); } // Perform Quic transport handshake and exchange Http3 settings. fn connect_with(client: &mut Http3Client, server: &mut TestServer) { connect_only_transport_with(client, server); send_and_receive_client_settings(client, server); server.create_control_stream(); server.create_qpack_streams(); // Send the server's control and qpack streams data. let dgram = server.conn.process(None, now()).dgram(); client.process_input(dgram.unwrap(), now()); // assert no error occured. assert_eq!(client.state(), Http3State::Connected); } // Perform Quic transport handshake and exchange Http3 settings. fn connect_with_connection_parameters( server_conn_params: ConnectionParameters, ) -> (Http3Client, TestServer) { // connecting with default max_table_size let mut client = default_http3_client_param(100); let server = Connection::new_server( test_fixture::DEFAULT_KEYS, test_fixture::DEFAULT_ALPN_H3, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), server_conn_params, ) .unwrap(); let mut server = TestServer::new_with_conn(server); connect_with(&mut client, &mut server); (client, server) } // Perform Quic transport handshake and exchange Http3 settings. fn connect() -> (Http3Client, TestServer) { let mut client = default_http3_client(); let mut server = TestServer::new(); connect_with(&mut client, &mut server); (client, server) } // Fetch request fetch("GET", "https", "something.com", "/", headers). fn make_request( client: &mut Http3Client, close_sending_side: bool, headers: &[Header], ) -> StreamId { let request_stream_id = client .fetch( now(), "GET", "https://something.com/", headers, Priority::default(), ) .unwrap(); if close_sending_side { client.stream_close_send(request_stream_id).unwrap(); } request_stream_id } // For fetch request fetch("GET", "https", "something.com", "/", &[]) // the following request header frame will be sent: const EXPECTED_REQUEST_HEADER_FRAME: &[u8] = &[ 0x01, 0x10, 0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, 0x43, 0xd3, 0xc1, ]; // For fetch request fetch("GET", "https", "something.com", "/", &[(String::from("myheaders", "myvalue"))]) // the following request header frame will be sent: const EXPECTED_REQUEST_HEADER_FRAME_VERSION2: &[u8] = &[ 0x01, 0x11, 0x02, 0x80, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, 0x43, 0xd3, 0xc1, 0x10, ]; const HTTP_HEADER_FRAME_0: &[u8] = &[0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30]; // The response header from HTTP_HEADER_FRAME (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30) are // decoded into: fn check_response_header_0(header: &[Header]) { let expected_response_header_0 = &[ Header::new(":status", "200"), Header::new("content-length", "0"), ]; assert_eq!(header, expected_response_header_0); } const HTTP_RESPONSE_1: &[u8] = &[ // headers 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // the first data frame 0x0, 0x3, 0x61, 0x62, 0x63, // the second data frame 0x0, 0x4, 0x64, 0x65, 0x66, 0x67, ]; const HTTP_RESPONSE_HEADER_ONLY_1: &[u8] = &[ // headers 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, ]; const HTTP_RESPONSE_DATA_FRAME_1_ONLY_1: &[u8] = &[0x0, 0x3, 0x61, 0x62, 0x63]; const HTTP_RESPONSE_DATA_FRAME_2_ONLY_1: &[u8] = &[0x0, 0x4, 0x64, 0x65, 0x66, 0x67]; // The response header from HTTP_RESPONSE_1 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are // decoded into: fn check_response_header_1(header: &[Header]) { let expected_response_header_1 = &[ Header::new(":status", "200"), Header::new("content-length", "7"), ]; assert_eq!(header, expected_response_header_1); } const EXPECTED_RESPONSE_DATA_1: &[u8] = &[0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67]; const HTTP_RESPONSE_2: &[u8] = &[ // headers 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33, // the data frame 0x0, 0x3, 0x61, 0x62, 0x63, ]; const HTTP_RESPONSE_HEADER_ONLY_2: &[u8] = &[ // headers 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33, ]; const HTTP_RESPONSE_DATA_FRAME_ONLY_2: &[u8] = &[ // the data frame 0x0, 0x3, 0x61, 0x62, 0x63, ]; // The response header from HTTP_RESPONSE_2 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are // decoded into: fn check_response_header_2(header: &[Header]) { let expected_response_header_2 = &[ Header::new(":status", "200"), Header::new("content-length", "3"), ]; assert_eq!(header, expected_response_header_2); } // The data frame payload from HTTP_RESPONSE_2 is: const EXPECTED_RESPONSE_DATA_2_FRAME_1: &[u8] = &[0x61, 0x62, 0x63]; fn make_request_and_exchange_pkts( client: &mut Http3Client, server: &mut TestServer, close_sending_side: bool, ) -> StreamId { let request_stream_id = make_request(client, close_sending_side, &[]); let dgram = client.process(None, now()).dgram(); server.conn.process_input(dgram.unwrap(), now()); // find the new request/response stream and send frame v on it. while let Some(e) = server.conn.next_event() { match e { ConnectionEvent::NewStream { stream_id } => { assert_eq!(stream_id, request_stream_id); assert_eq!(stream_id.stream_type(), StreamType::BiDi); } ConnectionEvent::RecvStreamReadable { stream_id } => { if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID { server.read_and_check_stream_data( stream_id, ENCODER_STREAM_CAP_INSTRUCTION, false, ); } else { assert_eq!(stream_id, request_stream_id); server.read_and_check_stream_data( stream_id, EXPECTED_REQUEST_HEADER_FRAME, close_sending_side, ); } } _ => {} } } let dgram = server.conn.process_output(now()).dgram(); if let Some(d) = dgram { client.process_input(d, now()); } request_stream_id } fn connect_and_send_request(close_sending_side: bool) -> (Http3Client, TestServer, StreamId) { let (mut client, mut server) = connect(); let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, close_sending_side); assert_eq!(request_stream_id, 0); (client, server, request_stream_id) } fn server_send_response_and_exchange_packet( client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, response: impl AsRef<[u8]>, close_stream: bool, ) { let _ = server .conn .stream_send(stream_id, response.as_ref()) .unwrap(); if close_stream { server.conn.stream_close_send(stream_id).unwrap(); } let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); } const PUSH_PROMISE_DATA: &[u8] = &[ 0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, 0x43, 0xd3, 0xc1, ]; fn check_pushpromise_header(header: &[Header]) { let expected_response_header_1 = &[ Header::new(":method", "GET"), Header::new(":scheme", "https"), Header::new(":authority", "something.com"), Header::new(":path", "/"), ]; assert_eq!(header, expected_response_header_1); } // Send a push promise with push_id and request_stream_id. fn send_push_promise(conn: &mut Connection, stream_id: StreamId, push_id: u64) { let frame = HFrame::PushPromise { push_id, header_block: PUSH_PROMISE_DATA.to_vec(), }; let mut d = Encoder::default(); frame.encode(&mut d); let _ = conn.stream_send(stream_id, d.as_ref()).unwrap(); } fn send_push_data_and_exchange_packets( client: &mut Http3Client, server: &mut TestServer, push_id: u8, close_push_stream: bool, ) -> StreamId { let push_stream_id = send_push_data(&mut server.conn, push_id, close_push_stream); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); push_stream_id } fn send_push_promise_and_exchange_packets( client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, push_id: u64, ) { send_push_promise(&mut server.conn, stream_id, push_id); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); } fn send_cancel_push_and_exchange_packets( client: &mut Http3Client, server: &mut TestServer, push_id: u64, ) { let frame = HFrame::CancelPush { push_id }; let mut d = Encoder::default(); frame.encode(&mut d); server .conn .stream_send(server.control_stream_id.unwrap(), d.as_ref()) .unwrap(); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); } const PUSH_DATA: &[u8] = &[ // headers 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34, // the data frame. 0x0, 0x4, 0x61, 0x62, 0x63, 0x64, ]; // The response header from PUSH_DATA (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34) are // decoded into: fn check_push_response_header(header: &[Header]) { let expected_push_response_header = vec![ Header::new(":status", "200"), Header::new("content-length", "4"), ]; assert_eq!(header, &expected_push_response_header[..]); } // The data frame payload from PUSH_DATA is: const EXPECTED_PUSH_RESPONSE_DATA_FRAME: &[u8] = &[0x61, 0x62, 0x63, 0x64]; // Send push data on a push stream: // 1) push_stream_type PUSH_STREAM_TYPE // 2) push_id // 3) PUSH_DATA that contains encoded headers and a data frame. // This function can only handle small push_id numbers that fit in a varint of length 1 byte. fn send_data_on_push( conn: &mut Connection, push_stream_id: StreamId, push_id: u8, data: impl AsRef<[u8]>, close_push_stream: bool, ) { // send data let _ = conn.stream_send(push_stream_id, PUSH_STREAM_TYPE).unwrap(); let _ = conn.stream_send(push_stream_id, &[push_id]).unwrap(); let _ = conn.stream_send(push_stream_id, data.as_ref()).unwrap(); if close_push_stream { conn.stream_close_send(push_stream_id).unwrap(); } } // Send push data on a push stream: // 1) push_stream_type PUSH_STREAM_TYPE // 2) push_id // 3) PUSH_DATA that contains encoded headers and a data frame. // This function can only handle small push_id numbers that fit in a varint of length 1 byte. fn send_push_data(conn: &mut Connection, push_id: u8, close_push_stream: bool) -> StreamId { send_push_with_data(conn, push_id, PUSH_DATA, close_push_stream) } // Send push data on a push stream: // 1) push_stream_type PUSH_STREAM_TYPE // 2) push_id // 3) and supplied push data. // This function can only handle small push_id numbers that fit in a varint of length 1 byte. fn send_push_with_data( conn: &mut Connection, push_id: u8, data: &[u8], close_push_stream: bool, ) -> StreamId { // create a push stream let push_stream_id = conn.stream_create(StreamType::UniDi).unwrap(); // send data send_data_on_push(conn, push_stream_id, push_id, data, close_push_stream); push_stream_id } struct PushPromiseInfo { pub push_id: u64, pub ref_stream_id: StreamId, } // Helper function: read response when a server sends: // - HTTP_RESPONSE_2 on the request_stream_id stream, // - a number of push promises described by a list of PushPromiseInfo. // - and a push streams with push_id in the push_streams list. // All push stream contain PUSH_DATA that decodes to headers (that can be checked by calling // check_push_response_header) and EXPECTED_PUSH_RESPONSE_DATA_FRAME fn read_response_and_push_events( client: &mut Http3Client, push_promises: &[PushPromiseInfo], push_streams: &[u64], response_stream_id: StreamId, ) { let mut num_push_promises = 0; let mut num_push_stream_headers = 0; let mut num_push_stream_data = 0; while let Some(e) = client.next_event() { match e { Http3ClientEvent::PushPromise { push_id, request_stream_id, headers, } => { assert!(push_promises .iter() .any(|p| p.push_id == push_id && p.ref_stream_id == request_stream_id)); check_pushpromise_header(&headers[..]); num_push_promises += 1; } Http3ClientEvent::PushHeaderReady { push_id, headers, interim, fin, } => { assert!(push_streams.contains(&push_id)); check_push_response_header(&headers); num_push_stream_headers += 1; assert!(!fin); assert!(!interim); } Http3ClientEvent::PushDataReadable { push_id } => { assert!(push_streams.contains(&push_id)); let mut buf = [0_u8; 100]; let (amount, fin) = client.push_read_data(now(), push_id, &mut buf).unwrap(); assert!(fin); assert_eq!(amount, EXPECTED_PUSH_RESPONSE_DATA_FRAME.len()); assert_eq!(&buf[..amount], EXPECTED_PUSH_RESPONSE_DATA_FRAME); num_push_stream_data += 1; } Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, response_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, response_stream_id); let mut buf = [0_u8; 100]; let (amount, _) = client.read_data(now(), stream_id, &mut buf).unwrap(); assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); } _ => {} } } assert_eq!(num_push_promises, push_promises.len()); assert_eq!(num_push_stream_headers, push_streams.len()); assert_eq!(num_push_stream_data, push_streams.len()); } // Client: Test receiving a new control stream and a SETTINGS frame. #[test] fn test_client_connect_and_exchange_qpack_and_control_streams() { mem::drop(connect()); } // Client: Test that the connection will be closed if control stream // has been closed. #[test] fn test_client_close_control_stream() { let (mut client, mut server) = connect(); server .conn .stream_close_send(server.control_stream_id.unwrap()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the local control stream // has been reset. #[test] fn test_client_reset_control_stream() { let (mut client, mut server) = connect(); server .conn .stream_reset_send(server.control_stream_id.unwrap(), Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the server side encoder stream // has been reset. #[test] fn test_client_reset_server_side_encoder_stream() { let (mut client, mut server) = connect(); server .conn .stream_reset_send(server.encoder_stream_id.unwrap(), Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the server side decoder stream // has been reset. #[test] fn test_client_reset_server_side_decoder_stream() { let (mut client, mut server) = connect(); server .conn .stream_reset_send(server.decoder_stream_id.unwrap(), Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the local control stream // has received a stop_sending. #[test] fn test_client_stop_sending_control_stream() { let (mut client, mut server) = connect(); server .conn .stream_stop_sending(CLIENT_SIDE_CONTROL_STREAM_ID, Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the client side encoder stream // has received a stop_sending. #[test] fn test_client_stop_sending_encoder_stream() { let (mut client, mut server) = connect(); server .conn .stream_stop_sending(CLIENT_SIDE_ENCODER_STREAM_ID, Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: Test that the connection will be closed if the client side decoder stream // has received a stop_sending. #[test] fn test_client_stop_sending_decoder_stream() { let (mut client, mut server) = connect(); server .conn .stream_stop_sending(CLIENT_SIDE_DECODER_STREAM_ID, Error::HttpNoError.code()) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpClosedCriticalStream); } // Client: test missing SETTINGS frame // (the first frame sent is a garbage frame). #[test] fn test_client_missing_settings() { let (mut client, mut server) = connect_only_transport(); // Create server control stream. let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); // Send a HEADERS frame instead (which contains garbage). let sent = server .conn .stream_send(control_stream, &[0x0, 0x1, 0x3, 0x0, 0x1, 0x2]); assert_eq!(sent, Ok(6)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpMissingSettings); } // Client: receiving SETTINGS frame twice causes connection close // with error HTTP_UNEXPECTED_FRAME. #[test] fn test_client_receive_settings_twice() { let (mut client, mut server) = connect(); // send the second SETTINGS frame. let sent = server.conn.stream_send( server.control_stream_id.unwrap(), &[0x4, 0x6, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64], ); assert_eq!(sent, Ok(8)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpFrameUnexpected); } fn test_wrong_frame_on_control_stream(v: &[u8]) { let (mut client, mut server) = connect(); // send a frame that is not allowed on the control stream. let _ = server .conn .stream_send(server.control_stream_id.unwrap(), v) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpFrameUnexpected); } // send DATA frame on a cortrol stream #[test] fn test_data_frame_on_control_stream() { test_wrong_frame_on_control_stream(&[0x0, 0x2, 0x1, 0x2]); } // send HEADERS frame on a cortrol stream #[test] fn test_headers_frame_on_control_stream() { test_wrong_frame_on_control_stream(&[0x1, 0x2, 0x1, 0x2]); } // send PUSH_PROMISE frame on a cortrol stream #[test] fn test_push_promise_frame_on_control_stream() { test_wrong_frame_on_control_stream(&[0x5, 0x2, 0x1, 0x2]); } // send PRIORITY_UPDATE frame on a control stream to the client #[test] fn test_priority_update_request_on_control_stream() { test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); } #[test] fn test_priority_update_push_on_control_stream() { test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); } fn test_wrong_frame_on_push_stream(v: &[u8]) { let (mut client, mut server, request_stream_id) = connect_and_send_request(false); send_push_promise(&mut server.conn, request_stream_id, 0); // Create a push stream let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); // Send the push stream type byte, push_id and frame v. let _ = server .conn .stream_send(push_stream_id, &[0x01, 0x0]) .unwrap(); let _ = server.conn.stream_send(push_stream_id, v).unwrap(); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); assert_closed(&client, &Error::HttpFrameUnexpected); } #[test] fn test_cancel_push_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0x3, 0x1, 0x5]); } #[test] fn test_settings_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]); } #[test] fn test_push_promise_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0x5, 0x2, 0x1, 0x2]); } #[test] fn test_priority_update_request_on_push_stream() { test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); } #[test] fn test_priority_update_push_on_push_stream() { test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); } #[test] fn test_goaway_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0x7, 0x1, 0x5]); } #[test] fn test_max_push_id_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0xd, 0x1, 0x5]); } // send DATA frame before a header frame #[test] fn test_data_frame_on_push_stream() { test_wrong_frame_on_push_stream(&[0x0, 0x2, 0x1, 0x2]); } // Client: receive unknown stream type // This function also tests getting stream id that does not fit into a single byte. #[test] fn test_client_received_unknown_stream() { let (mut client, mut server) = connect(); // create a stream with unknown type. let new_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); let _ = server .conn .stream_send(new_stream_id, &[0x41, 0x19, 0x4, 0x4, 0x6, 0x0, 0x8, 0x0]) .unwrap(); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); // check for stop-sending with Error::HttpStreamCreation. let mut stop_sending_event_found = false; while let Some(e) = server.conn.next_event() { if let ConnectionEvent::SendStreamStopSending { stream_id, app_error, } = e { stop_sending_event_found = true; assert_eq!(stream_id, new_stream_id); assert_eq!(app_error, Error::HttpStreamCreation.code()); } } assert!(stop_sending_event_found); assert_eq!(client.state(), Http3State::Connected); } // Test wrong frame on req/rec stream fn test_wrong_frame_on_request_stream(v: &[u8]) { let (mut client, mut server, request_stream_id) = connect_and_send_request(false); let _ = server.conn.stream_send(request_stream_id, v).unwrap(); // Generate packet with the above bad h3 input let out = server.conn.process(None, now()); // Process bad input and close the connection. mem::drop(client.process(out.dgram(), now())); assert_closed(&client, &Error::HttpFrameUnexpected); } #[test] fn test_cancel_push_frame_on_request_stream() { test_wrong_frame_on_request_stream(&[0x3, 0x1, 0x5]); } #[test] fn test_settings_frame_on_request_stream() { test_wrong_frame_on_request_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]); } #[test] fn test_goaway_frame_on_request_stream() { test_wrong_frame_on_request_stream(&[0x7, 0x1, 0x5]); } #[test] fn test_max_push_id_frame_on_request_stream() { test_wrong_frame_on_request_stream(&[0xd, 0x1, 0x5]); } #[test] fn test_priority_update_request_on_request_stream() { test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]); } #[test] fn test_priority_update_push_on_request_stream() { test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]); } // Test reading of a slowly streamed frame. bytes are received one by one #[test] fn test_frame_reading() { let (mut client, mut server) = connect_only_transport(); // create a control stream. let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); // send the stream type let mut sent = server.conn.stream_send(control_stream, &[0x0]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // start sending SETTINGS frame sent = server.conn.stream_send(control_stream, &[0x4]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x4]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x6]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x0]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x8]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x0]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_eq!(client.state(), Http3State::Connected); // Now test PushPromise sent = server.conn.stream_send(control_stream, &[0x5]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x5]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x4]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x61]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x62]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x63]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); sent = server.conn.stream_send(control_stream, &[0x64]); assert_eq!(sent, Ok(1)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // PUSH_PROMISE on a control stream will cause an error assert_closed(&client, &Error::HttpFrameUnexpected); } #[test] fn fetch_basic() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // send response - 200 Content-Length: 7 // with content: 'abcdefg'. // The content will be send in 2 DATA frames. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, true, ); let http_events = client.events().collect::>(); assert_eq!(http_events.len(), 2); for e in http_events { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_1(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!(amount, EXPECTED_RESPONSE_DATA_1.len()); assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_1); } _ => {} } } // after this stream will be removed from hcoon. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } /// Force both endpoints into an idle state. /// Do this by opening unidirectional streams at both endpoints and sending /// a partial unidirectional stream type (which the receiver has to buffer), /// then delivering packets out of order. /// This forces the receiver to create an acknowledgment, which will allow /// the peer to become idle. fn force_idle(client: &mut Http3Client, server: &mut TestServer) { // Send a partial unidirectional stream ID. // Note that this can't close the stream as that causes the receiver // to send `MAX_STREAMS`, which would prevent it from becoming idle. fn dgram(c: &mut Connection) -> Datagram { let stream = c.stream_create(StreamType::UniDi).unwrap(); let _ = c.stream_send(stream, &[0xc0]).unwrap(); c.process_output(now()).dgram().unwrap() } let d1 = dgram(&mut client.conn); let d2 = dgram(&mut client.conn); server.conn.process_input(d2, now()); server.conn.process_input(d1, now()); let d3 = dgram(&mut server.conn); let d4 = dgram(&mut server.conn); client.process_input(d4, now()); client.process_input(d3, now()); let ack = client.process_output(now()).dgram(); server.conn.process_input(ack.unwrap(), now()); } /// The client should keep a connection alive if it has unanswered requests. #[test] fn fetch_keep_alive() { let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); force_idle(&mut client, &mut server); let idle_timeout = ConnectionParameters::default().get_idle_timeout(); assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); } // Helper function: read response when a server sends HTTP_RESPONSE_2. fn read_response( client: &mut Http3Client, server: &mut Connection, request_stream_id: StreamId, ) { let out = server.process(None, now()); client.process(out.dgram(), now()); while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); } _ => {} } } // after this stream will be removed from client. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } // Data sent with a request: const REQUEST_BODY: &[u8] = &[0x64, 0x65, 0x66]; // Corresponding data frame that server will receive. const EXPECTED_REQUEST_BODY_FRAME: &[u8] = &[0x0, 0x3, 0x64, 0x65, 0x66]; // Send a request with the request body. #[test] fn fetch_with_data() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Get DataWritable for the request stream so that we can write the request body. let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); assert!(client.events().any(data_writable)); let sent = client.send_data(request_stream_id, REQUEST_BODY).unwrap(); assert_eq!(sent, REQUEST_BODY.len()); client.stream_close_send(request_stream_id).unwrap(); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); // find the new request/response stream and send response on it. while let Some(e) = server.conn.next_event() { match e { ConnectionEvent::NewStream { stream_id } => { assert_eq!(stream_id, request_stream_id); assert_eq!(stream_id.stream_type(), StreamType::BiDi); } ConnectionEvent::RecvStreamReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); // Read request body. let mut buf = [0_u8; 100]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!(amount, EXPECTED_REQUEST_BODY_FRAME.len()); assert_eq!(&buf[..amount], EXPECTED_REQUEST_BODY_FRAME); // send response - 200 Content-Length: 3 // with content: 'abc'. let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); server.conn.stream_close_send(stream_id).unwrap(); } _ => {} } } read_response(&mut client, &mut server.conn, request_stream_id); } // send a request with request body containing request_body. We expect to receive expected_data_frame_header. fn fetch_with_data_length_xbytes(request_body: &[u8], expected_data_frame_header: &[u8]) { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Get DataWritable for the request stream so that we can write the request body. let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); assert!(client.events().any(data_writable)); let sent = client.send_data(request_stream_id, request_body); assert_eq!(sent, Ok(request_body.len())); // Close stream. client.stream_close_send(request_stream_id).unwrap(); // We need to loop a bit until all data has been sent. let mut out = client.process(None, now()); for _i in 0..20 { out = server.conn.process(out.dgram(), now()); out = client.process(out.dgram(), now()); } // check request body is received. // Then send a response. while let Some(e) = server.conn.next_event() { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read the DATA frame. let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!( amount, request_body.len() + expected_data_frame_header.len() ); // Check the DATA frame header assert_eq!( &buf[..expected_data_frame_header.len()], expected_data_frame_header ); // Check data. assert_eq!(&buf[expected_data_frame_header.len()..amount], request_body); // send response - 200 Content-Length: 3 // with content: 'abc'. let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); server.conn.stream_close_send(stream_id).unwrap(); } } } read_response(&mut client, &mut server.conn, request_stream_id); } // send a request with 63 bytes. The DATA frame length field will still have 1 byte. #[test] fn fetch_with_data_length_63bytes() { fetch_with_data_length_xbytes(&[0_u8; 63], &[0x0, 0x3f]); } // send a request with 64 bytes. The DATA frame length field will need 2 byte. #[test] fn fetch_with_data_length_64bytes() { fetch_with_data_length_xbytes(&[0_u8; 64], &[0x0, 0x40, 0x40]); } // send a request with 16383 bytes. The DATA frame length field will still have 2 byte. #[test] fn fetch_with_data_length_16383bytes() { fetch_with_data_length_xbytes(&[0_u8; 16383], &[0x0, 0x7f, 0xff]); } // send a request with 16384 bytes. The DATA frame length field will need 4 byte. #[test] fn fetch_with_data_length_16384bytes() { fetch_with_data_length_xbytes(&[0_u8; 16384], &[0x0, 0x80, 0x0, 0x40, 0x0]); } // Send 2 data frames so that the second one cannot fit into the send_buf and it is only // partialy sent. We check that the sent data is correct. #[allow(clippy::useless_vec)] fn fetch_with_two_data_frames( first_frame: &[u8], expected_first_data_frame_header: &[u8], expected_second_data_frame_header: &[u8], expected_second_data_frame: &[u8], ) { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Get DataWritable for the request stream so that we can write the request body. let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); assert!(client.events().any(data_writable)); // Send the first frame. let sent = client.send_data(request_stream_id, first_frame); assert_eq!(sent, Ok(first_frame.len())); // The second frame cannot fit. let sent = client.send_data(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]); assert_eq!(sent, Ok(expected_second_data_frame.len())); // Close stream. client.stream_close_send(request_stream_id).unwrap(); let mut out = client.process(None, now()); // We need to loop a bit until all data has been sent. Once for every 1K // of data. for _i in 0..SEND_BUFFER_SIZE / 1000 { out = server.conn.process(out.dgram(), now()); out = client.process(out.dgram(), now()); } // check received frames and send a response. while let Some(e) = server.conn.next_event() { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { if stream_id == request_stream_id { // Read DATA frames. let mut buf = vec![1_u8; RECV_BUFFER_SIZE]; let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!( amount, expected_first_data_frame_header.len() + first_frame.len() + expected_second_data_frame_header.len() + expected_second_data_frame.len() ); // Check the first DATA frame header let end = expected_first_data_frame_header.len(); assert_eq!(&buf[..end], expected_first_data_frame_header); // Check the first frame data. let start = end; let end = end + first_frame.len(); assert_eq!(&buf[start..end], first_frame); // Check the second DATA frame header let start2 = end; let end2 = end + expected_second_data_frame_header.len(); assert_eq!(&buf[start2..end2], expected_second_data_frame_header); // Check the second frame data. let start3 = end2; let end3 = end2 + expected_second_data_frame.len(); assert_eq!(&buf[start3..end3], expected_second_data_frame); // send response - 200 Content-Length: 3 // with content: 'abc'. let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap(); server.conn.stream_close_send(stream_id).unwrap(); } } } read_response(&mut client, &mut server.conn, request_stream_id); } fn alloc_buffer(size: usize) -> (Vec, Vec) { let data_frame = HFrame::Data { len: size as u64 }; let mut enc = Encoder::default(); data_frame.encode(&mut enc); (vec![0_u8; size], enc.as_ref().to_vec()) } // Send 2 frames. For the second one we can only send 63 bytes. // After the first frame there is exactly 63+2 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_63bytes() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } // Send 2 frames. For the second one we can only send 63 bytes. // After the first frame there is exactly 63+3 bytes left in the send buffer, // but we can only send 63 bytes. #[test] fn fetch_two_data_frame_second_63bytes_place_for_66() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]); } // Send 2 frames. For the second one we can only send 64 bytes. // After the first frame there is exactly 64+3 bytes left in the send buffer, // but we can only send 64 bytes. #[test] fn fetch_two_data_frame_second_64bytes_place_for_67() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]); } // Send 2 frames. For the second one we can only send 16383 bytes. // After the first frame there is exactly 16383+3 bytes left in the send buffer. #[test] fn fetch_two_data_frame_second_16383bytes() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } // Send 2 frames. For the second one we can only send 16383 bytes. // After the first frame there is exactly 16383+4 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16387() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } // Send 2 frames. For the second one we can only send 16383 bytes. // After the first frame there is exactly 16383+5 bytes left in the send buffer, but we can only send 16383 bytes. #[test] fn fetch_two_data_frame_second_16383bytes_place_for_16388() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]); } // Send 2 frames. For the second one we can send 16384 bytes. // After the first frame there is exactly 16384+5 bytes left in the send buffer, but we can send 16384 bytes. #[test] fn fetch_two_data_frame_second_16384bytes_place_for_16389() { let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412); fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]); } // Test receiving STOP_SENDING with the HttpNoError error code. #[test] fn test_stop_sending_early_response() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Stop sending with early_response. assert_eq!( Ok(()), server .conn .stream_stop_sending(request_stream_id, Error::HttpNoError.code()) ); // send response - 200 Content-Length: 3 // with content: 'abc'. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); let mut stop_sending = false; let mut response_headers = false; let mut response_body = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { stream_id, error } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpNoError.code()); // assert that we cannot send any more request data. assert_eq!( Err(Error::InvalidStreamId), client.send_data(request_stream_id, &[0_u8; 10]) ); stop_sending = true; } Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); response_headers = true; } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); assert!(fin); assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); response_body = true; } _ => {} } } assert!(response_headers); assert!(response_body); assert!(stop_sending); // after this stream will be removed from client. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } // Server sends stop sending and reset. #[test] fn test_stop_sending_other_error_with_reset() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Stop sending with RequestRejected. assert_eq!( Ok(()), server .conn .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code()) ); // also reset with RequestRejected. assert_eq!( Ok(()), server .conn .stream_reset_send(request_stream_id, Error::HttpRequestRejected.code()) ); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut reset = false; let mut stop_sending = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { stream_id, error } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestRejected.code()); stop_sending = true; } Http3ClientEvent::Reset { stream_id, error, local, } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestRejected.code()); assert!(!local); reset = true; } Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { panic!("We should not get any headers or data"); } _ => {} } } assert!(reset); assert!(stop_sending); // after this stream will be removed from client. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } // Server sends stop sending with RequestRejected, but it does not send reset. #[test] fn test_stop_sending_other_error_wo_reset() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Stop sending with RequestRejected. assert_eq!( Ok(()), server .conn .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code()) ); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut stop_sending = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { stream_id, error } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestRejected.code()); stop_sending = true; } Http3ClientEvent::Reset { .. } => { panic!("We should not get StopSending."); } Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { panic!("We should not get any headers or data"); } _ => {} } } assert!(stop_sending); // after this we can still read from a stream. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_ok()); client.close(now(), 0, ""); } // Server sends stop sending and reset. We have some events for that stream already // in client.events. The events will be removed. #[test] fn test_stop_sending_and_reset_other_error_with_events() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // send response - 200 Content-Length: 3 // with content: 'abc'. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, false, ); // At this moment we have some new events, i.e. a HeadersReady event // Send a stop sending and reset. assert_eq!( Ok(()), server .conn .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code()) ); assert_eq!( Ok(()), server .conn .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) ); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut reset = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { stream_id, error } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestCancelled.code()); } Http3ClientEvent::Reset { stream_id, error, local, } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestCancelled.code()); assert!(!local); reset = true; } Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { panic!("We should not get any headers or data"); } _ => {} } } assert!(reset); // after this stream will be removed from client. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } // Server sends stop sending with code that is not HttpNoError. // We have some events for that stream already in the client.events. // The events will be removed. #[test] fn test_stop_sending_other_error_with_events() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // send response - 200 Content-Length: 3 // with content: 'abc'. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, false, ); // At this moment we have some new event, i.e. a HeadersReady event // Send a stop sending. assert_eq!( Ok(()), server .conn .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code()) ); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut stop_sending = false; let mut header_ready = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { stream_id, error } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestCancelled.code()); stop_sending = true; } Http3ClientEvent::Reset { .. } => { panic!("We should not get StopSending."); } Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { header_ready = true; } _ => {} } } assert!(stop_sending); assert!(header_ready); // after this, we can sill read data from a sttream. let mut buf = [0_u8; 100]; let (amount, fin) = client .read_data(now(), request_stream_id, &mut buf) .unwrap(); assert!(!fin); assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1); client.close(now(), 0, ""); } // Server sends a reset. We will close sending side as well. #[test] fn test_reset_wo_stop_sending() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Send a reset. assert_eq!( Ok(()), server .conn .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) ); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut reset = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::StopSending { .. } => { panic!("We should not get StopSending."); } Http3ClientEvent::Reset { stream_id, error, local, } => { assert_eq!(stream_id, request_stream_id); assert_eq!(error, Error::HttpRequestCancelled.code()); assert!(!local); reset = true; } Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => { panic!("We should not get any headers or data"); } _ => {} } } assert!(reset); // after this stream will be removed from client. We will check this by trying to read // from the stream and that should fail. let mut buf = [0_u8; 100]; let res = client.read_data(now(), request_stream_id, &mut buf); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); client.close(now(), 0, ""); } fn test_incomplet_frame(buf: &[u8], error: &Error) { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, buf, true, ); while let Some(e) = client.next_event() { if let Http3ClientEvent::DataReadable { stream_id } = e { assert_eq!(stream_id, request_stream_id); let mut buf_res = [0_u8; 100]; let res = client.read_data(now(), stream_id, &mut buf_res); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::HttpFrame); } } assert_closed(&client, error); } // Incomplete DATA frame #[test] fn test_incomplet_data_frame() { test_incomplet_frame(&HTTP_RESPONSE_2[..12], &Error::HttpFrame); } // Incomplete HEADERS frame #[test] fn test_incomplet_headers_frame() { test_incomplet_frame(&HTTP_RESPONSE_2[..7], &Error::HttpFrame); } #[test] fn test_incomplet_unknown_frame() { test_incomplet_frame(&[0x21], &Error::HttpFrame); } // test goaway #[test] fn test_goaway() { let (mut client, mut server) = connect(); let request_stream_id_1 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_1, 0); let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let request_stream_id_3 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_3, 8); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) .unwrap(); // find the new request/response stream and send frame v on it. while let Some(e) = server.conn.next_event() { if let ConnectionEvent::RecvStreamReadable { stream_id } = e { let mut buf = [0_u8; 100]; let _ = server.conn.stream_recv(stream_id, &mut buf).unwrap(); if (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2) { // send response - 200 Content-Length: 7 // with content: 'abcdefg'. // The content will be send in 2 DATA frames. let _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_1).unwrap(); server.conn.stream_close_send(stream_id).unwrap(); } } } let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let mut stream_reset = false; while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { headers, fin, .. } => { check_response_header_1(&headers); assert!(!fin); } Http3ClientEvent::DataReadable { stream_id } => { assert!( (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2) ); let mut buf = [0_u8; 100]; assert_eq!( (EXPECTED_RESPONSE_DATA_1.len(), true), client.read_data(now(), stream_id, &mut buf).unwrap() ); } Http3ClientEvent::Reset { stream_id, error, local, } => { assert_eq!(stream_id, request_stream_id_3); assert_eq!(error, Error::HttpRequestRejected.code()); assert!(!local); stream_reset = true; } _ => {} } } assert!(stream_reset); assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8))); // Check that a new request cannot be made. assert_eq!( client.fetch( now(), "GET", &("https", "something.com", "/"), &[], Priority::default() ), Err(Error::AlreadyClosed) ); client.close(now(), 0, ""); } #[test] fn multiple_goaways() { let (mut client, mut server) = connect(); let request_stream_id_1 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_1, 0); let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let request_stream_id_3 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_3, 8); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); // First send a Goaway frame with an higher number let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Check that there is one reset for stream_id 8 let mut stream_reset_1 = 0; while let Some(e) = client.next_event() { if let Http3ClientEvent::Reset { stream_id, error, local, } = e { assert_eq!(stream_id, request_stream_id_3); assert_eq!(error, Error::HttpRequestRejected.code()); assert!(!local); stream_reset_1 += 1; } } assert_eq!(stream_reset_1, 1); assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8))); // Server sends another GOAWAY frame let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4]) .unwrap(); // Send response for stream 0 server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id_1, HTTP_RESPONSE_1, true, ); let mut stream_reset_2 = 0; while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { headers, fin, .. } => { check_response_header_1(&headers); assert!(!fin); } Http3ClientEvent::DataReadable { stream_id } => { assert!(stream_id == request_stream_id_1); let mut buf = [0_u8; 100]; assert_eq!( (EXPECTED_RESPONSE_DATA_1.len(), true), client.read_data(now(), stream_id, &mut buf).unwrap() ); } Http3ClientEvent::Reset { stream_id, error, local, } => { assert_eq!(stream_id, request_stream_id_2); assert_eq!(error, Error::HttpRequestRejected.code()); assert!(!local); stream_reset_2 += 1; } _ => {} } } assert_eq!(stream_reset_2, 1); assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4))); } #[test] fn multiple_goaways_stream_id_increased() { let (mut client, mut server) = connect(); let request_stream_id_1 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_1, 0); let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let request_stream_id_3 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_3, 8); // First send a Goaway frame with a smaller number let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4]) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4))); // Now send a Goaway frame with an higher number let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8]) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpGeneralProtocol); } #[test] fn goaway_wrong_stream_id() { let (mut client, mut server) = connect(); let _ = server .conn .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x9]) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpId); } // Close stream before headers. #[test] fn test_stream_fin_wo_headers() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // send fin before sending any data. server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Recv HeaderReady wo headers with fin. let e = client.events().next().unwrap(); assert_eq!( e, Http3ClientEvent::Reset { stream_id: request_stream_id, error: Error::HttpGeneralProtocolStream.code(), local: true, } ); // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } // Close stream imemediately after headers. #[test] fn test_stream_fin_after_headers() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2, true, ); // Recv HeaderReady with headers and fin. let e = client.events().next().unwrap(); if let Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } = e { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(fin); assert!(!interim); } else { panic!("wrong event type"); } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } // Send headers, read headers and than close stream. // We should get HeaderReady and a DataReadable #[test] fn test_stream_fin_after_headers_are_read_wo_data_frame() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send some good data wo fin server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2, false, ); // Recv headers wo fin while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { .. } => { panic!("We should not receive a DataGeadable event!"); } _ => {} }; } // ok NOW send fin server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Recv DataReadable wo data with fin while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { .. } => { panic!("We should not get another HeaderReady!"); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let res = client.read_data(now(), stream_id, &mut buf); let (len, fin) = res.expect("should read"); assert_eq!(0, len); assert!(fin); } _ => {} }; } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } // Send headers and an empty data frame, then close the stream. #[test] fn test_stream_fin_after_headers_and_a_empty_data_frame() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send headers. let _ = server .conn .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) .unwrap(); // Send an empty data frame. let _ = server .conn .stream_send(request_stream_id, &[0x00, 0x00]) .unwrap(); // ok NOW send fin server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Recv HeaderReady with fin. while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; assert_eq!(Ok((0, true)), client.read_data(now(), stream_id, &mut buf)); } _ => {} }; } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), request_stream_id, &mut buf), Err(Error::InvalidStreamId) ); } // Send headers and an empty data frame. Read headers and then close the stream. // We should get a HeaderReady without fin and a DataReadable wo data and with fin. #[test] fn test_stream_fin_after_headers_an_empty_data_frame_are_read() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send some good data wo fin // Send headers. let _ = server .conn .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) .unwrap(); // Send an empty data frame. let _ = server .conn .stream_send(request_stream_id, &[0x00, 0x00]) .unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Recv headers wo fin while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { .. } => { panic!("We should not receive a DataGeadable event!"); } _ => {} }; } // ok NOW send fin server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Recv no data, but do get fin while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { .. } => { panic!("We should not get another HeaderReady!"); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let res = client.read_data(now(), stream_id, &mut buf); let (len, fin) = res.expect("should read"); assert_eq!(0, len); assert!(fin); } _ => {} }; } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } #[test] fn test_stream_fin_after_a_data_frame() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send some good data wo fin server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, false, ); // Recv some good data wo fin while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } => { assert_eq!(stream_id, request_stream_id); check_response_header_2(&headers); assert!(!fin); assert!(!interim); } Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let res = client.read_data(now(), stream_id, &mut buf); let (len, fin) = res.expect("should have data"); assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1); assert!(!fin); } _ => {} }; } // ok NOW send fin server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // fin wo data should generate DataReadable let e = client.events().next().unwrap(); if let Http3ClientEvent::DataReadable { stream_id } = e { assert_eq!(stream_id, request_stream_id); let mut buf = [0; 100]; let res = client.read_data(now(), stream_id, &mut buf); let (len, fin) = res.expect("should read"); assert_eq!(0, len); assert!(fin); } else { panic!("wrong event type"); } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } #[test] fn test_multiple_data_frames() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send two data frames with fin server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, true, ); // Read first frame match client.events().nth(1).unwrap() { Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; assert_eq!( (EXPECTED_RESPONSE_DATA_1.len(), true), client.read_data(now(), stream_id, &mut buf).unwrap() ); } x => { panic!("event {:?}", x); } } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } #[test] fn test_receive_grease_before_response() { // Construct an unknown frame. const UNKNOWN_FRAME_LEN: usize = 832; let (mut client, mut server, request_stream_id) = connect_and_send_request(true); let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4); enc.encode_varint(1028_u64); // Arbitrary type. enc.encode_varint(UNKNOWN_FRAME_LEN as u64); let mut buf: Vec<_> = enc.into(); buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0); let _ = server.conn.stream_send(request_stream_id, &buf).unwrap(); // Send a headers and a data frame with fin server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); // Read first frame match client.events().nth(1).unwrap() { Http3ClientEvent::DataReadable { stream_id } => { assert_eq!(stream_id, request_stream_id); let mut buf = [0_u8; 100]; let (len, fin) = client.read_data(now(), stream_id, &mut buf).unwrap(); assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len()); assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1); assert!(fin); } x => { panic!("event {:?}", x); } } // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } #[test] fn test_read_frames_header_blocked() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. let encoder_inst_pkt = server.conn.process(None, now()); // Send response let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: 3 }; d_frame.encode(&mut d); d.encode(&[0x61, 0x62, 0x63]); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, true, ); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!client.events().any(header_ready_event)); // Let client receive the encoder instructions. mem::drop(client.process(encoder_inst_pkt.dgram(), now())); let out = server.conn.process(None, now()); mem::drop(client.process(out.dgram(), now())); mem::drop(client.process(None, now())); let mut recv_header = false; let mut recv_data = false; // Now the stream is unblocked and both headers and data will be consumed. while let Some(e) = client.next_event() { match e { Http3ClientEvent::HeaderReady { stream_id, .. } => { assert_eq!(stream_id, request_stream_id); recv_header = true; } Http3ClientEvent::DataReadable { stream_id } => { recv_data = true; assert_eq!(stream_id, request_stream_id); } x => { panic!("event {:?}", x); } } } assert!(recv_header && recv_data); } #[test] fn test_read_frames_header_blocked_with_fin_after_headers() { let (mut hconn, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut hconn, &mut server); let sent_headers = vec![ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "0"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &sent_headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. let encoder_inst_pkt = server.conn.process(None, now()); let mut d = Encoder::default(); hframe.encode(&mut d); server_send_response_and_exchange_packet( &mut hconn, &mut server, request_stream_id, &d, true, ); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!hconn.events().any(header_ready_event)); // Let client receive the encoder instructions. let _out = hconn.process(encoder_inst_pkt.dgram(), now()); let mut recv_header = false; // Now the stream is unblocked. After headers we will receive a fin. while let Some(e) = hconn.next_event() { if let Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } = e { assert_eq!(stream_id, request_stream_id); assert_eq!(headers.as_ref(), sent_headers); assert!(fin); assert!(!interim); recv_header = true; } else { panic!("event {:?}", e); } } assert!(recv_header); } fn exchange_token(client: &mut Http3Client, server: &mut Connection) -> ResumptionToken { server.send_ticket(now(), &[]).expect("can send ticket"); let out = server.process_output(now()); assert!(out.as_dgram_ref().is_some()); client.process_input(out.dgram().unwrap(), now()); // We do not have a token so we need to wait for a resumption token timer to trigger. client.process_output(now() + Duration::from_millis(250)); assert_eq!(client.state(), Http3State::Connected); client .events() .find_map(|e| { if let Http3ClientEvent::ResumptionToken(token) = e { Some(token) } else { None } }) .unwrap() } fn start_with_0rtt() -> (Http3Client, TestServer) { let (mut client, mut server) = connect(); let token = exchange_token(&mut client, &mut server.conn); let mut client = default_http3_client(); let server = TestServer::new(); assert_eq!(client.state(), Http3State::Initializing); client .enable_resumption(now(), &token) .expect("Set resumption token."); assert_eq!(client.state(), Http3State::ZeroRtt); let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); assert!(client.events().any(zerortt_event)); (client, server) } #[test] fn zero_rtt_negotiated() { let (mut client, mut server) = start_with_0rtt(); let out = client.process(None, now()); assert_eq!(client.state(), Http3State::ZeroRtt); assert_eq!(*server.conn.state(), State::Init); let out = server.conn.process(out.dgram(), now()); // Check that control and qpack streams are received and a // SETTINGS frame has been received. // Also qpack encoder stream will send "change capacity" instruction because it has // the peer settings already. server.check_control_qpack_request_streams_resumption( ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, EXPECTED_REQUEST_HEADER_FRAME, false, ); assert_eq!(*server.conn.state(), State::Handshaking); let out = client.process(out.dgram(), now()); assert_eq!(client.state(), Http3State::Connected); mem::drop(server.conn.process(out.dgram(), now())); assert!(server.conn.state().connected()); assert!(client.tls_info().unwrap().resumed()); assert!(server.conn.tls_info().unwrap().resumed()); } #[test] fn zero_rtt_send_request() { let (mut client, mut server) = start_with_0rtt(); let request_stream_id = make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]); assert_eq!(request_stream_id, 0); let out = client.process(None, now()); assert_eq!(client.state(), Http3State::ZeroRtt); assert_eq!(*server.conn.state(), State::Init); let out = server.conn.process(out.dgram(), now()); // Check that control and qpack streams are received and a // SETTINGS frame has been received. // Also qpack encoder stream will send "change capacity" instruction because it has // the peer settings already. server.check_control_qpack_request_streams_resumption( ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST, EXPECTED_REQUEST_HEADER_FRAME_VERSION2, true, ); assert_eq!(*server.conn.state(), State::Handshaking); let out = client.process(out.dgram(), now()); assert_eq!(client.state(), Http3State::Connected); let out = server.conn.process(out.dgram(), now()); assert!(server.conn.state().connected()); let out = client.process(out.dgram(), now()); assert!(out.as_dgram_ref().is_none()); // After the server has been connected, send a response. let res = server.conn.stream_send(request_stream_id, HTTP_RESPONSE_2); assert_eq!(res, Ok(HTTP_RESPONSE_2.len())); server.conn.stream_close_send(request_stream_id).unwrap(); read_response(&mut client, &mut server.conn, request_stream_id); assert!(client.tls_info().unwrap().resumed()); assert!(server.conn.tls_info().unwrap().resumed()); } #[test] fn zero_rtt_before_resumption_token() { let mut client = default_http3_client(); assert!(client .fetch( now(), "GET", &("https", "something.com", "/"), &[], Priority::default() ) .is_err()); } #[test] fn zero_rtt_send_reject() { let (mut client, mut server) = connect(); let token = exchange_token(&mut client, &mut server.conn); let mut client = default_http3_client(); let mut server = Connection::new_server( test_fixture::DEFAULT_KEYS, test_fixture::DEFAULT_ALPN_H3, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), ConnectionParameters::default(), ) .unwrap(); // Using a freshly initialized anti-replay context // should result in the server rejecting 0-RTT. let ar = AntiReplay::new(now(), test_fixture::ANTI_REPLAY_WINDOW, 1, 3) .expect("setup anti-replay"); server .server_enable_0rtt(&ar, AllowZeroRtt {}) .expect("enable 0-RTT"); assert_eq!(client.state(), Http3State::Initializing); client .enable_resumption(now(), &token) .expect("Set resumption token."); let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); assert!(client.events().any(zerortt_event)); // Send ClientHello. let client_hs = client.process(None, now()); assert!(client_hs.as_dgram_ref().is_some()); // Create a request let request_stream_id = make_request(&mut client, false, &[]); assert_eq!(request_stream_id, 0); let client_0rtt = client.process(None, now()); assert!(client_0rtt.as_dgram_ref().is_some()); let server_hs = server.process(client_hs.dgram(), now()); assert!(server_hs.as_dgram_ref().is_some()); // Should produce ServerHello etc... let server_ignored = server.process(client_0rtt.dgram(), now()); assert!(server_ignored.as_dgram_ref().is_none()); // The server shouldn't receive that 0-RTT data. let recvd_stream_evt = |e| matches!(e, ConnectionEvent::NewStream { .. }); assert!(!server.events().any(recvd_stream_evt)); // Client should get a rejection. let client_out = client.process(server_hs.dgram(), now()); assert!(client_out.as_dgram_ref().is_some()); let recvd_0rtt_reject = |e| e == Http3ClientEvent::ZeroRttRejected; assert!(client.events().any(recvd_0rtt_reject)); // ...and the client stream should be gone. let res = client.stream_close_send(request_stream_id); assert!(res.is_err()); assert_eq!(res.unwrap_err(), Error::InvalidStreamId); // Client will send Setting frame and open new qpack streams. mem::drop(server.process(client_out.dgram(), now())); TestServer::new_with_conn(server).check_client_control_qpack_streams_no_resumption(); // Check that we can send a request and that the stream_id starts again from 0. assert_eq!(make_request(&mut client, false, &[]), 0); } // Connect to a server, get token and reconnect using 0-rtt. Seerver sends new Settings. fn zero_rtt_change_settings( original_settings: &[HSetting], resumption_settings: &[HSetting], expected_client_state: &Http3State, expected_encoder_stream_data: &[u8], ) { let mut client = default_http3_client(); let mut server = TestServer::new_with_settings(original_settings); // Connect and get a token connect_with(&mut client, &mut server); let token = exchange_token(&mut client, &mut server.conn); let mut client = default_http3_client(); let mut server = TestServer::new_with_settings(resumption_settings); assert_eq!(client.state(), Http3State::Initializing); client .enable_resumption(now(), &token) .expect("Set resumption token."); assert_eq!(client.state(), Http3State::ZeroRtt); let out = client.process(None, now()); assert_eq!(client.state(), Http3State::ZeroRtt); assert_eq!(*server.conn.state(), State::Init); let out = server.conn.process(out.dgram(), now()); // Check that control and qpack streams anda SETTINGS frame are received. // Also qpack encoder stream will send "change capacity" instruction because it has // the peer settings already. server.check_control_qpack_request_streams_resumption( expected_encoder_stream_data, EXPECTED_REQUEST_HEADER_FRAME, false, ); assert_eq!(*server.conn.state(), State::Handshaking); let out = client.process(out.dgram(), now()); assert_eq!(client.state(), Http3State::Connected); mem::drop(server.conn.process(out.dgram(), now())); assert!(server.conn.state().connected()); assert!(client.tls_info().unwrap().resumed()); assert!(server.conn.tls_info().unwrap().resumed()); // Send new settings. let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); let mut enc = Encoder::default(); server.settings.encode(&mut enc); let mut sent = server.conn.stream_send(control_stream, CONTROL_STREAM_TYPE); assert_eq!(sent.unwrap(), CONTROL_STREAM_TYPE.len()); sent = server.conn.stream_send(control_stream, enc.as_ref()); assert_eq!(sent.unwrap(), enc.len()); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_eq!(&client.state(), expected_client_state); assert!(server.conn.state().connected()); } #[test] fn zero_rtt_new_server_setting_are_the_same() { // Send a new server settings that are the same as the old one. zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Connected, ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_omit_max_table() { // Send a new server settings without MaxTableCapacity zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_omit_blocked_streams() { // Send a new server settings without BlockedStreams zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_omit_header_list_size() { // Send a new server settings without MaxHeaderListSize zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), ], &Http3State::Connected, ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_max_table_size_bigger() { // Send a new server settings MaxTableCapacity=200 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 200), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(514)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_max_table_size_smaller() { // Send a new server settings MaxTableCapacity=50 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 50), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_blocked_streams_bigger() { // Send a new server settings withBlockedStreams=200 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 200), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Connected, ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_blocked_streams_smaller() { // Send a new server settings withBlockedStreams=50 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 50), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_max_header_size_bigger() { // Send a new server settings with MaxHeaderListSize=20000 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 20000), ], &Http3State::Connected, ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_new_server_setting_max_headers_size_smaller() { // Send the new server settings with MaxHeaderListSize=5000 zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 5000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_max_table_size_first_omitted() { // send server original settings without MaxTableCapacity // send new server setting with MaxTableCapacity zero_rtt_change_settings( &[ HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Connected, ENCODER_STREAM_DATA, ); } #[test] fn zero_rtt_blocked_streams_first_omitted() { // Send server original settings without BlockedStreams // Send the new server settings with BlockedStreams zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Connected, ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn zero_rtt_max_header_size_first_omitted() { // Send server settings without MaxHeaderListSize // Send new settings with MaxHeaderListSize. zero_rtt_change_settings( &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 10000), ], &[ HSetting::new(HSettingType::MaxTableCapacity, 100), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ], &Http3State::Closing(ConnectionError::Application(265)), ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION, ); } #[test] fn test_trailers_with_fin_after_headers() { // Make a new connection. let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send HEADER frame. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, false, ); // Check response headers. let mut response_headers = false; while let Some(e) = client.next_event() { if let Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } = e { assert_eq!(stream_id, request_stream_id); check_response_header_0(&headers); assert!(!fin); assert!(!interim); response_headers = true; } } assert!(response_headers); // Send trailers server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, true, ); let events: Vec = client.events().collect(); // We already had HeaderReady let header_ready: fn(&Http3ClientEvent) -> _ = |e| matches!(*e, Http3ClientEvent::HeaderReady { .. }); assert!(!events.iter().any(header_ready)); // Check that we have a DataReady event. Reading from the stream will return fin=true. let data_readable: fn(&Http3ClientEvent) -> _ = |e| matches!(*e, Http3ClientEvent::DataReadable { .. }); assert!(events.iter().any(data_readable)); let mut buf = [0_u8; 100]; let (len, fin) = client .read_data(now(), request_stream_id, &mut buf) .unwrap(); assert_eq!(0, len); assert!(fin); } #[test] fn test_trailers_with_later_fin_after_headers() { // Make a new connection. let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send HEADER frame. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, false, ); // Check response headers. let mut response_headers = false; while let Some(e) = client.next_event() { if let Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } = e { assert_eq!(stream_id, request_stream_id); check_response_header_0(&headers); assert!(!fin); assert!(!interim); response_headers = true; } } assert!(response_headers); // Send trailers server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, false, ); // Check that we do not have a DataReady event. let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); assert!(!client.events().any(data_readable)); server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); let events: Vec = client.events().collect(); // We already had HeaderReady let header_ready: fn(&Http3ClientEvent) -> _ = |e| matches!(*e, Http3ClientEvent::HeaderReady { .. }); assert!(!events.iter().any(header_ready)); // Check that we have a DataReady event. Reading from the stream will return fin=true. let data_readable_fn: fn(&Http3ClientEvent) -> _ = |e| matches!(*e, Http3ClientEvent::DataReadable { .. }); assert!(events.iter().any(data_readable_fn)); let mut buf = [0_u8; 100]; let (len, fin) = client .read_data(now(), request_stream_id, &mut buf) .unwrap(); assert_eq!(0, len); assert!(fin); } #[test] fn test_data_after_trailers_after_headers() { // Make a new connection. let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send HEADER frame. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, false, ); // Check response headers. let mut response_headers = false; while let Some(e) = client.next_event() { if let Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin, } = e { assert_eq!(stream_id, request_stream_id); check_response_header_0(&headers); assert!(!fin); assert!(!interim); response_headers = true; } } assert!(response_headers); // Send trailers server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_HEADER_FRAME_0, false, ); // Check that we do not have a DataReady event. let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); assert!(!client.events().any(data_readable)); // Send Data frame. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, [0x0, 0x3, 0x61, 0x62, 0x63], // a data frame false, ); assert_closed(&client, &Error::HttpFrameUnexpected); } #[test] fn transport_stream_readable_event_after_all_data() { let (mut client, mut server, request_stream_id) = connect_and_send_request(false); // Send headers. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, false, ); // Send an empty data frame and a fin server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, [0x0, 0x0], true, ); let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Ok((3, true)) ); client.process(None, now()); } #[test] fn no_data_ready_events_after_fin() { // Connect exchange headers and send a request. Also check if the correct header frame has been sent. let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // send response - 200 Content-Length: 7 // with content: 'abcdefg'. // The content will be send in 2 DATA frames. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, true, ); let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id); assert!(client.events().any(data_readable_event)); let mut buf = [0_u8; 100]; assert_eq!( (EXPECTED_RESPONSE_DATA_1.len(), true), client .read_data(now(), request_stream_id, &mut buf) .unwrap() ); assert!(!client.events().any(data_readable_event)); } #[test] fn reading_small_chunks_of_data() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // send response - 200 Content-Length: 7 // with content: 'abcdefg'. // The content will be send in 2 DATA frames. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, true, ); let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id); assert!(client.events().any(data_readable_event)); let mut buf1 = [0_u8; 1]; assert_eq!( (1, false), client .read_data(now(), request_stream_id, &mut buf1) .unwrap() ); assert!(!client.events().any(data_readable_event)); // Now read only until the end of the first frame. The firs framee has 3 bytes. let mut buf2 = [0_u8; 2]; assert_eq!( (2, false), client .read_data(now(), request_stream_id, &mut buf2) .unwrap() ); assert!(!client.events().any(data_readable_event)); // Read a half of the second frame. assert_eq!( (2, false), client .read_data(now(), request_stream_id, &mut buf2) .unwrap() ); assert!(!client.events().any(data_readable_event)); // Read the rest. // Read a half of the second frame. assert_eq!( (2, true), client .read_data(now(), request_stream_id, &mut buf2) .unwrap() ); assert!(!client.events().any(data_readable_event)); } #[test] fn zero_length_data_at_end() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // send response - 200 Content-Length: 7 // with content: 'abcdefg'. // The content will be send in 2 DATA frames. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, false, ); // Send a zero-length frame at the end of the stream. let _ = server.conn.stream_send(request_stream_id, &[0, 0]).unwrap(); server.conn.stream_close_send(request_stream_id).unwrap(); let dgram = server.conn.process_output(now()).dgram(); client.process_input(dgram.unwrap(), now()); let data_readable_event = |e: &_| matches!(e, Http3ClientEvent::DataReadable { stream_id } if *stream_id == request_stream_id); assert_eq!(client.events().filter(data_readable_event).count(), 1); let mut buf = [0_u8; 10]; assert_eq!( (7, true), client .read_data(now(), request_stream_id, &mut buf) .unwrap() ); assert!(!client.events().any(|e| data_readable_event(&e))); } #[test] fn stream_blocked_no_remote_encoder_stream() { let (mut client, mut server) = connect_only_transport(); send_and_receive_client_settings(&mut client, &mut server); server.create_control_stream(); // Send the server's control stream data. let out = server.conn.process(None, now()); client.process(out.dgram(), now()); server.create_qpack_streams(); let qpack_pkt1 = server.conn.process(None, now()); // delay delivery of this packet. let request_stream_id = make_request(&mut client, true, &[]); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Send the encoder instructions, let out = server.conn.process(None, now()); client.process(out.dgram(), now()); // Send response let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: 3 }; d_frame.encode(&mut d); d.encode(&[0x61, 0x62, 0x63]); let _ = server .conn .stream_send(request_stream_id, d.as_ref()) .unwrap(); server.conn.stream_close_send(request_stream_id).unwrap(); let out = server.conn.process(None, now()); mem::drop(client.process(out.dgram(), now())); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!client.events().any(header_ready_event)); // Let client receive the encoder instructions. mem::drop(client.process(qpack_pkt1.dgram(), now())); assert!(client.events().any(header_ready_event)); } // Client: receive a push stream #[test] fn push_single() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, 0); // create a push stream. let _ = send_push_data(&mut server.conn, 0, true); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }], &[0], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); } /// We can't keep the connection alive on the basis of a push promise, /// nor do we want to if the push promise is not interesting to the client. /// We do the next best thing, which is keep any push stream alive if the /// client reads from it. #[test] fn push_keep_alive() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); let idle_timeout = ConnectionParameters::default().get_idle_timeout(); // Promise a push and deliver, but don't close the stream. send_push_promise(&mut server.conn, request_stream_id, 0); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }], &[], // No push streams yet. request_stream_id, ); // The client will become idle here. force_idle(&mut client, &mut server); assert_eq!(client.process_output(now()).callback(), idle_timeout); // Reading push data will stop the client from being idle. let _ = send_push_data(&mut server.conn, 0, false); let dgram = server.conn.process_output(now()).dgram(); client.process_input(dgram.unwrap(), now()); let mut buf = [0; 16]; let (read, fin) = client.push_read_data(now(), 0, &mut buf).unwrap(); assert!(read < buf.len()); assert!(!fin); force_idle(&mut client, &mut server); assert_eq!(client.process_output(now()).callback(), idle_timeout / 2); } #[test] fn push_multiple() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, 0); send_push_promise(&mut server.conn, request_stream_id, 1); // create a push stream. let _ = send_push_data(&mut server.conn, 0, true); // create a second push stream. let _ = send_push_data(&mut server.conn, 1, true); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 1, ref_stream_id: request_stream_id, }, ], &[0, 1], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); assert_eq!(client.cancel_push(1), Err(Error::InvalidStreamId)); } #[test] fn push_after_headers() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send response headers let _ = server .conn .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2) .unwrap(); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, 0); // create a push stream. let _ = send_push_data(&mut server.conn, 0, true); // Send response data server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_DATA_FRAME_ONLY_2, true, ); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }], &[0], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } #[test] fn push_after_response() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send response headers and data frames let _ = server .conn .stream_send(request_stream_id, HTTP_RESPONSE_2) .unwrap(); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, 0); // create a push stream. send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }], &[0], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } fn check_push_events(client: &mut Http3Client) -> bool { let any_push_event = |e| { matches!( e, Http3ClientEvent::PushPromise { .. } | Http3ClientEvent::PushHeaderReady { .. } | Http3ClientEvent::PushDataReadable { .. } ) }; client.events().any(any_push_event) } fn check_data_readable(client: &mut Http3Client) -> bool { let any_data_event = |e| matches!(e, Http3ClientEvent::DataReadable { .. }); client.events().any(any_data_event) } fn check_header_ready(client: &mut Http3Client) -> bool { let any_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); client.events().any(any_event) } fn check_header_ready_and_push_promise(client: &mut Http3Client) -> bool { let any_event = |e| { matches!( e, Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::PushPromise { .. } ) }; client.events().any(any_event) } #[test] fn push_stream_before_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // create a push stream. send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Now send push_promise send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }], &[0], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // Test receiving pushes out of order. // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state. // Start push_id 3 by receiving a push_promise and then a push stream with the push_id 3. #[test] fn push_out_of_order_1() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); // Start a push stream with push_id 3. send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); assert_eq!(client.state(), Http3State::Connected); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 3, ref_stream_id: request_stream_id, }, ], &[3], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // Test receiving pushes out of order. // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state. // Start push_id 3 by receiving a push stream with push_id 3 and then a push_promise. #[test] fn push_out_of_order_2() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 3, ref_stream_id: request_stream_id, }, ], &[3], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // Test receiving pushes out of order. // Push_id 5 is received first and read so that it is removed from the list, // therefore Push_id 3 will be in the PushState:Init state. // Start push_id 3 by receiving a push stream with the push_id 3 and then a push_promise. #[test] fn push_out_of_order_3() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); assert_eq!(client.state(), Http3State::Connected); // Read push stream with push_id 5 to make it change to closed state. read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }], &[5], request_stream_id, ); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3); send_push_data_and_exchange_packets(&mut client, &mut server, 3, true); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 3, ref_stream_id: request_stream_id, }], &[3], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // The next test is for receiving a second PushPromise when Push is in the PushPromise state. #[test] fn multiple_push_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); // make a second request. let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id_2, }, ], &[], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // The next test is for receiving a second PushPromise when Push is in the Active state. #[test] fn multiple_push_promise_active() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); // make a second request. let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id_2, }, ], &[5], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // The next test is for receiving a second PushPromise when the push is already closed. // PushPromise will be ignored for the push streams that are consumed. #[test] fn multiple_push_promise_closed() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5); // Start a push stream with push_id 5. send_push_data_and_exchange_packets(&mut client, &mut server, 5, true); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 5, ref_stream_id: request_stream_id, }], &[5], request_stream_id, ); // make a second request. let request_stream_id_2 = make_request(&mut client, false, &[]); assert_eq!(request_stream_id_2, 4); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5); // Check that we do not have a Http3ClientEvent::PushPromise. let push_event = |e| matches!(e, Http3ClientEvent::PushPromise { .. }); assert!(!client.events().any(push_event)); } // Test that max_push_id is enforced when a push promise frame is received. #[test] fn exceed_max_push_id_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. max_push_id is set to 5, to trigger an error we send push_id=6. send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 6); assert_closed(&client, &Error::HttpId); } // Test that max_push_id is enforced when a push stream is received. #[test] fn exceed_max_push_id_push_stream() { // Connect and send a request let (mut client, mut server) = connect(); // Send a push stream. max_push_id is set to 5, to trigger an error we send push_id=6. send_push_data_and_exchange_packets(&mut client, &mut server, 6, true); assert_closed(&client, &Error::HttpId); } // Test that max_push_id is enforced when a cancel push frame is received. #[test] fn exceed_max_push_id_cancel_push() { // Connect and send a request let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); // Send CANCEL_PUSH for push_id 6. send_cancel_push_and_exchange_packets(&mut client, &mut server, 6); assert_closed(&client, &Error::HttpId); } // Test that max_push_id is enforced when an app calls cancel_push. #[test] fn exceed_max_push_id_cancel_api() { // Connect and send a request let (mut client, _, _) = connect_and_send_request(true); assert_eq!(client.cancel_push(6), Err(Error::HttpId)); assert_eq!(client.state(), Http3State::Connected); } #[test] fn test_max_push_id_frame_update_is_sent() { const MAX_PUSH_ID_FRAME: &[u8] = &[0xd, 0x1, 0x8]; // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send 3 push promises. send_push_promise(&mut server.conn, request_stream_id, 0); send_push_promise(&mut server.conn, request_stream_id, 1); send_push_promise(&mut server.conn, request_stream_id, 2); // create 3 push streams. send_push_data(&mut server.conn, 0, true); send_push_data(&mut server.conn, 1, true); send_push_data_and_exchange_packets(&mut client, &mut server, 2, true); read_response_and_push_events( &mut client, &[ PushPromiseInfo { push_id: 0, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 1, ref_stream_id: request_stream_id, }, PushPromiseInfo { push_id: 2, ref_stream_id: request_stream_id, }, ], &[0, 1, 2], request_stream_id, ); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); // Check max_push_id frame has been received let control_stream_readable = |e| matches!(e, ConnectionEvent::RecvStreamReadable{stream_id: x} if x == 2); assert!(server.conn.events().any(control_stream_readable)); let mut buf = [0_u8; 100]; let (amount, fin) = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap(); assert!(!fin); assert_eq!(amount, MAX_PUSH_ID_FRAME.len()); assert_eq!(&buf[..3], MAX_PUSH_ID_FRAME); // Check that we can send push_id=8 now send_push_promise(&mut server.conn, request_stream_id, 8); send_push_data(&mut server.conn, 8, true); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); assert_eq!(client.state(), Http3State::Connected); read_response_and_push_events( &mut client, &[PushPromiseInfo { push_id: 8, ref_stream_id: request_stream_id, }], &[8], request_stream_id, ); assert_eq!(client.state(), Http3State::Connected); } // Test that 2 push streams with the same push_id are caught. #[test] fn duplicate_push_stream() { // Connect and send a request let (mut client, mut server, _request_stream_id) = connect_and_send_request(true); // Start a push stream with push_id 0. send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); // Send it again send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); assert_closed(&client, &Error::HttpId); } // Test that 2 push streams with the same push_id are caught. #[test] fn duplicate_push_stream_active() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise(&mut server.conn, request_stream_id, 0); send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); // Now the push_stream is in the PushState::Active state send_push_data_and_exchange_packets(&mut client, &mut server, 0, true); assert_closed(&client, &Error::HttpId); } fn assert_stop_sending_event( server: &mut TestServer, push_stream_id: StreamId, expected_error: u64, ) { assert!(server.conn.events().any(|e| matches!( e, ConnectionEvent::SendStreamStopSending { stream_id, app_error, } if stream_id == push_stream_id && app_error == expected_error ))); } // Test CANCEL_PUSH frame: after cancel push any new PUSH_PROMISE or push stream will be ignored. #[test] fn cancel_push_ignore_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); send_push_promise(&mut server.conn, request_stream_id, 0); // Start a push stream with push_id 0. let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); // Check that the push has been canceled by the client. assert_stop_sending_event( &mut server, push_stream_id, Error::HttpRequestCancelled.code(), ); assert_eq!(client.state(), Http3State::Connected); } // Test CANCEL_PUSH frame: after cancel push any already received PUSH_PROMISE or push stream // events will be removed. #[test] fn cancel_push_removes_push_events() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise(&mut server.conn, request_stream_id, 0); let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); // Check that the push has been canceled by the client. assert_stop_sending_event( &mut server, push_stream_id, Error::HttpRequestCancelled.code(), ); assert_eq!(client.state(), Http3State::Connected); } // Test CANCEL_PUSH frame: after cancel push any already received push stream will be canceled. #[test] fn cancel_push_frame_after_push_stream() { // Connect and send a request let (mut client, mut server, _) = connect_and_send_request(true); // Start a push stream with push_id 0. let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); send_cancel_push_and_exchange_packets(&mut client, &mut server, 0); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); // Check that the push has been canceled by the client. assert_stop_sending_event( &mut server, push_stream_id, Error::HttpRequestCancelled.code(), ); assert_eq!(client.state(), Http3State::Connected); } // Test a push stream reset after a new PUSH_PROMISE or/and push stream. The events will be ignored. #[test] fn cancel_push_stream_after_push_promise_and_push_stream() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise(&mut server.conn, request_stream_id, 0); // Start a push stream with push_id 0. let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); server .conn .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); let out = server.conn.process(None, now()).dgram(); client.process(out, now()); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); assert_eq!(client.state(), Http3State::Connected); } // Test that a PUSH_PROMISE will be ignored after a push stream reset. #[test] fn cancel_push_stream_before_push_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Start a push stream with push_id 0. let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); server .conn .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); let out = server.conn.process(None, now()).dgram(); client.process(out, now()); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); assert_eq!(client.state(), Http3State::Connected); } // Test that push_promise events will be removed after application calls cancel_push. #[test] fn app_cancel_push_after_push_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); assert!(client.cancel_push(0).is_ok()); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); assert_eq!(client.state(), Http3State::Connected); } // Test that push_promise and push data events will be removed after application calls cancel_push. #[test] fn app_cancel_push_after_push_promise_and_push_stream() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); assert!(client.cancel_push(0).is_ok()); let out = client.process(None, now()).dgram(); mem::drop(server.conn.process(out, now())); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); // Check that the push has been canceled by the client. assert_stop_sending_event( &mut server, push_stream_id, Error::HttpRequestCancelled.code(), ); assert_eq!(client.state(), Http3State::Connected); } // Test that push_promise events will be ignored after application calls cancel_push. #[test] fn app_cancel_push_before_push_promise() { // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); let push_stream_id = send_push_data_and_exchange_packets(&mut client, &mut server, 0, false); assert!(client.cancel_push(0).is_ok()); let out = client.process(None, now()).dgram(); mem::drop(server.conn.process(out, now())); send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0); // Assert that we do not have any push event. assert!(!check_push_events(&mut client)); // Check that the push has been closed, e.g. calling cancel_push should return InvalidStreamId. assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId)); // Check that the push has been canceled by the client. assert_stop_sending_event( &mut server, push_stream_id, Error::HttpRequestCancelled.code(), ); assert_eq!(client.state(), Http3State::Connected); } fn setup_server_side_encoder_param( client: &mut Http3Client, server: &mut TestServer, max_blocked_streams: u64, ) { server .encoder .borrow_mut() .set_max_capacity(max_blocked_streams) .unwrap(); server .encoder .borrow_mut() .set_max_blocked_streams(100) .unwrap(); server .encoder .borrow_mut() .send_encoder_updates(&mut server.conn) .unwrap(); let out = server.conn.process(None, now()); mem::drop(client.process(out.dgram(), now())); } fn setup_server_side_encoder(client: &mut Http3Client, server: &mut TestServer) { setup_server_side_encoder_param(client, server, 100); } fn send_push_promise_using_encoder( client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, push_id: u64, ) -> Option { send_push_promise_using_encoder_with_custom_headers( client, server, stream_id, push_id, Header::new("my-header", "my-value"), ) } fn send_push_promise_using_encoder_with_custom_headers( client: &mut Http3Client, server: &mut TestServer, stream_id: StreamId, push_id: u64, additional_header: Header, ) -> Option { let mut headers = vec![ Header::new(":method", "GET"), Header::new(":scheme", "https"), Header::new(":authority", "something.com"), Header::new(":path", "/"), Header::new("content-length", "3"), ]; headers.push(additional_header); let encoded_headers = server .encoder .borrow_mut() .encode_header_block(&mut server.conn, &headers, stream_id); let push_promise_frame = HFrame::PushPromise { push_id, header_block: encoded_headers.to_vec(), }; // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. let encoder_inst_pkt = server.conn.process(None, now()).dgram(); assert!(encoder_inst_pkt.is_some()); let mut d = Encoder::default(); push_promise_frame.encode(&mut d); server_send_response_and_exchange_packet(client, server, stream_id, &d, false); encoder_inst_pkt } #[test] fn push_promise_header_decoder_block() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let encoder_inst_pkt = send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt, now()); // PushPromise is blocked wathing for encoder instructions. assert!(check_push_events(&mut client)); } // If PushPromise is blocked, stream data can still be received. #[test] fn push_promise_blocked_but_stream_is_not_blocked() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); // Send response headers server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_HEADER_ONLY_1, false, ); let encoder_inst_pkt = send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); // Stream data can be still read server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_DATA_FRAME_1_ONLY_1, false, ); assert!(check_data_readable(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt, now()); // PushPromise is blocked wathing for encoder instructions. assert!(check_push_events(&mut client)); // Stream data can be still read server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_DATA_FRAME_2_ONLY_1, false, ); assert!(check_data_readable(&mut client)); } // The response Headers are not block if they do not refer the dynamic table. #[test] fn push_promise_does_not_block_headers() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let encoder_inst_pkt = send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); // Send response headers server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_HEADER_ONLY_1, false, ); assert!(check_header_ready(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt, now()); // PushPromise is blocked wathing for encoder instructions. assert!(check_push_events(&mut client)); } // The response Headers are blocked if they refer a dynamic table entry. #[test] fn push_promise_block_headers() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); // Insert an elemet into a dynamic table. // insert "content-length: 1234 server .encoder .borrow_mut() .send_and_insert(&mut server.conn, b"content-length", b"1234") .unwrap(); let encoder_inst_pkt1 = server.conn.process(None, now()).dgram(); let _out = client.process(encoder_inst_pkt1, now()); // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client. let encoder_inst_pkt2 = send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); let response_headers = vec![ Header::new(":status", "200"), Header::new("content-length", "1234"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &response_headers, request_stream_id, ); let header_hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; let mut d = Encoder::default(); header_hframe.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // The response headers are blocked. assert!(!check_header_ready(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt2, now()); // The response headers are blocked. assert!(check_header_ready_and_push_promise(&mut client)); } // In this test there are 2 push promises that are blocked and the response header is // blocked as well. After a packet is received only the first push promises is unblocked. #[test] fn two_push_promises_and_header_block() { let mut client = default_http3_client_param(200); let mut server = TestServer::new_with_settings(&[ HSetting::new(HSettingType::MaxTableCapacity, 200), HSetting::new(HSettingType::BlockedStreams, 100), HSetting::new(HSettingType::MaxHeaderListSize, 10000), ]); connect_only_transport_with(&mut client, &mut server); server.create_control_stream(); server.create_qpack_streams(); setup_server_side_encoder_param(&mut client, &mut server, 200); let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, true); // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client. let encoder_inst_pkt1 = send_push_promise_using_encoder_with_custom_headers( &mut client, &mut server, request_stream_id, 0, Header::new("myn1", "myv1"), ); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); let encoder_inst_pkt2 = send_push_promise_using_encoder_with_custom_headers( &mut client, &mut server, request_stream_id, 1, Header::new("myn2", "myv2"), ); // PushPromise is blocked wathing for encoder instructions. assert!(!check_push_events(&mut client)); let response_headers = vec![ Header::new(":status", "200"), Header::new("content-length", "1234"), Header::new("myn3", "myv3"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &response_headers, request_stream_id, ); let header_hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; let mut d = Encoder::default(); header_hframe.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // The response headers are blocked. assert!(!check_header_ready(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt1, now()); assert!(check_push_events(&mut client)); // Let client receive the encoder instructions. let _out = client.process(encoder_inst_pkt2, now()); assert!(check_header_ready_and_push_promise(&mut client)); } // The PushPromise blocked on header decoding will be canceled if the stream is closed. #[test] fn blocked_push_promises_canceled() { const STREAM_CANCELED_ID_0: &[u8] = &[0x40]; let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); mem::drop( send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) .unwrap(), ); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_1, true, ); // Read response that will make stream change to closed state. assert!(check_header_ready(&mut client)); let mut buf = [0_u8; 100]; let _ = client .read_data(now(), request_stream_id, &mut buf) .unwrap(); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); // Check that encoder got stream_canceled instruction. let mut inst = [0_u8; 100]; let (amount, fin) = server .conn .stream_recv(CLIENT_SIDE_DECODER_STREAM_ID, &mut inst) .unwrap(); assert!(!fin); assert_eq!(amount, STREAM_CANCELED_ID_0.len()); assert_eq!(&inst[..amount], STREAM_CANCELED_ID_0); } #[test] fn data_readable_in_decoder_blocked_state() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "0"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Delay encoder instruction so that the stream will be blocked. let encoder_insts = server.conn.process(None, now()); // Send response headers. let mut d = Encoder::default(); hframe.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Headers are blocked waiting fro the encoder instructions. let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!client.events().any(header_ready_event)); // Now send data frame. This will trigger DataRead event. let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: 0 }; d_frame.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, true, ); // Now read headers. mem::drop(client.process(encoder_insts.dgram(), now())); } #[test] fn qpack_stream_reset() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); // Cancel request. mem::drop(client.cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn)); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); } fn send_headers_using_encoder( client: &mut Http3Client, server: &mut TestServer, request_stream_id: StreamId, headers: &[Header], data: &[u8], ) -> Option { let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; let out = server.conn.process(None, now()); // Send response let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: u64::try_from(data.len()).unwrap(), }; d_frame.encode(&mut d); d.encode(data); server_send_response_and_exchange_packet(client, server, request_stream_id, &d, true); out.dgram() } #[test] fn qpack_stream_reset_recv() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); // Cancel request. server .conn .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); let out = server.conn.process(None, now()); let out = client.process(out.dgram(), now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn)); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); } #[test] fn qpack_stream_reset_during_header_qpack_blocked() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); mem::drop( send_headers_using_encoder( &mut client, &mut server, request_stream_id, &[ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ], &[0x61, 0x62, 0x63], ) .unwrap(), ); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!client.events().any(header_ready_event)); // Cancel request. client .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); } #[test] fn qpack_no_stream_cancelled_after_fin() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let encoder_instruct = send_headers_using_encoder( &mut client, &mut server, request_stream_id, &[ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ], &[], ); // Exchange encoder instructions mem::drop(client.process(encoder_instruct, now())); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(client.events().any(header_ready_event)); // After this the recv_stream is in ClosePending state // Cancel request. client .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); } #[test] fn qpack_stream_reset_push_promise_header_decoder_block() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new("content-length", "3"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Send the encoder instructions. let out = server.conn.process(None, now()); mem::drop(client.process(out.dgram(), now())); // Send PushPromise that will be blocked waiting for decoder instructions. mem::drop( send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0) .unwrap(), ); // Send response let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: 0 }; d_frame.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, true, ); let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(client.events().any(header_ready_event)); // Cancel request. client .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1); } #[test] fn qpack_stream_reset_dynamic_table_zero() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Cancel request. client .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()) .unwrap(); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); let out = client.process(None, now()); mem::drop(server.conn.process(out.dgram(), now())); mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap()); assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0); } #[test] fn multiple_streams_in_decoder_blocked_state() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "0"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Delay encoder instruction so that the stream will be blocked. let encoder_insts = server.conn.process(None, now()); // Send response headers. let mut d = Encoder::default(); hframe.encode(&mut d); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, true, ); // Headers are blocked waiting for the encoder instructions. let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. }); assert!(!client.events().any(header_ready_event)); // Make another request. let request2 = make_request_and_exchange_pkts(&mut client, &mut server, true); // Send response headers. server_send_response_and_exchange_packet(&mut client, &mut server, request2, &d, true); // Headers on the second request are blocked as well are blocked // waiting for the encoder instructions. assert!(!client.events().any(header_ready_event)); // Now make the encoder instructions available. mem::drop(client.process(encoder_insts.dgram(), now())); // Header blocks for both streams should be ready. let mut count_responses = 0; while let Some(e) = client.next_event() { if let Http3ClientEvent::HeaderReady { stream_id, .. } = e { assert!((stream_id == request_stream_id) || (stream_id == request2)); count_responses += 1; } } assert_eq!(count_responses, 2); } #[test] fn reserved_frames() { for f in H3_RESERVED_FRAME_TYPES { let mut enc = Encoder::default(); enc.encode_varint(*f); test_wrong_frame_on_control_stream(enc.as_ref()); test_wrong_frame_on_push_stream(enc.as_ref()); test_wrong_frame_on_request_stream(enc.as_ref()); } } #[test] fn send_reserved_settings() { for s in H3_RESERVED_SETTINGS { let (mut client, mut server) = connect_only_transport(); let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap(); // Send the control stream type(0x0). let _ = server .conn .stream_send(control_stream, CONTROL_STREAM_TYPE) .unwrap(); // Create a settings frame of length 2. let mut enc = Encoder::default(); enc.encode_varint(H3_FRAME_TYPE_SETTINGS); enc.encode_varint(2_u64); // The settings frame contains a reserved settings type and some value (0x1). enc.encode_varint(*s); enc.encode_varint(1_u64); let sent = server.conn.stream_send(control_stream, enc.as_ref()); assert_eq!(sent, Ok(4)); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpSettings); } } #[test] fn response_w_1xx() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let mut d = Encoder::default(); let headers1xx: &[Header] = &[Header::new(":status", "103")]; server.encode_headers(request_stream_id, headers1xx, &mut d); let headers200: &[Header] = &[ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; server.encode_headers(request_stream_id, headers200, &mut d); // Send 1xx and 200 headers response. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Sending response data. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_DATA_FRAME_ONLY_2, true, ); let mut events = client.events().filter_map(|e| { if let Http3ClientEvent::HeaderReady { stream_id, interim, headers, .. } = e { Some((stream_id, interim, headers)) } else { None } }); let (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap(); assert_eq!( (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()), (request_stream_id, true, headers1xx) ); let (stream_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap(); assert_eq!( (stream_id_200_rec, interim200_rec, headers200_rec.as_ref()), (request_stream_id, false, headers200) ); assert!(events.next().is_none()); } #[test] fn response_wo_status() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let mut d = Encoder::default(); let headers = vec![ Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; server.encode_headers(request_stream_id, &headers, &mut d); // Send response server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Stream has been reset because of the malformed headers. let e = client.events().next().unwrap(); assert_eq!( e, Http3ClientEvent::Reset { stream_id: request_stream_id, error: Error::InvalidHeader.code(), local: true, } ); let out = client.process(None, now()).dgram(); mem::drop(server.conn.process(out, now())); // Check that server has received a reset. let stop_sending_event = |e| { matches!(e, ConnectionEvent::SendStreamStopSending { stream_id, app_error } if stream_id == request_stream_id && app_error == Error::InvalidHeader.code()) }; assert!(server.conn.events().any(stop_sending_event)); // Stream should now be closed and gone let mut buf = [0_u8; 100]; assert_eq!( client.read_data(now(), StreamId::new(0), &mut buf), Err(Error::InvalidStreamId) ); } // Client: receive a push stream #[test] fn push_single_with_1xx() { const FIRST_PUSH_ID: u64 = 0; // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID); // Create a push stream let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); let mut d = Encoder::default(); let headers1xx: &[Header] = &[Header::new(":status", "100")]; server.encode_headers(push_stream_id, headers1xx, &mut d); let headers200: &[Header] = &[ Header::new(":status", "200"), Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; server.encode_headers(push_stream_id, headers200, &mut d); // create a push stream. send_data_on_push( &mut server.conn, push_stream_id, u8::try_from(FIRST_PUSH_ID).unwrap(), &d, true, ); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); let mut events = client.events().filter_map(|e| { if let Http3ClientEvent::PushHeaderReady { push_id, interim, headers, .. } = e { Some((push_id, interim, headers)) } else { None } }); let (push_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap(); assert_eq!( (push_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()), (FIRST_PUSH_ID, true, headers1xx) ); let (push_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap(); assert_eq!( (push_id_200_rec, interim200_rec, headers200_rec.as_ref()), (FIRST_PUSH_ID, false, headers200) ); assert!(events.next().is_none()); } // Client: receive a push stream #[test] fn push_single_wo_status() { const FIRST_PUSH_ID: u64 = 0; // Connect and send a request let (mut client, mut server, request_stream_id) = connect_and_send_request(true); // Send a push promise. send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID); // Create a push stream let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); let mut d = Encoder::default(); let headers = vec![ Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; server.encode_headers(request_stream_id, &headers, &mut d); send_data_on_push( &mut server.conn, push_stream_id, u8::try_from(FIRST_PUSH_ID).unwrap(), &d, false, ); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, HTTP_RESPONSE_2, true, ); // Stream has been reset because of thei malformed headers. let push_reset_event = |e| { matches!(e, Http3ClientEvent::PushReset { push_id, error, } if push_id == FIRST_PUSH_ID && error == Error::InvalidHeader.code()) }; assert!(client.events().any(push_reset_event)); let out = client.process(None, now()).dgram(); mem::drop(server.conn.process(out, now())); // Check that server has received a reset. let stop_sending_event = |e| { matches!(e, ConnectionEvent::SendStreamStopSending { stream_id, app_error } if stream_id == push_stream_id && app_error == Error::InvalidHeader.code()) }; assert!(server.conn.events().any(stop_sending_event)); } fn handshake_client_error(client: &mut Http3Client, server: &mut TestServer, error: &Error) { let out = handshake_only(client, server); client.process(out.dgram(), now()); assert_closed(client, error); } /// Client fails to create a control stream, since server does not allow it. #[test] fn client_control_stream_create_failed() { let mut client = default_http3_client(); let mut server = TestServer::new_with_conn(new_server( DEFAULT_ALPN_H3, ConnectionParameters::default().max_streams(StreamType::UniDi, 0), )); handshake_client_error(&mut client, &mut server, &Error::StreamLimitError); } /// 2 streams isn't enough for control and QPACK streams. #[test] fn client_qpack_stream_create_failed() { let mut client = default_http3_client(); let mut server = TestServer::new_with_conn(new_server( DEFAULT_ALPN_H3, ConnectionParameters::default().max_streams(StreamType::UniDi, 2), )); handshake_client_error(&mut client, &mut server, &Error::StreamLimitError); } fn do_malformed_response_test(headers: &[Header]) { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let mut d = Encoder::default(); server.encode_headers(request_stream_id, headers, &mut d); // Send response server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Stream has been reset because of the malformed headers. let e = client.events().next().unwrap(); assert_eq!( e, Http3ClientEvent::Reset { stream_id: request_stream_id, error: Error::InvalidHeader.code(), local: true, } ); } #[test] fn malformed_response_pseudo_header_after_regular_header() { do_malformed_response_test(&[ Header::new("content-type", "text/plain"), Header::new(":status", "100"), ]); } #[test] fn malformed_response_undefined_pseudo_header() { do_malformed_response_test(&[Header::new(":status", "200"), Header::new(":cheese", "200")]); } #[test] fn malformed_response_duplicate_pseudo_header() { do_malformed_response_test(&[ Header::new(":status", "200"), Header::new(":status", "100"), Header::new("content-type", "text/plain"), ]); } #[test] fn malformed_response_uppercase_header() { do_malformed_response_test(&[ Header::new(":status", "200"), Header::new("content-Type", "text/plain"), ]); } #[test] fn malformed_response_excluded_header() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let mut d = Encoder::default(); server.encode_headers( request_stream_id, &[ Header::new(":status", "200"), Header::new("content-type", "text/plain"), Header::new("connection", "close"), ], &mut d, ); // Send response server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Stream has been reset because of the malformed headers. let e = client.events().next().unwrap(); assert_eq!( e, Http3ClientEvent::HeaderReady { stream_id: request_stream_id, headers: vec![ Header::new(":status", "200"), Header::new("content-type", "text/plain") ], interim: false, fin: false, } ); } #[test] fn malformed_response_excluded_byte_in_header() { do_malformed_response_test(&[ Header::new(":status", "200"), Header::new("content:type", "text/plain"), ]); } #[test] fn malformed_response_request_header_in_response() { do_malformed_response_test(&[ Header::new(":status", "200"), Header::new(":method", "GET"), Header::new("content-type", "text/plain"), ]); } fn maybe_authenticate(conn: &mut Http3Client) { let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); if conn.events().any(authentication_needed) { conn.authenticated(AuthenticationStatus::Ok, now()); } } const MAX_TABLE_SIZE: u64 = 65536; const MAX_BLOCKED_STREAMS: u16 = 5; fn get_resumption_token(server: &mut Http3Server) -> ResumptionToken { let mut client = default_http3_client_param(MAX_TABLE_SIZE); let mut datagram = None; let is_done = |c: &Http3Client| matches!(c.state(), Http3State::Connected); while !is_done(&mut client) { maybe_authenticate(&mut client); datagram = client.process(datagram, now()).dgram(); datagram = server.process(datagram, now()).dgram(); } // exchange qpack settings, server will send a token as well. datagram = client.process(datagram, now()).dgram(); datagram = server.process(datagram, now()).dgram(); mem::drop(client.process(datagram, now()).dgram()); client .events() .find_map(|e| { if let Http3ClientEvent::ResumptionToken(token) = e { Some(token) } else { None } }) .unwrap() } // Test that decoder stream type is always sent before any other instruction also // in case when 0RTT is used. // A client will send a request that uses the dynamic table. This will trigger a header-ack // from a server. We will use stats to check that a header-ack has been received. #[test] fn zerortt_request_use_dynamic_table() { let mut server = Http3Server::new( now(), DEFAULT_KEYS, DEFAULT_ALPN_H3, anti_replay(), Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), Http3Parameters::default() .max_table_size_encoder(MAX_TABLE_SIZE) .max_table_size_decoder(MAX_TABLE_SIZE) .max_blocked_streams(MAX_BLOCKED_STREAMS), None, ) .unwrap(); let token = get_resumption_token(&mut server); // Make a new connection. let mut client = default_http3_client_param(MAX_TABLE_SIZE); assert_eq!(client.state(), Http3State::Initializing); client .enable_resumption(now(), &token) .expect("Set resumption token."); assert_eq!(client.state(), Http3State::ZeroRtt); let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt)); assert!(client.events().any(zerortt_event)); // Make a request that uses the dynamic table. let _ = make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]); // Assert that the request has used dynamic table. That will trigger a header_ack. assert_eq!(client.qpack_encoder_stats().dynamic_table_references, 1); // Exchange packets until header-ack is received. // These many packet exchange is needed, to get a header-ack. // TODO this may be optimize at Http3Server. let out = client.process(None, now()).dgram(); let out = server.process(out, now()).dgram(); let out = client.process(out, now()).dgram(); let out = server.process(out, now()).dgram(); let out = client.process(out, now()).dgram(); let out = server.process(out, now()).dgram(); let out = client.process(out, now()).dgram(); let out = server.process(out, now()).dgram(); mem::drop(client.process(out, now())); // The header ack for the first request has been received. assert_eq!(client.qpack_encoder_stats().header_acks_recv, 1); } fn manipulate_conrol_stream(client: &mut Http3Client, stream_id: StreamId) { assert_eq!( client .cancel_fetch(stream_id, Error::HttpNoError.code()) .unwrap_err(), Error::InvalidStreamId ); assert_eq!( client.stream_close_send(stream_id).unwrap_err(), Error::InvalidStreamId ); let mut buf = [0; 2]; assert_eq!( client.send_data(stream_id, &buf).unwrap_err(), Error::InvalidStreamId ); assert_eq!( client.read_data(now(), stream_id, &mut buf).unwrap_err(), Error::InvalidStreamId ); } #[test] fn manipulate_conrol_streams() { let (mut client, server, request_stream_id) = connect_and_send_request(false); manipulate_conrol_stream(&mut client, CLIENT_SIDE_CONTROL_STREAM_ID); manipulate_conrol_stream(&mut client, CLIENT_SIDE_ENCODER_STREAM_ID); manipulate_conrol_stream(&mut client, CLIENT_SIDE_DECODER_STREAM_ID); manipulate_conrol_stream(&mut client, server.control_stream_id.unwrap()); manipulate_conrol_stream(&mut client, server.encoder_stream_id.unwrap()); manipulate_conrol_stream(&mut client, server.decoder_stream_id.unwrap()); client .cancel_fetch(request_stream_id, Error::HttpNoError.code()) .unwrap(); } // Client: receive a push stream #[test] fn incomple_push_stream() { let (mut client, mut server) = connect(); // Create a push stream let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap(); let _ = server .conn .stream_send(push_stream_id, PUSH_STREAM_TYPE) .unwrap(); let _ = server.conn.stream_send(push_stream_id, &[0]).unwrap(); server.conn.stream_close_send(push_stream_id).unwrap(); let out = server.conn.process(None, now()); client.process(out.dgram(), now()); assert_closed(&client, &Error::HttpGeneralProtocol); } #[test] fn priority_update_during_full_buffer() { // set a lower MAX_DATA on the server side to restrict the data the client can send let (mut client, mut server) = connect_with_connection_parameters(ConnectionParameters::default().max_data(1200)); let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, false); let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. }); assert!(client.events().any(data_writable)); // Send a lot of data to reach the flow control limit client.send_data(request_stream_id, &[0; 2000]).unwrap(); // now queue a priority_update packet for that stream assert!(client .priority_update(request_stream_id, Priority::new(6, false)) .unwrap()); let md_before = server.conn.stats().frame_tx.max_data; // sending the http request and most most of the request data let out = client.process(None, now()).dgram(); let out = server.conn.process(out, now()).dgram(); // the server responses with an ack, but the max_data didn't change assert_eq!(md_before, server.conn.stats().frame_tx.max_data); let out = client.process(out, now()).dgram(); let out = server.conn.process(out, now()).dgram(); // the server increased the max_data during the second read if that isn't the case // in the future and therefore this asserts fails, the request data on stream 0 could be read // to cause a max_update frame assert_eq!(md_before + 1, server.conn.stats().frame_tx.max_data); // make sure that the server didn't receive a priority_update on client control stream (stream_id 2) yet let mut buf = [0; 32]; assert_eq!( server.conn.stream_recv(StreamId::new(2), &mut buf), Ok((0, false)) ); // the client now sends the priority update let out = client.process(out, now()).dgram(); server.conn.process_input(out.unwrap(), now()); // check that the priority_update arrived at the client control stream let num_read = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap(); assert_eq!(b"\x80\x0f\x07\x00\x04\x00\x75\x3d\x36", &buf[0..num_read.0]); } #[test] fn error_request_stream() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let headers = vec![ Header::new(":status", "200"), Header::new(":method", "GET"), // <- invalid Header::new("my-header", "my-header"), Header::new("content-length", "3"), ]; let encoded_headers = server.encoder.borrow_mut().encode_header_block( &mut server.conn, &headers, request_stream_id, ); let hframe = HFrame::Headers { header_block: encoded_headers.to_vec(), }; // Send the encoder instructions, but delay them so that the stream is blocked on decoding headers. let encoder_inst_pkt = server.conn.process(None, now()); // Send response let mut d = Encoder::default(); hframe.encode(&mut d); let d_frame = HFrame::Data { len: 3 }; d_frame.encode(&mut d); d.encode(b"abc"); server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, true, ); // Let client receive the encoder instructions. client.process_input(encoder_inst_pkt.dgram().unwrap(), now()); let reset_event = |e| matches!(e, Http3ClientEvent::Reset { stream_id, .. } if stream_id == request_stream_id); assert!(client.events().any(reset_event)); } #[test] fn response_w_101() { let (mut client, mut server, request_stream_id) = connect_and_send_request(true); setup_server_side_encoder(&mut client, &mut server); let mut d = Encoder::default(); let headers1xx = &[Header::new(":status", "101")]; server.encode_headers(request_stream_id, headers1xx, &mut d); // Send 101 response. server_send_response_and_exchange_packet( &mut client, &mut server, request_stream_id, &d, false, ); // Stream has been reset because of the 101 response. let e = client.events().next().unwrap(); assert_eq!( e, Http3ClientEvent::Reset { stream_id: request_stream_id, error: Error::InvalidHeader.code(), local: true, } ); } }