summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/connection/tests/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-transport/src/connection/tests/stream.rs')
-rw-r--r--third_party/rust/neqo-transport/src/connection/tests/stream.rs1162
1 files changed, 1162 insertions, 0 deletions
diff --git a/third_party/rust/neqo-transport/src/connection/tests/stream.rs b/third_party/rust/neqo-transport/src/connection/tests/stream.rs
new file mode 100644
index 0000000000..586a537b9d
--- /dev/null
+++ b/third_party/rust/neqo-transport/src/connection/tests/stream.rs
@@ -0,0 +1,1162 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::{cmp::max, collections::HashMap, convert::TryFrom, mem};
+
+use neqo_common::{event::Provider, qdebug};
+use test_fixture::now;
+
+use super::{
+ super::State, assert_error, connect, connect_force_idle, default_client, default_server,
+ maybe_authenticate, new_client, new_server, send_something, DEFAULT_STREAM_DATA,
+};
+use crate::{
+ events::ConnectionEvent,
+ recv_stream::RECV_BUFFER_SIZE,
+ send_stream::{OrderGroup, SendStreamState, SEND_BUFFER_SIZE},
+ streams::{SendOrder, StreamOrder},
+ tparams::{self, TransportParameter},
+ // tracking::DEFAULT_ACK_PACKET_TOLERANCE,
+ Connection,
+ ConnectionError,
+ ConnectionParameters,
+ Error,
+ StreamId,
+ StreamType,
+};
+
+#[test]
+fn stream_create() {
+ let mut client = default_client();
+
+ let out = client.process(None, now());
+ let mut server = default_server();
+ let out = server.process(out.as_dgram_ref(), now());
+
+ let out = client.process(out.as_dgram_ref(), now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+ assert!(maybe_authenticate(&mut client));
+ let out = client.process(None, now());
+
+ // client now in State::Connected
+ assert_eq!(client.stream_create(StreamType::UniDi).unwrap(), 2);
+ assert_eq!(client.stream_create(StreamType::UniDi).unwrap(), 6);
+ assert_eq!(client.stream_create(StreamType::BiDi).unwrap(), 0);
+ assert_eq!(client.stream_create(StreamType::BiDi).unwrap(), 4);
+
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+ // server now in State::Connected
+ assert_eq!(server.stream_create(StreamType::UniDi).unwrap(), 3);
+ assert_eq!(server.stream_create(StreamType::UniDi).unwrap(), 7);
+ assert_eq!(server.stream_create(StreamType::BiDi).unwrap(), 1);
+ assert_eq!(server.stream_create(StreamType::BiDi).unwrap(), 5);
+}
+
+#[test]
+// tests stream send/recv after connection is established.
+fn transfer() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect_force_idle(&mut client, &mut server);
+
+ qdebug!("---- client sends");
+ // Send
+ let client_stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ client.stream_send(client_stream_id, &[6; 100]).unwrap();
+ client.stream_send(client_stream_id, &[7; 40]).unwrap();
+ client.stream_send(client_stream_id, &[8; 4000]).unwrap();
+
+ // Send to another stream but some data after fin has been set
+ let client_stream_id2 = client.stream_create(StreamType::UniDi).unwrap();
+ client.stream_send(client_stream_id2, &[6; 60]).unwrap();
+ client.stream_close_send(client_stream_id2).unwrap();
+ client.stream_send(client_stream_id2, &[7; 50]).unwrap_err();
+ // Sending this much takes a few datagrams.
+ let mut datagrams = vec![];
+ let mut out = client.process_output(now());
+ while let Some(d) = out.dgram() {
+ datagrams.push(d);
+ out = client.process_output(now());
+ }
+ assert_eq!(datagrams.len(), 4);
+ assert_eq!(*client.state(), State::Confirmed);
+
+ qdebug!("---- server receives");
+ for d in datagrams {
+ let out = server.process(Some(&d), now());
+ // With an RTT of zero, the server will acknowledge every packet immediately.
+ assert!(out.as_dgram_ref().is_some());
+ qdebug!("Output={:0x?}", out.as_dgram_ref());
+ }
+ assert_eq!(*server.state(), State::Confirmed);
+
+ let mut buf = vec![0; 4000];
+
+ let mut stream_ids = server.events().filter_map(|evt| match evt {
+ ConnectionEvent::NewStream { stream_id, .. } => Some(stream_id),
+ _ => None,
+ });
+ let first_stream = stream_ids.next().expect("should have a new stream event");
+ let second_stream = stream_ids
+ .next()
+ .expect("should have a second new stream event");
+ assert!(stream_ids.next().is_none());
+ let (received1, fin1) = server.stream_recv(first_stream, &mut buf).unwrap();
+ assert_eq!(received1, 4000);
+ assert!(!fin1);
+ let (received2, fin2) = server.stream_recv(first_stream, &mut buf).unwrap();
+ assert_eq!(received2, 140);
+ assert!(!fin2);
+
+ let (received3, fin3) = server.stream_recv(second_stream, &mut buf).unwrap();
+ assert_eq!(received3, 60);
+ assert!(fin3);
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord)]
+struct IdEntry {
+ sendorder: StreamOrder,
+ stream_id: StreamId,
+}
+
+// tests stream sendorder priorization
+fn sendorder_test(order_of_sendorder: &[Option<SendOrder>]) {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect_force_idle(&mut client, &mut server);
+
+ qdebug!("---- client sends");
+ // open all streams and set the sendorders
+ let mut ordered = Vec::new();
+ let mut streams = Vec::<StreamId>::new();
+ for sendorder in order_of_sendorder {
+ let id = client.stream_create(StreamType::UniDi).unwrap();
+ streams.push(id);
+ ordered.push((id, *sendorder));
+ // must be set before sendorder
+ client.streams.set_fairness(id, true).ok();
+ client.streams.set_sendorder(id, *sendorder).ok();
+ }
+ // Write some data to all the streams
+ for stream_id in streams {
+ client.stream_send(stream_id, &[6; 100]).unwrap();
+ }
+
+ // Sending this much takes a few datagrams.
+ // Note: this test uses an RTT of 0 which simplifies things (no pacing)
+ let mut datagrams = Vec::new();
+ let mut out = client.process_output(now());
+ while let Some(d) = out.dgram() {
+ datagrams.push(d);
+ out = client.process_output(now());
+ }
+ assert_eq!(*client.state(), State::Confirmed);
+
+ qdebug!("---- server receives");
+ for d in datagrams {
+ let out = server.process(Some(&d), now());
+ qdebug!("Output={:0x?}", out.as_dgram_ref());
+ }
+ assert_eq!(*server.state(), State::Confirmed);
+
+ let stream_ids = server
+ .events()
+ .filter_map(|evt| match evt {
+ ConnectionEvent::RecvStreamReadable { stream_id, .. } => Some(stream_id),
+ _ => None,
+ })
+ .enumerate()
+ .map(|(a, b)| (b, a))
+ .collect::<HashMap<_, _>>();
+
+ // streams should arrive in priority order, not order of creation, if sendorder prioritization
+ // is working correctly
+
+ // 'ordered' has the send order currently. Re-sort it by sendorder, but
+ // if two items from the same sendorder exist, secondarily sort by the ordering in
+ // the stream_ids vector (HashMap<StreamId, index: usize>)
+ ordered.sort_unstable_by_key(|(stream_id, sendorder)| {
+ (
+ StreamOrder {
+ sendorder: *sendorder,
+ },
+ stream_ids[stream_id],
+ )
+ });
+ // make sure everything now is in the same order, since we modified the order of
+ // same-sendorder items to match the ordering of those we saw in reception
+ for (i, (stream_id, _sendorder)) in ordered.iter().enumerate() {
+ assert_eq!(i, stream_ids[stream_id]);
+ }
+}
+
+#[test]
+fn sendorder_0() {
+ sendorder_test(&[None, Some(1), Some(2), Some(3)]);
+}
+#[test]
+fn sendorder_1() {
+ sendorder_test(&[Some(3), Some(2), Some(1), None]);
+}
+#[test]
+fn sendorder_2() {
+ sendorder_test(&[Some(3), None, Some(2), Some(1)]);
+}
+#[test]
+fn sendorder_3() {
+ sendorder_test(&[Some(1), Some(2), None, Some(3)]);
+}
+#[test]
+fn sendorder_4() {
+ sendorder_test(&[
+ Some(1),
+ Some(2),
+ Some(1),
+ None,
+ Some(3),
+ Some(1),
+ Some(3),
+ None,
+ ]);
+}
+
+// Tests stream sendorder priorization
+// Converts Vecs of u64's into StreamIds
+fn fairness_test<S, R>(source: S, number_iterates: usize, truncate_to: usize, result_array: &R)
+where
+ S: IntoIterator,
+ S::Item: Into<StreamId>,
+ R: IntoIterator + std::fmt::Debug,
+ R::Item: Into<StreamId>,
+ Vec<u64>: PartialEq<R>,
+{
+ // test the OrderGroup code used for fairness
+ let mut group: OrderGroup = OrderGroup::default();
+ for stream_id in source {
+ group.insert(stream_id.into());
+ }
+ {
+ let mut iterator1 = group.iter();
+ // advance_by() would help here
+ let mut n = number_iterates;
+ while n > 0 {
+ iterator1.next();
+ n -= 1;
+ }
+ // let iterator1 go out of scope
+ }
+ group.truncate(truncate_to);
+
+ let iterator2 = group.iter();
+ let result: Vec<u64> = iterator2.map(StreamId::as_u64).collect();
+ assert_eq!(result, *result_array);
+}
+
+#[test]
+fn ordergroup_0() {
+ let source: [u64; 0] = [];
+ let result: [u64; 0] = [];
+ fairness_test(source, 1, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_1() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 6] = [1, 2, 3, 4, 5, 0];
+ fairness_test(source, 1, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_2() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 6] = [2, 3, 4, 5, 0, 1];
+ fairness_test(source, 2, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_3() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ fairness_test(source, 10, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_4() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ fairness_test(source, 0, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_5() {
+ let source: [u64; 1] = [0];
+ let result: [u64; 1] = [0];
+ fairness_test(source, 1, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_6() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 6] = [5, 0, 1, 2, 3, 4];
+ fairness_test(source, 5, usize::MAX, &result);
+}
+
+#[test]
+fn ordergroup_7() {
+ let source: [u64; 6] = [0, 1, 2, 3, 4, 5];
+ let result: [u64; 3] = [0, 1, 2];
+ fairness_test(source, 5, 3, &result);
+}
+
+#[test]
+// Send fin even if a peer closes a reomte bidi send stream before sending any data.
+fn report_fin_when_stream_closed_wo_data() {
+ // Note that the two servers in this test will get different anti-replay filters.
+ // That's OK because we aren't testing anti-replay.
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out = client.process(None, now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+
+ server.stream_close_send(stream_id).unwrap();
+ let out = server.process(None, now());
+ mem::drop(client.process(out.as_dgram_ref(), now()));
+ let stream_readable = |e| matches!(e, ConnectionEvent::RecvStreamReadable { .. });
+ assert!(client.events().any(stream_readable));
+}
+
+fn exchange_data(client: &mut Connection, server: &mut Connection) {
+ let mut input = None;
+ loop {
+ let out = client.process(input.as_ref(), now()).dgram();
+ let c_done = out.is_none();
+ let out = server.process(out.as_ref(), now()).dgram();
+ if out.is_none() && c_done {
+ break;
+ }
+ input = out;
+ }
+}
+
+#[test]
+fn sending_max_data() {
+ const SMALL_MAX_DATA: usize = 2048;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(client.events().count(), 2); // SendStreamWritable, StateChange(connected)
+ assert_eq!(stream_id, 2);
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ exchange_data(&mut client, &mut server);
+
+ let mut buf = vec![0; 40000];
+ let (received, fin) = server.stream_recv(stream_id, &mut buf).unwrap();
+ assert_eq!(received, SMALL_MAX_DATA);
+ assert!(!fin);
+
+ let out = server.process(None, now()).dgram();
+ client.process_input(&out.unwrap(), now());
+
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+}
+
+#[test]
+fn max_data() {
+ const SMALL_MAX_DATA: usize = 16383;
+
+ let mut client = default_client();
+ let mut server = default_server();
+
+ server
+ .set_local_tparam(
+ tparams::INITIAL_MAX_DATA,
+ TransportParameter::Integer(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ )
+ .unwrap();
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(client.events().count(), 2); // SendStreamWritable, StateChange(connected)
+ assert_eq!(stream_id, 2);
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+ assert_eq!(client.events().count(), 0);
+
+ assert_eq!(client.stream_send(stream_id, b"hello").unwrap(), 0);
+ client
+ .streams
+ .get_send_stream_mut(stream_id)
+ .unwrap()
+ .mark_as_sent(0, 4096, false);
+ assert_eq!(client.events().count(), 0);
+ client
+ .streams
+ .get_send_stream_mut(stream_id)
+ .unwrap()
+ .mark_as_acked(0, 4096, false);
+ assert_eq!(client.events().count(), 0);
+
+ assert_eq!(client.stream_send(stream_id, b"hello").unwrap(), 0);
+ // no event because still limited by conn max data
+ assert_eq!(client.events().count(), 0);
+
+ // Increase max data. Avail space now limited by stream credit
+ client.streams.handle_max_data(100_000_000);
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SEND_BUFFER_SIZE - SMALL_MAX_DATA
+ );
+
+ // Increase max stream data. Avail space now limited by tx buffer
+ client
+ .streams
+ .get_send_stream_mut(stream_id)
+ .unwrap()
+ .set_max_stream_data(100_000_000);
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SEND_BUFFER_SIZE - SMALL_MAX_DATA + 4096
+ );
+
+ let evts = client.events().collect::<Vec<_>>();
+ assert_eq!(evts.len(), 1);
+ assert!(matches!(
+ evts[0],
+ ConnectionEvent::SendStreamWritable { .. }
+ ));
+}
+
+#[test]
+fn exceed_max_data() {
+ const SMALL_MAX_DATA: usize = 1024;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(client.events().count(), 2); // SendStreamWritable, StateChange(connected)
+ assert_eq!(stream_id, 2);
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ assert_eq!(client.stream_send(stream_id, b"hello").unwrap(), 0);
+
+ // Artificially trick the client to think that it has more flow control credit.
+ client.streams.handle_max_data(100_000_000);
+ assert_eq!(client.stream_send(stream_id, b"h").unwrap(), 1);
+
+ exchange_data(&mut client, &mut server);
+
+ assert_error(
+ &client,
+ &ConnectionError::Transport(Error::PeerError(Error::FlowControlError.code())),
+ );
+ assert_error(
+ &server,
+ &ConnectionError::Transport(Error::FlowControlError),
+ );
+}
+
+#[test]
+// If we send a stop_sending to the peer, we should not accept more data from the peer.
+fn do_not_accept_data_after_stop_sending() {
+ // Note that the two servers in this test will get different anti-replay filters.
+ // That's OK because we aren't testing anti-replay.
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out = client.process(None, now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+
+ let stream_readable = |e| matches!(e, ConnectionEvent::RecvStreamReadable { .. });
+ assert!(server.events().any(stream_readable));
+
+ // Send one more packet from client. The packet should arrive after the server
+ // has already requested stop_sending.
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out_second_data_frame = client.process(None, now());
+ // Call stop sending.
+ assert_eq!(
+ Ok(()),
+ server.stream_stop_sending(stream_id, Error::NoError.code())
+ );
+
+ // Receive the second data frame. The frame should be ignored and
+ // DataReadable events shouldn't be posted.
+ let out = server.process(out_second_data_frame.as_dgram_ref(), now());
+ assert!(!server.events().any(stream_readable));
+
+ mem::drop(client.process(out.as_dgram_ref(), now()));
+ assert_eq!(
+ Err(Error::FinalSizeError),
+ client.stream_send(stream_id, &[0x00])
+ );
+}
+
+#[test]
+// Server sends stop_sending, the client simultaneous sends reset.
+fn simultaneous_stop_sending_and_reset() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out = client.process(None, now());
+ let ack = server.process(out.as_dgram_ref(), now()).dgram();
+
+ let stream_readable =
+ |e| matches!(e, ConnectionEvent::RecvStreamReadable { stream_id: id } if id == stream_id);
+ assert!(server.events().any(stream_readable));
+
+ // The client resets the stream. The packet with reset should arrive after the server
+ // has already requested stop_sending.
+ client.stream_reset_send(stream_id, 0).unwrap();
+ let out_reset_frame = client.process(ack.as_ref(), now()).dgram();
+
+ // Send something out of order to force the server to generate an
+ // acknowledgment at the next opportunity.
+ let force_ack = send_something(&mut client, now());
+ server.process_input(&force_ack, now());
+
+ // Call stop sending.
+ server.stream_stop_sending(stream_id, 0).unwrap();
+ // Receive the second data frame. The frame should be ignored and
+ // DataReadable events shouldn't be posted.
+ let ack = server.process(out_reset_frame.as_ref(), now()).dgram();
+ assert!(ack.is_some());
+ assert!(!server.events().any(stream_readable));
+
+ // The client gets the STOP_SENDING frame.
+ client.process_input(&ack.unwrap(), now());
+ assert_eq!(
+ Err(Error::InvalidStreamId),
+ client.stream_send(stream_id, &[0x00])
+ );
+}
+
+#[test]
+fn client_fin_reorder() {
+ let mut client = default_client();
+ let mut server = default_server();
+
+ // Send ClientHello.
+ let client_hs = client.process(None, now());
+ assert!(client_hs.as_dgram_ref().is_some());
+
+ let server_hs = server.process(client_hs.as_dgram_ref(), now());
+ assert!(server_hs.as_dgram_ref().is_some()); // ServerHello, etc...
+
+ let client_ack = client.process(server_hs.as_dgram_ref(), now());
+ assert!(client_ack.as_dgram_ref().is_some());
+
+ let server_out = server.process(client_ack.as_dgram_ref(), now());
+ assert!(server_out.as_dgram_ref().is_none());
+
+ assert!(maybe_authenticate(&mut client));
+ assert_eq!(*client.state(), State::Connected);
+
+ let client_fin = client.process(None, now());
+ assert!(client_fin.as_dgram_ref().is_some());
+
+ let client_stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ client.stream_send(client_stream_id, &[1, 2, 3]).unwrap();
+ let client_stream_data = client.process(None, now());
+ assert!(client_stream_data.as_dgram_ref().is_some());
+
+ // Now stream data gets before client_fin
+ let server_out = server.process(client_stream_data.as_dgram_ref(), now());
+ assert!(server_out.as_dgram_ref().is_none()); // the packet will be discarded
+
+ assert_eq!(*server.state(), State::Handshaking);
+ let server_out = server.process(client_fin.as_dgram_ref(), now());
+ assert!(server_out.as_dgram_ref().is_some());
+}
+
+#[test]
+fn after_fin_is_read_conn_events_for_stream_should_be_removed() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ let id = server.stream_create(StreamType::BiDi).unwrap();
+ server.stream_send(id, &[6; 10]).unwrap();
+ server.stream_close_send(id).unwrap();
+ let out = server.process(None, now()).dgram();
+ assert!(out.is_some());
+
+ mem::drop(client.process(out.as_ref(), now()));
+
+ // read from the stream before checking connection events.
+ let mut buf = vec![0; 4000];
+ let (_, fin) = client.stream_recv(id, &mut buf).unwrap();
+ assert!(fin);
+
+ // Make sure we do not have RecvStreamReadable events for the stream when fin has been read.
+ let readable_stream_evt =
+ |e| matches!(e, ConnectionEvent::RecvStreamReadable { stream_id } if stream_id == id);
+ assert!(!client.events().any(readable_stream_evt));
+}
+
+#[test]
+fn after_stream_stop_sending_is_called_conn_events_for_stream_should_be_removed() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ let id = server.stream_create(StreamType::BiDi).unwrap();
+ server.stream_send(id, &[6; 10]).unwrap();
+ server.stream_close_send(id).unwrap();
+ let out = server.process(None, now()).dgram();
+ assert!(out.is_some());
+
+ mem::drop(client.process(out.as_ref(), now()));
+
+ // send stop seending.
+ client
+ .stream_stop_sending(id, Error::NoError.code())
+ .unwrap();
+
+ // Make sure we do not have RecvStreamReadable events for the stream after stream_stop_sending
+ // has been called.
+ let readable_stream_evt =
+ |e| matches!(e, ConnectionEvent::RecvStreamReadable { stream_id } if stream_id == id);
+ assert!(!client.events().any(readable_stream_evt));
+}
+
+#[test]
+fn stream_data_blocked_generates_max_stream_data() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ let now = now();
+
+ // Send some data and consume some flow control.
+ let stream_id = server.stream_create(StreamType::UniDi).unwrap();
+ _ = server.stream_send(stream_id, DEFAULT_STREAM_DATA).unwrap();
+ let dgram = server.process(None, now).dgram();
+ assert!(dgram.is_some());
+
+ // Consume the data.
+ client.process_input(&dgram.unwrap(), now);
+ let mut buf = [0; 10];
+ let (count, end) = client.stream_recv(stream_id, &mut buf[..]).unwrap();
+ assert_eq!(count, DEFAULT_STREAM_DATA.len());
+ assert!(!end);
+
+ // Now send `STREAM_DATA_BLOCKED`.
+ let internal_stream = server.streams.get_send_stream_mut(stream_id).unwrap();
+ if let SendStreamState::Send { fc, .. } = internal_stream.state() {
+ fc.blocked();
+ } else {
+ panic!("unexpected stream state");
+ }
+ let dgram = server.process_output(now).dgram();
+ assert!(dgram.is_some());
+
+ let sdb_before = client.stats().frame_rx.stream_data_blocked;
+ let dgram = client.process(dgram.as_ref(), now).dgram();
+ assert_eq!(client.stats().frame_rx.stream_data_blocked, sdb_before + 1);
+ assert!(dgram.is_some());
+
+ // Client should have sent a MAX_STREAM_DATA frame with just a small increase
+ // on the default window size.
+ let msd_before = server.stats().frame_rx.max_stream_data;
+ server.process_input(&dgram.unwrap(), now);
+ assert_eq!(server.stats().frame_rx.max_stream_data, msd_before + 1);
+
+ // Test that the entirety of the receive buffer is available now.
+ let mut written = 0;
+ loop {
+ const LARGE_BUFFER: &[u8] = &[0; 1024];
+ let amount = server.stream_send(stream_id, LARGE_BUFFER).unwrap();
+ if amount == 0 {
+ break;
+ }
+ written += amount;
+ }
+ assert_eq!(written, RECV_BUFFER_SIZE);
+}
+
+/// See <https://github.com/mozilla/neqo/issues/871>
+#[test]
+fn max_streams_after_bidi_closed() {
+ const REQUEST: &[u8] = b"ping";
+ const RESPONSE: &[u8] = b"pong";
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ while client.stream_create(StreamType::BiDi).is_ok() {
+ // Exhaust the stream limit.
+ }
+ // Write on the one stream and send that out.
+ _ = client.stream_send(stream_id, REQUEST).unwrap();
+ client.stream_close_send(stream_id).unwrap();
+ let dgram = client.process(None, now()).dgram();
+
+ // Now handle the stream and send an incomplete response.
+ server.process_input(&dgram.unwrap(), now());
+ server.stream_send(stream_id, RESPONSE).unwrap();
+ let dgram = server.process_output(now()).dgram();
+
+ // The server shouldn't have released more stream credit.
+ client.process_input(&dgram.unwrap(), now());
+ let e = client.stream_create(StreamType::BiDi).unwrap_err();
+ assert!(matches!(e, Error::StreamLimitError));
+
+ // Closing the stream isn't enough.
+ server.stream_close_send(stream_id).unwrap();
+ let dgram = server.process_output(now()).dgram();
+ client.process_input(&dgram.unwrap(), now());
+ assert!(client.stream_create(StreamType::BiDi).is_err());
+
+ // The server needs to see an acknowledgment from the client for its
+ // response AND the server has to read all of the request.
+ // and the server needs to read all the data. Read first.
+ let mut buf = [0; REQUEST.len()];
+ let (count, fin) = server.stream_recv(stream_id, &mut buf).unwrap();
+ assert_eq!(&buf[..count], REQUEST);
+ assert!(fin);
+
+ // We need an ACK from the client now, but that isn't guaranteed,
+ // so give the client one more packet just in case.
+ let dgram = send_something(&mut server, now());
+ client.process_input(&dgram, now());
+
+ // Now get the client to send the ACK and have the server handle that.
+ let dgram = send_something(&mut client, now());
+ let dgram = server.process(Some(&dgram), now()).dgram();
+ client.process_input(&dgram.unwrap(), now());
+ assert!(client.stream_create(StreamType::BiDi).is_ok());
+ assert!(client.stream_create(StreamType::BiDi).is_err());
+}
+
+#[test]
+fn no_dupdata_readable_events() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out = client.process(None, now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+
+ // We have a data_readable event.
+ let stream_readable = |e| matches!(e, ConnectionEvent::RecvStreamReadable { .. });
+ assert!(server.events().any(stream_readable));
+
+ // Send one more data frame from client. The previous stream data has not been read yet,
+ // therefore there should not be a new DataReadable event.
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out_second_data_frame = client.process(None, now());
+ mem::drop(server.process(out_second_data_frame.as_dgram_ref(), now()));
+ assert!(!server.events().any(stream_readable));
+
+ // One more frame with a fin will not produce a new DataReadable event, because the
+ // previous stream data has not been read yet.
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ client.stream_close_send(stream_id).unwrap();
+ let out_third_data_frame = client.process(None, now());
+ mem::drop(server.process(out_third_data_frame.as_dgram_ref(), now()));
+ assert!(!server.events().any(stream_readable));
+}
+
+#[test]
+fn no_dupdata_readable_events_empty_last_frame() {
+ let mut client = default_client();
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = client.stream_create(StreamType::BiDi).unwrap();
+ client.stream_send(stream_id, &[0x00]).unwrap();
+ let out = client.process(None, now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+
+ // We have a data_readable event.
+ let stream_readable = |e| matches!(e, ConnectionEvent::RecvStreamReadable { .. });
+ assert!(server.events().any(stream_readable));
+
+ // An empty frame with a fin will not produce a new DataReadable event, because
+ // the previous stream data has not been read yet.
+ client.stream_close_send(stream_id).unwrap();
+ let out_second_data_frame = client.process(None, now());
+ mem::drop(server.process(out_second_data_frame.as_dgram_ref(), now()));
+ assert!(!server.events().any(stream_readable));
+}
+
+fn change_flow_control(stream_type: StreamType, new_fc: u64) {
+ const RECV_BUFFER_START: u64 = 300;
+
+ let mut client = new_client(
+ ConnectionParameters::default()
+ .max_stream_data(StreamType::BiDi, true, RECV_BUFFER_START)
+ .max_stream_data(StreamType::UniDi, true, RECV_BUFFER_START),
+ );
+ let mut server = default_server();
+ connect(&mut client, &mut server);
+
+ // create a stream
+ let stream_id = server.stream_create(stream_type).unwrap();
+ let written1 = server.stream_send(stream_id, &[0x0; 10000]).unwrap();
+ assert_eq!(u64::try_from(written1).unwrap(), RECV_BUFFER_START);
+
+ // Send the stream to the client.
+ let out = server.process(None, now());
+ mem::drop(client.process(out.as_dgram_ref(), now()));
+
+ // change max_stream_data for stream_id.
+ client.set_stream_max_data(stream_id, new_fc).unwrap();
+
+ // server should receive a MAX_SREAM_DATA frame if the flow control window is updated.
+ let out2 = client.process(None, now());
+ let out3 = server.process(out2.as_dgram_ref(), now());
+ let expected = usize::from(RECV_BUFFER_START < new_fc);
+ assert_eq!(server.stats().frame_rx.max_stream_data, expected);
+
+ // If the flow control window has been increased, server can write more data.
+ let written2 = server.stream_send(stream_id, &[0x0; 10000]).unwrap();
+ if RECV_BUFFER_START < new_fc {
+ assert_eq!(u64::try_from(written2).unwrap(), new_fc - RECV_BUFFER_START);
+ } else {
+ assert_eq!(written2, 0);
+ }
+
+ // Exchange packets so that client gets all data.
+ let out4 = client.process(out3.as_dgram_ref(), now());
+ let out5 = server.process(out4.as_dgram_ref(), now());
+ mem::drop(client.process(out5.as_dgram_ref(), now()));
+
+ // read all data by client
+ let mut buf = [0x0; 10000];
+ let (read, _) = client.stream_recv(stream_id, &mut buf).unwrap();
+ assert_eq!(u64::try_from(read).unwrap(), max(RECV_BUFFER_START, new_fc));
+
+ let out4 = client.process(None, now());
+ mem::drop(server.process(out4.as_dgram_ref(), now()));
+
+ let written3 = server.stream_send(stream_id, &[0x0; 10000]).unwrap();
+ assert_eq!(u64::try_from(written3).unwrap(), new_fc);
+}
+
+#[test]
+fn increase_decrease_flow_control() {
+ const RECV_BUFFER_NEW_BIGGER: u64 = 400;
+ const RECV_BUFFER_NEW_SMALLER: u64 = 200;
+
+ change_flow_control(StreamType::UniDi, RECV_BUFFER_NEW_BIGGER);
+ change_flow_control(StreamType::BiDi, RECV_BUFFER_NEW_BIGGER);
+
+ change_flow_control(StreamType::UniDi, RECV_BUFFER_NEW_SMALLER);
+ change_flow_control(StreamType::BiDi, RECV_BUFFER_NEW_SMALLER);
+}
+
+#[test]
+fn session_flow_control_stop_sending_state_recv() {
+ const SMALL_MAX_DATA: usize = 1024;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ // send 1 byte so that the server learns about the stream.
+ assert_eq!(client.stream_send(stream_id, b"a").unwrap(), 1);
+
+ exchange_data(&mut client, &mut server);
+
+ server
+ .stream_stop_sending(stream_id, Error::NoError.code())
+ .unwrap();
+
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA])
+ .unwrap(),
+ SMALL_MAX_DATA - 1
+ );
+
+ // In this case the final size is only known after RESET frame is received.
+ // The server sends STOP_SENDING -> the client sends RESET -> the server
+ // sends MAX_DATA.
+ let out = server.process(None, now()).dgram();
+ let out = client.process(out.as_ref(), now()).dgram();
+ // the client is still limited.
+ let stream_id2 = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(client.stream_avail_send_space(stream_id2).unwrap(), 0);
+ let out = server.process(out.as_ref(), now()).dgram();
+ client.process_input(&out.unwrap(), now());
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA
+ );
+}
+
+#[test]
+fn session_flow_control_stop_sending_state_size_known() {
+ const SMALL_MAX_DATA: usize = 1024;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ // send 1 byte so that the server learns about the stream.
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ let out1 = client.process(None, now()).dgram();
+ // Delay this packet and let the server receive fin first (it will enter SizeKnown state).
+ client.stream_close_send(stream_id).unwrap();
+ let out2 = client.process(None, now()).dgram();
+
+ server.process_input(&out2.unwrap(), now());
+
+ server
+ .stream_stop_sending(stream_id, Error::NoError.code())
+ .unwrap();
+
+ // In this case the final size is known when stream_stop_sending is called
+ // and the server releases flow control immediately and sends STOP_SENDING and
+ // MAX_DATA in the same packet.
+ let out = server.process(out1.as_ref(), now()).dgram();
+ client.process_input(&out.unwrap(), now());
+
+ // The flow control should have been updated and the client can again send
+ // SMALL_MAX_DATA.
+ let stream_id2 = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA
+ );
+}
+
+#[test]
+fn session_flow_control_stop_sending_state_data_recvd() {
+ const SMALL_MAX_DATA: usize = 1024;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ // send 1 byte so that the server learns about the stream.
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA + 1])
+ .unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ client.stream_close_send(stream_id).unwrap();
+
+ exchange_data(&mut client, &mut server);
+
+ // The stream is DataRecvd state
+ server
+ .stream_stop_sending(stream_id, Error::NoError.code())
+ .unwrap();
+
+ exchange_data(&mut client, &mut server);
+
+ // The flow control should have been updated and the client can again send
+ // SMALL_MAX_DATA.
+ let stream_id2 = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA
+ );
+}
+
+#[test]
+fn session_flow_control_affects_all_streams() {
+ const SMALL_MAX_DATA: usize = 1024;
+
+ let mut client = default_client();
+ let mut server = new_server(
+ ConnectionParameters::default().max_data(u64::try_from(SMALL_MAX_DATA).unwrap()),
+ );
+
+ connect(&mut client, &mut server);
+
+ let stream_id = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ let stream_id2 = client.stream_create(StreamType::UniDi).unwrap();
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ assert_eq!(
+ client
+ .stream_send(stream_id, &[b'a'; SMALL_MAX_DATA / 2 + 1])
+ .unwrap(),
+ SMALL_MAX_DATA / 2 + 1
+ );
+
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA / 2 - 1
+ );
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA / 2 - 1
+ );
+
+ exchange_data(&mut client, &mut server);
+
+ let mut buf = [0x0; SMALL_MAX_DATA];
+ let (read, _) = server.stream_recv(stream_id, &mut buf).unwrap();
+ assert_eq!(read, SMALL_MAX_DATA / 2 + 1);
+
+ exchange_data(&mut client, &mut server);
+
+ assert_eq!(
+ client.stream_avail_send_space(stream_id).unwrap(),
+ SMALL_MAX_DATA
+ );
+
+ assert_eq!(
+ client.stream_avail_send_space(stream_id2).unwrap(),
+ SMALL_MAX_DATA
+ );
+}
+
+fn connect_w_different_limit(bidi_limit: u64, unidi_limit: u64) {
+ let mut client = default_client();
+ let out = client.process(None, now());
+ let mut server = new_server(
+ ConnectionParameters::default()
+ .max_streams(StreamType::BiDi, bidi_limit)
+ .max_streams(StreamType::UniDi, unidi_limit),
+ );
+ let out = server.process(out.as_dgram_ref(), now());
+
+ let out = client.process(out.as_dgram_ref(), now());
+ mem::drop(server.process(out.as_dgram_ref(), now()));
+
+ assert!(maybe_authenticate(&mut client));
+
+ let mut bidi_events = 0;
+ let mut unidi_events = 0;
+ let mut connected_events = 0;
+ for e in client.events() {
+ match e {
+ ConnectionEvent::SendStreamCreatable { stream_type } => {
+ if stream_type == StreamType::BiDi {
+ bidi_events += 1;
+ } else {
+ unidi_events += 1;
+ }
+ }
+ ConnectionEvent::StateChange(State::Connected) => {
+ connected_events += 1;
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(bidi_events, usize::from(bidi_limit > 0));
+ assert_eq!(unidi_events, usize::from(unidi_limit > 0));
+ assert_eq!(connected_events, 1);
+}
+
+#[test]
+fn client_stream_creatable_event() {
+ connect_w_different_limit(0, 0);
+ connect_w_different_limit(0, 1);
+ connect_w_different_limit(1, 0);
+ connect_w_different_limit(1, 1);
+}