From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../tests/webtransport/datagrams.rs | 146 +++ .../extended_connect/tests/webtransport/mod.rs | 661 ++++++++++++ .../tests/webtransport/negotiation.rs | 280 +++++ .../tests/webtransport/sessions.rs | 456 ++++++++ .../extended_connect/tests/webtransport/streams.rs | 1131 ++++++++++++++++++++ 5 files changed, 2674 insertions(+) create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs (limited to 'third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport') diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs new file mode 100644 index 0000000000..1c58596dd3 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs @@ -0,0 +1,146 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::convert::TryFrom; + +use neqo_common::Encoder; +use neqo_transport::Error as TransportError; + +use crate::{ + features::extended_connect::tests::webtransport::{ + wt_default_parameters, WtTest, DATAGRAM_SIZE, + }, + Error, Http3Parameters, WebTransportRequest, +}; + +const DGRAM: &[u8] = &[0, 100]; + +#[test] +fn no_datagrams() { + let mut wt = WtTest::new_with_params( + Http3Parameters::default().webtransport(true), + Http3Parameters::default().webtransport(true), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + + assert_eq!( + wt_session.send_datagram(DGRAM, None), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + assert_eq!( + wt.send_datagram(wt_session.stream_id(), DGRAM), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + + wt.exchange_packets(); + wt.check_no_datagram_received_client(); + wt.check_no_datagram_received_server(); +} + +fn do_datagram_test(wt: &mut WtTest, wt_session: &mut WebTransportRequest) { + assert_eq!( + wt_session.max_datagram_size(), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + + assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(())); + assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(())); + + wt.exchange_packets(); + wt.check_datagram_received_client(wt_session.stream_id(), DGRAM); + wt.check_datagram_received_server(wt_session, DGRAM); +} + +#[test] +fn datagrams() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session); +} + +#[test] +fn datagrams_server_only() { + let mut wt = WtTest::new_with_params( + Http3Parameters::default().webtransport(true), + wt_default_parameters(), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + + assert_eq!( + wt_session.send_datagram(DGRAM, None), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(())); + + wt.exchange_packets(); + wt.check_datagram_received_server(&wt_session, DGRAM); + wt.check_no_datagram_received_client(); +} + +#[test] +fn datagrams_client_only() { + let mut wt = WtTest::new_with_params( + wt_default_parameters(), + Http3Parameters::default().webtransport(true), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + + assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(())); + assert_eq!( + wt.send_datagram(wt_session.stream_id(), DGRAM), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + + wt.exchange_packets(); + wt.check_datagram_received_client(wt_session.stream_id(), DGRAM); + wt.check_no_datagram_received_server(); +} + +#[test] +fn datagrams_multiple_session() { + let mut wt = WtTest::new(); + + let mut wt_session1 = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session1); + + let mut wt_session_2 = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session_2); +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs new file mode 100644 index 0000000000..51dc47e4c1 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs @@ -0,0 +1,661 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +mod datagrams; +mod negotiation; +mod sessions; +mod streams; +use std::{cell::RefCell, rc::Rc, time::Duration}; + +use neqo_common::event::Provider; +use neqo_crypto::AuthenticationStatus; +use neqo_transport::{ConnectionParameters, StreamId, StreamType}; +use test_fixture::{ + addr, anti_replay, fixture_init, now, CountingConnectionIdGenerator, DEFAULT_ALPN_H3, + DEFAULT_KEYS, DEFAULT_SERVER_NAME, +}; + +use crate::{ + features::extended_connect::SessionCloseReason, Error, Header, Http3Client, Http3ClientEvent, + Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, Http3State, + RecvStreamStats, SendStreamStats, WebTransportEvent, WebTransportRequest, + WebTransportServerEvent, WebTransportSessionAcceptAction, +}; + +const DATAGRAM_SIZE: u64 = 1200; + +pub fn wt_default_parameters() -> Http3Parameters { + Http3Parameters::default() + .webtransport(true) + .connection_parameters(ConnectionParameters::default().datagram_size(DATAGRAM_SIZE)) +} + +pub fn default_http3_client(client_params: Http3Parameters) -> Http3Client { + fixture_init(); + Http3Client::new( + DEFAULT_SERVER_NAME, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + addr(), + addr(), + client_params, + now(), + ) + .expect("create a default client") +} + +pub fn default_http3_server(server_params: Http3Parameters) -> Http3Server { + Http3Server::new( + now(), + DEFAULT_KEYS, + DEFAULT_ALPN_H3, + anti_replay(), + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + server_params, + None, + ) + .expect("create a server") +} + +fn exchange_packets(client: &mut Http3Client, server: &mut Http3Server) { + let mut out = None; + loop { + out = client.process(out.as_ref(), now()).dgram(); + out = server.process(out.as_ref(), now()).dgram(); + if out.is_none() { + break; + } + } +} + +// Perform only Quic transport handshake. +fn connect_with(client: &mut Http3Client, server: &mut Http3Server) { + assert_eq!(client.state(), Http3State::Initializing); + let out = client.process(None, now()); + assert_eq!(client.state(), Http3State::Initializing); + + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + assert!(out.as_dgram_ref().is_none()); + + let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); + assert!(client.events().any(authentication_needed)); + client.authenticated(AuthenticationStatus::Ok, now()); + + let out = client.process(out.as_dgram_ref(), now()); + let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected)); + assert!(client.events().any(connected)); + + assert_eq!(client.state(), Http3State::Connected); + + // Exchange H3 setttings + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + std::mem::drop(client.process(out.as_dgram_ref(), now())); +} + +fn connect( + client_params: Http3Parameters, + server_params: Http3Parameters, +) -> (Http3Client, Http3Server) { + let mut client = default_http3_client(client_params); + let mut server = default_http3_server(server_params); + connect_with(&mut client, &mut server); + (client, server) +} + +struct WtTest { + client: Http3Client, + server: Http3Server, +} + +impl WtTest { + pub fn new() -> Self { + let (client, server) = connect(wt_default_parameters(), wt_default_parameters()); + Self { client, server } + } + + pub fn new_with_params(client_params: Http3Parameters, server_params: Http3Parameters) -> Self { + let (client, server) = connect(client_params, server_params); + Self { client, server } + } + + pub fn new_with(mut client: Http3Client, mut server: Http3Server) -> Self { + connect_with(&mut client, &mut server); + Self { client, server } + } + fn negotiate_wt_session( + &mut self, + accept: &WebTransportSessionAcceptAction, + ) -> (StreamId, Option) { + let wt_session_id = self + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + self.exchange_packets(); + + let mut wt_server_session = None; + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + mut session, + headers, + }) => { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + session.response(accept).unwrap(); + wt_server_session = Some(session); + } + Http3ServerEvent::Data { .. } => { + panic!("There should not be any data events!"); + } + _ => {} + } + } + + self.exchange_packets(); + (wt_session_id, wt_server_session) + } + + fn create_wt_session(&mut self) -> WebTransportRequest { + let (wt_session_id, wt_server_session) = + self.negotiate_wt_session(&WebTransportSessionAcceptAction::Accept); + let wt_session_negotiated_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Session{ + stream_id, + status, + headers, + }) if ( + stream_id == wt_session_id && + status == 200 && + headers.contains(&Header::new(":status", "200")) + ) + ) + }; + assert!(self.client.events().any(wt_session_negotiated_event)); + + let wt_server_session = wt_server_session.unwrap(); + assert_eq!(wt_session_id, wt_server_session.stream_id()); + wt_server_session + } + + fn exchange_packets(&mut self) { + const RTT: Duration = Duration::from_millis(10); + let mut out = None; + let mut now = now(); + loop { + now += RTT / 2; + out = self.client.process(out.as_ref(), now).dgram(); + let client_none = out.is_none(); + now += RTT / 2; + out = self.server.process(out.as_ref(), now).dgram(); + if client_none && out.is_none() { + break; + } + } + } + + pub fn cancel_session_client(&mut self, wt_stream_id: StreamId) { + self.client + .cancel_fetch(wt_stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn session_closed_client( + e: &Http3ClientEvent, + id: StreamId, + expected_reason: &SessionCloseReason, + expected_headers: &Option>, + ) -> bool { + if let Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed { + stream_id, + reason, + headers, + }) = e + { + *stream_id == id && reason == expected_reason && headers == expected_headers + } else { + false + } + } + + pub fn check_session_closed_event_client( + &mut self, + wt_session_id: StreamId, + expected_reason: &SessionCloseReason, + expected_headers: &Option>, + ) { + let mut event_found = false; + + while let Some(event) = self.client.next_event() { + event_found = WtTest::session_closed_client( + &event, + wt_session_id, + expected_reason, + expected_headers, + ); + if event_found { + break; + } + } + assert!(event_found); + } + + pub fn cancel_session_server(&mut self, wt_session: &mut WebTransportRequest) { + wt_session.cancel_fetch(Error::HttpNoError.code()).unwrap(); + self.exchange_packets(); + } + + fn session_closed_server( + e: &Http3ServerEvent, + id: StreamId, + expected_reason: &SessionCloseReason, + ) -> bool { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { + session, + reason, + headers, + }) = e + { + session.stream_id() == id && reason == expected_reason && headers.is_none() + } else { + false + } + } + + pub fn check_session_closed_event_server( + &mut self, + wt_session: &mut WebTransportRequest, + expected_reeason: &SessionCloseReason, + ) { + let event = self.server.next_event().unwrap(); + assert!(WtTest::session_closed_server( + &event, + wt_session.stream_id(), + expected_reeason + )); + } + + fn create_wt_stream_client( + &mut self, + wt_session_id: StreamId, + stream_type: StreamType, + ) -> StreamId { + self.client + .webtransport_create_stream(wt_session_id, stream_type) + .unwrap() + } + + fn send_data_client(&mut self, wt_stream_id: StreamId, data: &[u8]) { + assert_eq!( + self.client.send_data(wt_stream_id, data).unwrap(), + data.len() + ); + self.exchange_packets(); + } + + fn send_stream_stats(&mut self, wt_stream_id: StreamId) -> Result { + self.client.webtransport_send_stream_stats(wt_stream_id) + } + + fn recv_stream_stats(&mut self, wt_stream_id: StreamId) -> Result { + self.client.webtransport_recv_stream_stats(wt_stream_id) + } + + fn receive_data_client( + &mut self, + expected_stream_id: StreamId, + new_stream: bool, + expected_data: &[u8], + expected_fin: bool, + ) { + let mut new_stream_received = false; + let mut data_received = false; + while let Some(event) = self.client.next_event() { + match event { + Http3ClientEvent::WebTransport(WebTransportEvent::NewStream { + stream_id, .. + }) => { + assert_eq!(stream_id, expected_stream_id); + new_stream_received = true; + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, expected_stream_id); + let mut buf = [0; 100]; + let (amount, fin) = self.client.read_data(now(), stream_id, &mut buf).unwrap(); + assert_eq!(fin, expected_fin); + assert_eq!(amount, expected_data.len()); + assert_eq!(&buf[..amount], expected_data); + data_received = true; + } + _ => {} + } + } + assert!(data_received); + assert_eq!(new_stream, new_stream_received); + } + + fn close_stream_sending_client(&mut self, wt_stream_id: StreamId) { + self.client.stream_close_send(wt_stream_id).unwrap(); + self.exchange_packets(); + } + + fn reset_stream_client(&mut self, wt_stream_id: StreamId) { + self.client + .stream_reset_send(wt_stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_reset_client(&mut self, expected_stream_id: StreamId) { + let wt_reset_event = |e| { + matches!( + e, + Http3ClientEvent::Reset { + stream_id, + error, + local + } if stream_id == expected_stream_id && error == Error::HttpNoError.code() && !local + ) + }; + assert!(self.client.events().any(wt_reset_event)); + } + + fn stream_stop_sending_client(&mut self, stream_id: StreamId) { + self.client + .stream_stop_sending(stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_stop_sending_client(&mut self, expected_stream_id: StreamId) { + let wt_stop_sending_event = |e| { + matches!( + e, + Http3ClientEvent::StopSending { + stream_id, + error + } if stream_id == expected_stream_id && error == Error::HttpNoError.code() + ) + }; + assert!(self.client.events().any(wt_stop_sending_event)); + } + + fn check_events_after_closing_session_client( + &mut self, + expected_reset_ids: &[StreamId], + expected_error_stream_reset: Option, + expected_stop_sending_ids: &[StreamId], + expected_error_stream_stop_sending: Option, + expected_local: bool, + expected_session_close: &Option<(StreamId, SessionCloseReason)>, + ) { + let mut reset_ids_count = 0; + let mut stop_sending_ids_count = 0; + let mut close_event = false; + while let Some(event) = self.client.next_event() { + match event { + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert!(expected_reset_ids.contains(&stream_id)); + assert_eq!(expected_error_stream_reset.unwrap(), error); + assert_eq!(expected_local, local); + reset_ids_count += 1; + } + Http3ClientEvent::StopSending { stream_id, error } => { + assert!(expected_stop_sending_ids.contains(&stream_id)); + assert_eq!(expected_error_stream_stop_sending.unwrap(), error); + stop_sending_ids_count += 1; + } + Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed { + stream_id, + reason, + headers, + }) => { + close_event = true; + assert_eq!(stream_id, expected_session_close.as_ref().unwrap().0); + assert_eq!(expected_session_close.as_ref().unwrap().1, reason); + assert!(headers.is_none()); + } + _ => {} + } + } + assert_eq!(reset_ids_count, expected_reset_ids.len()); + assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len()); + assert_eq!(close_event, expected_session_close.is_some()); + } + + fn create_wt_stream_server( + wt_server_session: &mut WebTransportRequest, + stream_type: StreamType, + ) -> Http3OrWebTransportStream { + wt_server_session.create_stream(stream_type).unwrap() + } + + fn send_data_server(&mut self, wt_stream: &mut Http3OrWebTransportStream, data: &[u8]) { + assert_eq!(wt_stream.send_data(data).unwrap(), data.len()); + self.exchange_packets(); + } + + fn receive_data_server( + &mut self, + stream_id: StreamId, + new_stream: bool, + expected_data: &[u8], + expected_fin: bool, + ) -> Http3OrWebTransportStream { + self.exchange_packets(); + let mut new_stream_received = false; + let mut data_received = false; + let mut wt_stream = None; + let mut stream_closed = false; + let mut recv_data = Vec::new(); + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(request)) => { + assert_eq!(stream_id, request.stream_id()); + new_stream_received = true; + } + Http3ServerEvent::Data { + mut data, + fin, + stream, + } => { + recv_data.append(&mut data); + stream_closed = fin; + data_received = true; + wt_stream = Some(stream); + } + _ => {} + } + } + assert_eq!(&recv_data[..], expected_data); + assert!(data_received); + assert_eq!(new_stream, new_stream_received); + assert_eq!(stream_closed, expected_fin); + wt_stream.unwrap() + } + + fn close_stream_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream.stream_close_send().unwrap(); + self.exchange_packets(); + } + + fn reset_stream_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream + .stream_reset_send(Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn stream_stop_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream + .stream_stop_sending(Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_reset_server(&mut self, expected_stream_id: StreamId, expected_error: u64) { + let stream_reset = |e| { + matches!( + e, + Http3ServerEvent::StreamReset { + stream, + error + } if stream.stream_id() == expected_stream_id && error == expected_error + ) + }; + assert!(self.server.events().any(stream_reset)); + } + + fn receive_stop_sending_server(&mut self, expected_stream_id: StreamId, expected_error: u64) { + let stop_sending = |e| { + matches!( + e, + Http3ServerEvent::StreamStopSending { + stream, + error + } if stream.stream_id() == expected_stream_id && error == expected_error + ) + }; + assert!(self.server.events().any(stop_sending)); + } + + fn check_events_after_closing_session_server( + &mut self, + expected_reset_ids: &[StreamId], + expected_error_stream_reset: Option, + expected_stop_sending_ids: &[StreamId], + expected_error_stream_stop_sending: Option, + expected_session_close: &Option<(StreamId, SessionCloseReason)>, + ) { + let mut reset_ids_count = 0; + let mut stop_sending_ids_count = 0; + let mut close_event = false; + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::StreamReset { stream, error } => { + assert!(expected_reset_ids.contains(&stream.stream_id())); + assert_eq!(expected_error_stream_reset.unwrap(), error); + reset_ids_count += 1; + } + Http3ServerEvent::StreamStopSending { stream, error } => { + assert!(expected_stop_sending_ids.contains(&stream.stream_id())); + assert_eq!(expected_error_stream_stop_sending.unwrap(), error); + stop_sending_ids_count += 1; + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { + session, + reason, + headers, + }) => { + close_event = true; + assert_eq!( + session.stream_id(), + expected_session_close.as_ref().unwrap().0 + ); + assert_eq!(expected_session_close.as_ref().unwrap().1, reason); + assert!(headers.is_none()); + } + _ => {} + } + } + assert_eq!(reset_ids_count, expected_reset_ids.len()); + assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len()); + assert_eq!(close_event, expected_session_close.is_some()); + } + + pub fn session_close_frame_client(&mut self, session_id: StreamId, error: u32, message: &str) { + self.client + .webtransport_close_session(session_id, error, message) + .unwrap(); + } + + pub fn session_close_frame_server( + wt_session: &mut WebTransportRequest, + error: u32, + message: &str, + ) { + wt_session.close_session(error, message).unwrap(); + } + + fn max_datagram_size(&self, stream_id: StreamId) -> Result { + self.client.webtransport_max_datagram_size(stream_id) + } + + fn send_datagram(&mut self, stream_id: StreamId, buf: &[u8]) -> Result<(), Error> { + self.client.webtransport_send_datagram(stream_id, buf, None) + } + + fn check_datagram_received_client( + &mut self, + expected_stream_id: StreamId, + expected_dgram: &[u8], + ) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Datagram { + session_id, + datagram + }) if session_id == expected_stream_id && datagram == expected_dgram + ) + }; + assert!(self.client.events().any(wt_datagram_event)); + } + + fn check_datagram_received_server( + &mut self, + expected_session: &WebTransportRequest, + expected_dgram: &[u8], + ) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { + session, + datagram + }) if session.stream_id() == expected_session.stream_id() && datagram == expected_dgram + ) + }; + assert!(self.server.events().any(wt_datagram_event)); + } + + fn check_no_datagram_received_client(&mut self) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Datagram { .. }) + ) + }; + assert!(!self.client.events().any(wt_datagram_event)); + } + + fn check_no_datagram_received_server(&mut self) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { .. }) + ) + }; + assert!(!self.server.events().any(wt_datagram_event)); + } +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs new file mode 100644 index 0000000000..27f669861d --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs @@ -0,0 +1,280 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::Duration; + +use neqo_common::{event::Provider, Encoder}; +use neqo_crypto::AuthenticationStatus; +use neqo_transport::{Connection, ConnectionError, StreamType}; +use test_fixture::{default_server_h3, now}; + +use super::{connect, default_http3_client, default_http3_server, exchange_packets}; +use crate::{ + settings::{HSetting, HSettingType, HSettings}, + Error, HFrame, Http3Client, Http3ClientEvent, Http3Parameters, Http3Server, Http3State, + WebTransportEvent, +}; + +fn check_wt_event(client: &mut Http3Client, wt_enable_client: bool, wt_enable_server: bool) { + let wt_event = client.events().find_map(|e| { + if let Http3ClientEvent::WebTransport(WebTransportEvent::Negotiated(neg)) = e { + Some(neg) + } else { + None + } + }); + + assert_eq!(wt_event.is_some(), wt_enable_client); + if let Some(wt) = wt_event { + assert_eq!(wt, wt_enable_client && wt_enable_server); + } +} + +fn connect_wt(wt_enabled_client: bool, wt_enabled_server: bool) -> (Http3Client, Http3Server) { + connect( + Http3Parameters::default().webtransport(wt_enabled_client), + Http3Parameters::default().webtransport(wt_enabled_server), + ) +} + +#[test] +fn negotiate_wt() { + let (mut client, _server) = connect_wt(true, true); + assert!(client.webtransport_enabled()); + check_wt_event(&mut client, true, true); + + let (mut client, _server) = connect_wt(true, false); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, true, false); + + let (mut client, _server) = connect_wt(false, true); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, false, true); + + let (mut client, _server) = connect_wt(false, false); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, false, false); +} + +#[derive(PartialEq, Eq)] +enum ClientState { + ClientEnabled, + ClientDisabled, +} + +#[derive(PartialEq, Eq)] +enum ServerState { + ServerEnabled, + ServerDisabled, +} + +fn zero_rtt( + client_state: &ClientState, + server_state: &ServerState, + client_resumed_state: &ClientState, + server_resumed_state: &ServerState, +) { + let client_org = ClientState::ClientEnabled.eq(client_state); + let server_org = ServerState::ServerEnabled.eq(server_state); + let client_resumed = ClientState::ClientEnabled.eq(client_resumed_state); + let server_resumed = ServerState::ServerEnabled.eq(server_resumed_state); + + let (mut client, mut server) = connect_wt(client_org, server_org); + assert_eq!(client.webtransport_enabled(), client_org && server_org); + + // exchange token + let out = server.process(None, now()); + // We do not have a token so we need to wait for a resumption token timer to trigger. + std::mem::drop(client.process(out.as_dgram_ref(), now() + Duration::from_millis(250))); + assert_eq!(client.state(), Http3State::Connected); + let token = client + .events() + .find_map(|e| { + if let Http3ClientEvent::ResumptionToken(token) = e { + Some(token) + } else { + None + } + }) + .unwrap(); + + let mut client = default_http3_client(Http3Parameters::default().webtransport(client_resumed)); + let mut server = default_http3_server(Http3Parameters::default().webtransport(server_resumed)); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + assert_eq!(client.state(), Http3State::ZeroRtt); + + exchange_packets(&mut client, &mut server); + + assert_eq!(&client.state(), &Http3State::Connected); + assert_eq!( + client.webtransport_enabled(), + client_resumed && server_resumed + ); + + let mut early_data_accepted = true; + // The only case we should not do 0-RTT is when webtransport was enabled + // originally and is disabled afterwards. + if server_org && !server_resumed { + early_data_accepted = false; + } + assert_eq!( + client.tls_info().unwrap().early_data_accepted(), + early_data_accepted + ); + + check_wt_event(&mut client, client_resumed, server_resumed); +} + +#[test] +fn zero_rtt_wt_settings() { + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); +} + +fn exchange_packets2(client: &mut Http3Client, server: &mut Connection) { + let mut out = None; + loop { + out = client.process(out.as_ref(), now()).dgram(); + out = server.process(out.as_ref(), now()).dgram(); + if out.is_none() { + break; + } + } +} + +#[test] +fn wrong_setting_value() { + const CONTROL_STREAM_TYPE: &[u8] = &[0x0]; + let mut client = default_http3_client(Http3Parameters::default()); + let mut server = default_server_h3(); + + exchange_packets2(&mut client, &mut server); + client.authenticated(AuthenticationStatus::Ok, now()); + exchange_packets2(&mut client, &mut server); + + let control = server.stream_create(StreamType::UniDi).unwrap(); + server.stream_send(control, CONTROL_STREAM_TYPE).unwrap(); + // Encode a settings frame and send it. + let mut enc = Encoder::default(); + let settings = HFrame::Settings { + settings: HSettings::new(&[HSetting::new(HSettingType::EnableWebTransport, 2)]), + }; + settings.encode(&mut enc); + assert_eq!( + server.stream_send(control, enc.as_ref()).unwrap(), + enc.as_ref().len() + ); + + exchange_packets2(&mut client, &mut server); + match client.state() { + Http3State::Closing(err) | Http3State::Closed(err) => { + assert_eq!( + err, + ConnectionError::Application(Error::HttpSettings.code()) + ); + } + _ => panic!("Wrong state {:?}", client.state()), + }; +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs new file mode 100644 index 0000000000..5f929d0e4b --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs @@ -0,0 +1,456 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::mem; + +use neqo_common::{event::Provider, Encoder}; +use neqo_transport::StreamType; +use test_fixture::now; + +use crate::{ + features::extended_connect::{ + tests::webtransport::{ + default_http3_client, default_http3_server, wt_default_parameters, WtTest, + }, + SessionCloseReason, + }, + frames::WebTransportFrame, + Error, Header, Http3ClientEvent, Http3OrWebTransportStream, Http3Server, Http3ServerEvent, + Http3State, Priority, WebTransportEvent, WebTransportServerEvent, + WebTransportSessionAcceptAction, +}; + +#[test] +fn wt_session() { + let mut wt = WtTest::new(); + mem::drop(wt.create_wt_session()); +} + +#[test] +fn wt_session_reject() { + let mut wt = WtTest::new(); + let headers = vec![Header::new(":status", "404")]; + let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone()); + let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res); + + wt.check_session_closed_event_client( + wt_session_id, + &SessionCloseReason::Status(404), + &Some(headers), + ); +} + +#[test] +fn wt_session_close_client() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.cancel_session_client(wt_session.stream_id()); + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Error(Error::HttpNoError.code()), + ); +} + +#[test] +fn wt_session_close_server() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.cancel_session_server(&mut wt_session); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_close_server_close_send() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session.stream_close_send().unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + &None, + ); +} + +#[test] +fn wt_session_close_server_stop_sending() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session + .stream_stop_sending(Error::HttpNoError.code()) + .unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_close_server_reset() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session + .stream_reset_send(Error::HttpNoError.code()) + .unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_response_with_1xx() { + let mut wt = WtTest::new(); + + let wt_session_id = wt + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + wt.exchange_packets(); + + let mut wt_server_session = None; + while let Some(event) = wt.server.next_event() { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + session, + headers, + }) = event + { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + wt_server_session = Some(session); + } + } + + let mut wt_server_session = wt_server_session.unwrap(); + + // Send interim response. + wt_server_session + .send_headers(&[Header::new(":status", "111")]) + .unwrap(); + wt_server_session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + + wt.exchange_packets(); + + let wt_session_negotiated_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Session{ + stream_id, + status, + headers, + }) if ( + stream_id == wt_session_id && + status == 200 && + headers.contains(&Header::new(":status", "200")) + ) + ) + }; + assert!(wt.client.events().any(wt_session_negotiated_event)); + + assert_eq!(wt_session_id, wt_server_session.stream_id()); +} + +#[test] +fn wt_session_response_with_redirect() { + let headers = [Header::new(":status", "302"), Header::new("location", "/")].to_vec(); + let mut wt = WtTest::new(); + + let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone()); + + let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res); + + wt.check_session_closed_event_client( + wt_session_id, + &SessionCloseReason::Status(302), + &Some(headers), + ); +} + +#[test] +fn wt_session_respone_200_with_fin() { + let mut wt = WtTest::new(); + + let wt_session_id = wt + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + wt.exchange_packets(); + let mut wt_server_session = None; + while let Some(event) = wt.server.next_event() { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + session, + headers, + }) = event + { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + wt_server_session = Some(session); + } + } + + let mut wt_server_session = wt_server_session.unwrap(); + wt_server_session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + wt_server_session.stream_close_send().unwrap(); + + wt.exchange_packets(); + + let wt_session_close_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed{ + stream_id, + reason, + headers, + .. + }) if ( + stream_id == wt_session_id && + reason == SessionCloseReason::Clean{ error: 0, message: String::new()} && + headers.is_none() + ) + ) + }; + assert!(wt.client.events().any(wt_session_close_event)); + + assert_eq!(wt_session_id, wt_server_session.stream_id()); +} + +#[test] +fn wt_session_close_frame_client() { + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + ); +} + +#[test] +fn wt_session_close_frame_server() { + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + &None, + ); +} + +#[test] +fn wt_unknown_session_frame_client() { + const UNKNOWN_FRAME_LEN: usize = 832; + const BUF: &[u8] = &[0; 10]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + // Send an unknown frame. + let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4); + enc.encode_varint(1028_u64); // Arbitrary type. + enc.encode_varint(UNKNOWN_FRAME_LEN as u64); + let mut buf: Vec<_> = enc.into(); + buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0); + wt.client.send_data(wt_session.stream_id(), &buf).unwrap(); + wt.exchange_packets(); + + // The session is still active + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, false); + + // Now close the session. + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); +} + +#[test] +fn wt_close_session_frame_broken_client() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + // Send a incorrect CloseSession frame. + let mut enc = Encoder::default(); + WebTransportFrame::CloseSession { + error: 5, + message: "Hello".to_string(), + } + .encode(&mut enc); + let mut buf: Vec<_> = enc.into(); + // Corrupt the string. + buf[9] = 0xff; + wt.client.send_data(wt_session.stream_id(), &buf).unwrap(); + wt.exchange_packets(); + + // check that the webtransport session is closed. + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()), + &None, + ); + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()), + ); + + // The Http3 session is still working. + assert_eq!(wt.client.state(), Http3State::Connected); + assert_eq!(wt_session.state(), Http3State::Connected); +} + +fn receive_request(server: &mut Http3Server) -> Option { + while let Some(event) = server.next_event() { + if let Http3ServerEvent::Headers { stream, .. } = event { + return Some(stream); + } + } + None +} + +#[test] +// Ignoring this test as it is panicking at wt.create_wt_stream_client +// Issue # 1386 is created to track this +#[ignore] +fn wt_close_session_cannot_be_sent_at_once() { + const BUF: &[u8] = &[0; 443]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + + let client = default_http3_client(wt_default_parameters()); + let server = default_http3_server(wt_default_parameters()); + let mut wt = WtTest::new_with(client, server); + + let mut wt_session = wt.create_wt_session(); + + // Fill the flow control window using an unrelated http stream. + let req_id = wt + .client + .fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default(), + ) + .unwrap(); + assert_eq!(req_id, 4); + wt.exchange_packets(); + let mut req = receive_request(&mut wt.server).unwrap(); + req.send_headers(&[ + Header::new(":status", "200"), + Header::new("content-length", BUF.len().to_string()), + ]) + .unwrap(); + req.send_data(BUF).unwrap(); + + // Now close the session. + WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE); + // server cannot create new streams. + assert_eq!( + wt_session.create_stream(StreamType::UniDi), + Err(Error::InvalidStreamId) + ); + + let out = wt.server.process(None, now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + + // Client has not received the full CloseSession frame and it can create more streams. + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + let out = wt.server.process(out.as_dgram_ref(), now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + let out = wt.server.process(out.as_dgram_ref(), now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + let out = wt.server.process(out.as_dgram_ref(), now()); + let _out = wt.client.process(out.as_dgram_ref(), now()); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs new file mode 100644 index 0000000000..b898dbb31e --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs @@ -0,0 +1,1131 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::mem; + +use neqo_transport::StreamType; + +use crate::{ + features::extended_connect::{tests::webtransport::WtTest, SessionCloseReason}, + Error, +}; + +#[test] +fn wt_client_stream_uni() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), 0); + assert_eq!(send_stats.bytes_sent(), 0); + assert_eq!(send_stats.bytes_acked(), 0); + + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.receive_data_server(wt_stream, true, BUF_CLIENT, false); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64); + + // Send data again to test if the stats has the expected values. + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.receive_data_server(wt_stream, false, BUF_CLIENT, false); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), (BUF_CLIENT.len() * 2) as u64); + assert_eq!(send_stats.bytes_sent(), (BUF_CLIENT.len() * 2) as u64); + assert_eq!(send_stats.bytes_acked(), (BUF_CLIENT.len() * 2) as u64); + + let recv_stats = wt.recv_stream_stats(wt_stream); + assert_eq!(recv_stats.unwrap_err(), Error::InvalidStreamId); +} + +#[test] +fn wt_client_stream_bidi() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(wt_client_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_client_stream, false, BUF_SERVER, false); + let send_stats = wt.send_stream_stats(wt_client_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64); + + let recv_stats = wt.recv_stream_stats(wt_client_stream).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_server_stream_uni() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + let send_stats = wt.send_stream_stats(wt_server_stream.stream_id()); + assert_eq!(send_stats.unwrap_err(), Error::InvalidStreamId); + + let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_server_stream_bidi() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT); + mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, false)); + let stats = wt.send_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(stats.bytes_acked(), BUF_CLIENT.len() as u64); + + let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_client_stream_uni_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.close_stream_sending_client(wt_stream); + wt.receive_data_server(wt_stream, true, BUF_CLIENT, true); +} + +#[test] +fn wt_client_stream_bidi_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + wt.close_stream_sending_client(wt_client_stream); + + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, true); + + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_client_stream, false, BUF_SERVER, true); +} + +#[test] +fn wt_server_stream_uni_closed() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true); +} + +#[test] +fn wt_server_stream_bidi_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true); + wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT); + wt.close_stream_sending_client(wt_server_stream.stream_id()); + mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, true)); +} + +#[test] +fn wt_client_stream_uni_reset() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + mem::drop(wt.receive_data_server(wt_stream, true, BUF_CLIENT, false)); + wt.reset_stream_client(wt_stream); + wt.receive_reset_server(wt_stream, Error::HttpNoError.code()); +} + +#[test] +fn wt_server_stream_uni_reset() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_client_stream_bidi_reset() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + + wt.reset_stream_client(wt_client_stream); + wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_client_stream); +} + +#[test] +fn wt_server_stream_bidi_reset() { + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + + wt.reset_stream_client(wt_server_stream.stream_id()); + wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_client_stream_uni_stop_sending() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_stream, true, BUF_CLIENT, false); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_stream); +} + +#[test] +fn wt_server_stream_uni_stop_sending() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.stream_stop_sending_client(wt_server_stream.stream_id()); + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); +} + +#[test] +fn wt_client_stream_bidi_stop_sending() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + + wt.stream_stop_sending_client(wt_client_stream); + + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_server_stream_bidi_stop_sending() { + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.stream_stop_sending_client(wt_server_stream.stream_id()); + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_server_stream.stream_id()); +} + +// For the following tests the client cancels a session. The streams are in different states: +// 1) Both sides of a bidirectional client stream are opened. +// 2) A client unidirectional stream is opened. +// 3) A client unidirectional stream has been closed and both sides consumed the closing info. +// 4) A client unidirectional stream has been closed, but only the server has consumed the closing +// info. +// 5) A client unidirectional stream has been closed, but only the client has consum the closing +// info. +// 6) Both sides of a bidirectional server stream are opened. +// 7) A server unidirectional stream is opened. +// 8) A server unidirectional stream has been closed and both sides consumed the closing info. +// 9) A server unidirectional stream has been closed, but only the server has consumed the closing +// info. +// 10) A server unidirectional stream has been closed, but only the client has consumed the closing +// info. +// 11) Both sides of a bidirectional stream have been closed and consumed by both sides. +// 12) Both sides of a bidirectional stream have been closed, but not consumed by both sides. +// 13) Multiples open streams + +#[test] +fn wt_client_session_close_1() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let bidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(bidi_from_client, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_2() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[unidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_from_client], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_3() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + wt.close_stream_sending_client(unidi_from_client); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_4() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + let mut unidi_from_client_s = wt.receive_data_server(unidi_from_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_from_client_s); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_from_client], + Some(Error::HttpNoError.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_5() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + wt.reset_stream_client(unidi_from_client); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[unidi_from_client], + Some(Error::HttpNoError.code()), + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_6() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_from_server, BUF); + wt.receive_data_client(bidi_from_server.stream_id(), true, BUF, false); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_7() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_from_server, BUF); + wt.receive_data_client(unidi_from_server.stream_id(), true, BUF, false); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[unidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_8() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, true); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_9() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.stream_stop_sending_client(unidi_server.stream_id()); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpNoError.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_10() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_11() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, true); + wt.stream_stop_sending_server(&mut bidi_server); + wt.receive_stop_sending_client(bidi_server.stream_id()); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_12() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.stream_stop_sending_server(&mut bidi_server); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpNoError.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_13() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_1, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false)); + let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_2, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +// For the following tests the server cancels a session. The streams are in different states: +// 1) Both sides of a bidirectional client stream are opened. +// 2) A client unidirectional stream is opened. +// 3) A client unidirectional stream has been closed and consumed by both sides. +// 4) A client unidirectional stream has been closed, but not consumed by the client. +// 5) Both sides of a bidirectional server stream are opened. +// 6) A server unidirectional stream is opened. +// 7) A server unidirectional stream has been closed and consumed by both sides. +// 8) A server unidirectional stream has been closed, but not consumed by the client. +// 9) Both sides of a bidirectional stream have been closed and consumed by both sides. +// 10) Both sides of a bidirectional stream have been closed, but not consumed by the client. +// 12) Multiples open streams + +#[test] +fn wt_client_session_server_close_1() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let bidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client, BUF); + std::mem::drop(wt.receive_data_server(bidi_client, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_2() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_client, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + &None, + ); +} + +#[test] +fn wt_client_session_server_close_3() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_client_s); + wt.receive_stop_sending_client(unidi_client); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_4() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_client_s); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpNoError.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_5() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, false); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_6() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, false); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_7() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, true); + + wt.cancel_session_server(&mut wt_session); + + // Already close stream will not have a reset event. + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_8() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + + wt.cancel_session_server(&mut wt_session); + + // The stream was only closed on the server side therefore it is cancelled on the client side. + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_9() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, true); + wt.stream_stop_sending_server(&mut bidi_server); + wt.receive_stop_sending_client(bidi_server.stream_id()); + + wt.cancel_session_server(&mut wt_session); + + // Already close stream will not have a reset event. + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_10() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.stream_stop_sending_server(&mut bidi_server); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpNoError.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_11() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_1, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false)); + let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_2, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_session_close_frame_and_streams_client() { + const BUF: &[u8] = &[0; 10]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.exchange_packets(); + + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); + wt.exchange_packets(); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); +} -- cgit v1.2.3