summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/connection/tests/recovery.rs
blob: 0f12d031075932e99659d8e0d6753fcf7efa0e62 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
    mem,
    time::{Duration, Instant},
};

use neqo_common::qdebug;
use neqo_crypto::AuthenticationStatus;
use test_fixture::{
    assertions::{assert_handshake, assert_initial},
    now, split_datagram,
};

use super::{
    super::{Connection, ConnectionParameters, Output, State},
    assert_full_cwnd, connect, connect_force_idle, connect_rtt_idle, connect_with_rtt, cwnd,
    default_client, default_server, fill_cwnd, maybe_authenticate, new_client, send_and_receive,
    send_something, AT_LEAST_PTO, DEFAULT_RTT, DEFAULT_STREAM_DATA, POST_HANDSHAKE_CWND,
};
use crate::{
    cc::CWND_MIN,
    path::PATH_MTU_V6,
    recovery::{
        FAST_PTO_SCALE, MAX_OUTSTANDING_UNACK, MAX_PTO_PACKET_COUNT, MIN_OUTSTANDING_UNACK,
    },
    rtt::GRANULARITY,
    stats::MAX_PTO_COUNTS,
    tparams::TransportParameter,
    tracking::DEFAULT_ACK_DELAY,
    StreamType,
};

/// Queues data on two unidirectional streams, sends it in one datagram that
/// is never delivered, and checks that the datagram produced once the PTO
/// has elapsed delivers two STREAM frames to the server.
#[test]
fn pto_works_basic() {
    let (mut client, mut server) = (default_client(), default_server());
    connect_force_idle(&mut client, &mut server);

    let mut t = now();

    // With nothing outstanding, the only callback is the idle timeout.
    let idle = client.process(None, t);
    let expected_idle = ConnectionParameters::default().get_idle_timeout();
    assert_eq!(idle, Output::Callback(expected_idle));

    // First stream: two writes.
    let first = client.stream_create(StreamType::UniDi).unwrap();
    let written = client.stream_send(first, b"hello").unwrap();
    assert_eq!(written, 5);
    let written = client.stream_send(first, b" world!").unwrap();
    assert_eq!(written, 7);

    // Second stream: a single write.
    let second = client.stream_create(StreamType::UniDi).unwrap();
    let written = client.stream_send(second, b"there!").unwrap();
    assert_eq!(written, 6);

    // Ten seconds on, the queued data goes out; this datagram is discarded.
    t += Duration::from_secs(10);
    let lost = client.process(None, t);
    assert!(lost.dgram().is_some());

    // The client has nothing more to send and only asks for a callback.
    let waiting = client.process(None, t);
    assert!(matches!(waiting, Output::Callback(_)));

    // Move past the PTO; the client should now produce a PTO packet.
    t += AT_LEAST_PTO;
    let pto = client.process(None, t);

    // The server counts exactly two new STREAM frames from that datagram.
    let streams_before = server.stats().frame_rx.stream;
    server.process_input(&pto.dgram().unwrap(), t);
    let streams_after = server.stats().frame_rx.stream;
    assert_eq!(streams_after, streams_before + 2);
}

/// Fills the congestion window, waits for a PTO, and checks that the PTO
/// yields exactly two datagrams, each of which carries STREAM frames.
#[test]
fn pto_works_full_cwnd() {
    let (mut client, mut server) = (default_client(), default_server());
    let connected_at = connect_rtt_idle(&mut client, &mut server, DEFAULT_RTT);

    // Saturate the congestion window with stream data.
    let stream_id = client.stream_create(StreamType::UniDi).unwrap();
    let (initial_flight, sent_at) = fill_cwnd(&mut client, stream_id, connected_at);
    assert_full_cwnd(&initial_flight, POST_HANDSHAKE_CWND);

    // After the PTO, trying to fill the window again only yields PTO packets.
    let pto_at = sent_at + AT_LEAST_PTO;
    let (pto_flight, now) = fill_cwnd(&mut client, stream_id, pto_at);
    // A PTO sends two packets: the first is full-sized, the second may be short.
    assert_eq!(pto_flight.len(), 2);
    assert_eq!(dgram_len(&pto_flight), PATH_MTU_V6);

    // Every PTO datagram must deliver at least one STREAM frame.
    for pto_dgram in pto_flight {
        let before = server.stats().frame_rx.stream;
        server.process_input(&pto_dgram, now);
        let after = server.stats().frame_rx.stream;
        assert!(after > before);
    }

    /// Length of the first datagram in the flight.
    fn dgram_len<T: std::ops::Deref<Target = [u8]>>(flight: &[T]) -> usize {
        flight[0].len()
    }
}

/// Checks that when the PTO fires with no unacknowledged data left, the
/// client sends only a PING, and that the server receives that PING.
///
/// The client sends four packets; the server is given pkt1, pkt2 and pkt3
/// first and pkt0 last, and the client processes the resulting ACKs before
/// the timer is allowed to expire.
#[test]
fn pto_works_ping() {
    let mut client = default_client();
    let mut server = default_server();
    connect_force_idle(&mut client, &mut server);
    let mut now = now() + Duration::from_secs(10);

    // Send a few packets from the client.
    let pkt0 = send_something(&mut client, now);
    let pkt1 = send_something(&mut client, now);
    let pkt2 = send_something(&mut client, now);
    let pkt3 = send_something(&mut client, now);

    // Nothing to do, should return callback
    let cb = client.process(None, now).callback();
    // The PTO timer is calculated with:
    //   RTT + max(rttvar * 4, GRANULARITY) + max_ack_delay
    // With zero RTT and rttvar, max_ack_delay is minimum too (GRANULARITY)
    assert_eq!(cb, GRANULARITY * 2);

    // Process these by server, skipping pkt0
    let srv0 = server.process(Some(&pkt1), now).dgram();
    assert!(srv0.is_some()); // out of order: the server acks client pkt1

    now += Duration::from_millis(20);

    // process pkt2 (immediate ack because last ack was more than an RTT ago; RTT=0)
    let srv1 = server.process(Some(&pkt2), now).dgram();
    assert!(srv1.is_some()); // this ACK is dropped: the client never sees srv1

    now += Duration::from_millis(20);
    // process pkt3 (acked for same reason)
    let srv2 = server.process(Some(&pkt3), now).dgram();
    // ack client pkt 2 & 3
    assert!(srv2.is_some());

    // client processes ack
    let pkt4 = client.process(srv2.as_ref(), now).dgram();
    // client resends data from pkt0
    assert!(pkt4.is_some());

    // The server sees the out-of-order pkt0 and generates an immediate ack.
    let srv3 = server.process(Some(&pkt0), now).dgram();
    assert!(srv3.is_some());

    // Accept the acknowledgment.
    let pkt5 = client.process(srv3.as_ref(), now).dgram();
    assert!(pkt5.is_none());

    now += Duration::from_millis(70);
    // PTO expires. No unacked data. Only send PING.
    let client_pings = client.stats().frame_tx.ping;
    let pkt6 = client.process(None, now).dgram();
    assert_eq!(client.stats().frame_tx.ping, client_pings + 1);

    // The server must count exactly one new PING from that datagram.
    let server_pings = server.stats().frame_rx.ping;
    server.process_input(&pkt6.unwrap(), now);
    assert_eq!(server.stats().frame_rx.ping, server_pings + 1);
}

/// Checks the PTO timer in the Initial packet number space: the first PTO is
/// `INITIAL_PTO`, it doubles after one retransmission, and once the server's
/// response has been processed the client's next callback is three times
/// `INITIAL_PTO`.
#[test]
fn pto_initial() {
    const INITIAL_PTO: Duration = Duration::from_millis(300);
    let mut now = now();

    qdebug!("---- client: generate CH");
    let mut client = default_client();
    let pkt1 = client.process(None, now).dgram();
    assert!(pkt1.is_some());
    // Borrow the datagram to check its size; it is still needed for the server below.
    assert_eq!(pkt1.as_ref().unwrap().len(), PATH_MTU_V6);

    let delay = client.process(None, now).callback();
    assert_eq!(delay, INITIAL_PTO);

    // Resend initial after PTO.
    now += delay;
    let pkt2 = client.process(None, now).dgram();
    assert!(pkt2.is_some());
    assert_eq!(pkt2.unwrap().len(), PATH_MTU_V6);

    let delay = client.process(None, now).callback();
    // PTO has doubled.
    assert_eq!(delay, INITIAL_PTO * 2);

    // The server processes the first Initial packet.
    let mut server = default_server();
    let out = server.process(pkt1.as_ref(), now).dgram();
    assert!(out.is_some());

    // Client receives an ack for the first Initial packet as well as a Handshake packet.
    // After the Handshake packet the Initial keys and the crypto stream for the Initial
    // packet number space will be discarded.
    // Here only an ack for the Handshake packet will be sent.
    let out = client.process(out.as_ref(), now).dgram();
    assert!(out.is_some());

    // We do not have PTO for the resent initial packet any more, but
    // the Handshake PTO timer should be armed.  As the RTT is apparently
    // the same as the initial PTO value, and there is only one sample,
    // the PTO will be 3x the INITIAL PTO.
    let delay = client.process(None, now).callback();
    assert_eq!(delay, INITIAL_PTO * 3);
}

/// A complete handshake that involves a PTO in the Handshake space.
///
/// The client's Handshake flight (`pkt1`) is held back, the client fires a
/// PTO (`pkt2`, `pkt3`), and the server is then given `pkt1` together with
/// the second packet split off `pkt3`.  The later Handshake packets are
/// expected to be dropped by the server without any frame being processed.
#[test]
fn pto_handshake_complete() {
    const HALF_RTT: Duration = Duration::from_millis(10);

    let mut now = now();
    // start handshake
    let mut client = default_client();
    let mut server = default_server();

    let pkt = client.process(None, now).dgram();
    assert_initial(pkt.as_ref().unwrap(), false);
    // Before any RTT sample exists, the PTO callback is 300ms.
    let cb = client.process(None, now).callback();
    assert_eq!(cb, Duration::from_millis(300));

    now += HALF_RTT;
    let pkt = server.process(pkt.as_ref(), now).dgram();
    assert_initial(pkt.as_ref().unwrap(), false);

    now += HALF_RTT;
    let pkt = client.process(pkt.as_ref(), now).dgram();
    assert_handshake(pkt.as_ref().unwrap());

    let cb = client.process(None, now).callback();
    // The client now has a single RTT estimate (20ms), so
    // the handshake PTO is set based on that.
    assert_eq!(cb, HALF_RTT * 6);

    now += HALF_RTT;
    let pkt = server.process(pkt.as_ref(), now).dgram();
    assert!(pkt.is_none());

    now += HALF_RTT;
    client.authenticated(AuthenticationStatus::Ok, now);

    qdebug!("---- client: SH..FIN -> FIN");
    let pkt1 = client.process(None, now).dgram();
    assert_handshake(pkt1.as_ref().unwrap());
    assert_eq!(*client.state(), State::Connected);

    let cb = client.process(None, now).callback();
    assert_eq!(cb, HALF_RTT * 6);

    // No PTO has fired yet, so every PTO counter is still zero.
    let mut pto_counts = [0; MAX_PTO_COUNTS];
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);

    // Wait for PTO to expire and resend a handshake packet.
    // Wait long enough that the 1-RTT PTO also fires.
    qdebug!("---- client: PTO");
    now += HALF_RTT * 6;
    let pkt2 = client.process(None, now).dgram();
    assert_handshake(pkt2.as_ref().unwrap());

    // Exactly one PTO has been recorded, in the first counter slot.
    pto_counts[0] = 1;
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);

    // Get a second PTO packet.
    // Add some application data to this datagram, then split the 1-RTT off.
    // We'll use that packet to force the server to acknowledge 1-RTT.
    let stream_id = client.stream_create(StreamType::UniDi).unwrap();
    client.stream_close_send(stream_id).unwrap();
    let pkt3 = client.process(None, now).dgram();
    assert_handshake(pkt3.as_ref().unwrap());
    let (pkt3_hs, pkt3_1rtt) = split_datagram(&pkt3.unwrap());
    assert_handshake(&pkt3_hs);
    assert!(pkt3_1rtt.is_some());

    // PTO has been doubled.
    let cb = client.process(None, now).callback();
    assert_eq!(cb, HALF_RTT * 12);

    // We still have only a single PTO
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);

    qdebug!("---- server: receive FIN and send ACK");
    now += HALF_RTT;
    // Now let the server have pkt1 and expect an immediate Handshake ACK.
    // The output will be a Handshake packet with ACK and 1-RTT packet with
    // HANDSHAKE_DONE and (because of pkt3_1rtt) an ACK.
    // This should remove the 1-RTT PTO from messing this test up.
    let server_acks = server.stats().frame_tx.ack;
    let server_done = server.stats().frame_tx.handshake_done;
    server.process_input(&pkt3_1rtt.unwrap(), now);
    let ack = server.process(pkt1.as_ref(), now).dgram();
    assert!(ack.is_some());
    assert_eq!(server.stats().frame_tx.ack, server_acks + 2);
    assert_eq!(server.stats().frame_tx.handshake_done, server_done + 1);

    // Check that the other packets (pkt2, pkt3) are Handshake packets.
    // The server discarded the Handshake keys already, therefore they are dropped.
    // NOTE(review): an earlier version of this comment said these datagrams do not
    // include 1-RTT packets because 1-RTT isn't sent on PTO, but the assertion
    // below requires pkt2 to carry a second coalesced packet -- confirm which
    // statement is current.
    let (pkt2_hs, pkt2_1rtt) = split_datagram(&pkt2.unwrap());
    assert_handshake(&pkt2_hs);
    assert!(pkt2_1rtt.is_some());
    let dropped_before1 = server.stats().dropped_rx;
    let server_frames = server.stats().frame_rx.all;
    server.process_input(&pkt2_hs, now);
    // One more dropped packet, and no additional frames processed.
    assert_eq!(1, server.stats().dropped_rx - dropped_before1);
    assert_eq!(server.stats().frame_rx.all, server_frames);

    // Deliver the remainder of pkt2 before taking the baseline for pkt3_hs.
    server.process_input(&pkt2_1rtt.unwrap(), now);
    let server_frames2 = server.stats().frame_rx.all;
    let dropped_before2 = server.stats().dropped_rx;
    server.process_input(&pkt3_hs, now);
    assert_eq!(1, server.stats().dropped_rx - dropped_before2);
    assert_eq!(server.stats().frame_rx.all, server_frames2);

    now += HALF_RTT;

    // Let the client receive the ACK.
    // It should now be waiting to acknowledge the HANDSHAKE_DONE.
    let cb = client.process(ack.as_ref(), now).callback();
    // The default ack delay is the RTT divided by the default ACK ratio of 4.
    let expected_ack_delay = HALF_RTT * 2 / 4;
    assert_eq!(cb, expected_ack_delay);

    // Let the ACK delay timer expire.
    now += cb;
    let out = client.process(None, now).dgram();
    assert!(out.is_some());
}

/// Verifies what a Handshake-space PTO carries: after the client's Handshake
/// flight is left unacknowledged for 60ms, the datagram it sends delivers
/// exactly one CRYPTO frame to the server.
#[test]
fn pto_handshake_frames() {
    const STEP: Duration = Duration::from_millis(10);
    const PTO: Duration = Duration::from_millis(60);

    let mut t = now();
    qdebug!("---- client: generate CH");
    let mut client = default_client();
    let client_hello = client.process(None, t);

    t += STEP;
    qdebug!("---- server: CH -> SH, EE, CERT, CV, FIN");
    let mut server = default_server();
    let server_flight = server.process(client_hello.as_dgram_ref(), t);

    t += STEP;
    qdebug!("---- client: cert verification");
    let client_reply = client.process(server_flight.as_dgram_ref(), t);

    // The server consumes the reply; whatever it outputs is discarded.
    t += STEP;
    mem::drop(server.process(client_reply.as_dgram_ref(), t));

    t += STEP;
    client.authenticated(AuthenticationStatus::Ok, t);

    // Queue application data so the next flight also has stream data to send.
    let stream_id = client.stream_create(StreamType::UniDi).unwrap();
    assert_eq!(stream_id, 2);
    let written = client.stream_send(stream_id, b"zero").unwrap();
    assert_eq!(written, 4);
    qdebug!("---- client: SH..FIN -> FIN and 1RTT packet");
    let first_flight = client.process(None, t).dgram();
    assert!(first_flight.is_some());

    // The next thing the client wants is the PTO timer.
    let timer = client.process(None, t);
    assert_eq!(timer, Output::Callback(PTO));

    // Let the PTO expire; the client retransmits.
    t += PTO;
    let retransmit = client.process(None, t).dgram();
    assert!(retransmit.is_some());

    // The retransmission must carry exactly one CRYPTO frame.
    t += STEP;
    let crypto_start = server.stats().frame_rx.crypto;
    server.process_input(&retransmit.unwrap(), t);
    let crypto_end = server.stats().frame_rx.crypto;
    assert_eq!(crypto_end, crypto_start + 1);
}

/// In the case that the Handshake takes too many packets, the server might
/// be stalled on the anti-amplification limit.  If a Handshake ACK from the
/// client is lost, the client has to keep the PTO timer armed or the server
/// might be unable to send anything, causing a deadlock.
///
/// The test drops the client's ACK, checks that the client still reports a
/// PTO callback of three RTTs, and checks that the packet sent when that
/// timer fires delivers a PING to the server and is counted as one PTO.
#[test]
fn handshake_ack_pto() {
    const RTT: Duration = Duration::from_millis(10);
    let mut now = now();
    let mut client = default_client();
    let mut server = default_server();
    // This is a greasing transport parameter, and large enough that the
    // server needs to send two Handshake packets.
    let big = TransportParameter::Bytes(vec![0; PATH_MTU_V6]);
    server.set_local_tparam(0xce16, big).unwrap();

    let c1 = client.process(None, now).dgram();

    now += RTT / 2;
    let s1 = server.process(c1.as_ref(), now).dgram();
    assert!(s1.is_some());
    // The oversized transport parameter must force a second server datagram.
    let s2 = server.process(None, now).dgram();
    assert!(s2.is_some());

    // Now let the client have the Initial, but drop the first coalesced Handshake packet.
    now += RTT / 2;
    let (initial, _) = split_datagram(&s1.unwrap());
    client.process_input(&initial, now);
    let c2 = client.process(s2.as_ref(), now).dgram();
    assert!(c2.is_some()); // This is an ACK.  Drop it.
    let delay = client.process(None, now).callback();
    assert_eq!(delay, RTT * 3);

    // No PTO has fired yet.
    let mut pto_counts = [0; MAX_PTO_COUNTS];
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);

    // Wait for the PTO and ensure that the client generates a packet.
    now += delay;
    let c3 = client.process(None, now).dgram();
    assert!(c3.is_some());

    // The PTO packet must deliver exactly one PING to the server.
    now += RTT / 2;
    let ping_before = server.stats().frame_rx.ping;
    server.process_input(&c3.unwrap(), now);
    assert_eq!(server.stats().frame_rx.ping, ping_before + 1);

    pto_counts[0] = 1;
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);

    // Now complete the handshake as cheaply as possible.
    let dgram = server.process(None, now).dgram();
    client.process_input(&dgram.unwrap(), now);
    maybe_authenticate(&mut client);
    let dgram = client.process(None, now).dgram();
    assert_eq!(*client.state(), State::Connected);
    let dgram = server.process(dgram.as_ref(), now).dgram();
    assert_eq!(*server.state(), State::Confirmed);
    client.process_input(&dgram.unwrap(), now);
    assert_eq!(*client.state(), State::Confirmed);

    // Completing the handshake must not register any further PTO.
    assert_eq!(client.stats.borrow().pto_counts, pto_counts);
}

/// Regression test: the server loses its first packet, has a later one
/// acknowledged, jumps ahead by `AT_LEAST_PTO`, and then sends again.
/// The final send is the step the original author marked as crashing.
#[test]
fn loss_recovery_crash() {
    let (mut client, mut server) = (default_client(), default_server());
    connect(&mut client, &mut server);
    let start = now();
    let later = start + AT_LEAST_PTO;

    // First server packet: generated, then thrown away.
    mem::drop(send_something(&mut server, start));

    // Second server packet: delivered, and the client acknowledges it.
    let client_ack = send_and_receive(&mut server, &mut client, start);
    assert!(client_ack.is_some());

    // The server takes the ACK and reports a non-zero timer.
    let timer = server.process(client_ack.as_ref(), start).callback();
    assert_ne!(timer, Duration::from_secs(0));

    // Leap into the future.  The server should regard the first
    // packet as lost based on time alone, and send something.
    let resent = server.process(None, later).dgram();
    assert!(resent.is_some());

    // This is the send that was noted as crashing.
    mem::drop(send_something(&mut server, later));
}

/// Packets that arrive after the PTO timer has fired do not clear the PTO
/// state, but they may still need to be acknowledged.
/// This shouldn't happen, but some implementations were found to do it.
///
/// The test exhausts the client's PTO packets, then delivers a server packet
/// and checks that the client's reply contains a single frame, an ACK.
#[test]
fn ack_after_pto() {
    let (mut client, mut server) = (default_client(), default_server());
    connect_force_idle(&mut client, &mut server);

    let mut t = now();

    // The client sends one packet that is never delivered, forcing a PTO.
    mem::drop(send_something(&mut client, t));

    // Move to the PTO and take every PTO packet the client will produce.
    // MAX_PTO_PACKET_COUNT applies here because the handshake is over.
    t += AT_LEAST_PTO;
    (0..MAX_PTO_PACKET_COUNT).for_each(|_| {
        let pto_pkt = client.process(None, t).dgram();
        assert!(pto_pkt.is_some());
    });
    let extra = client.process(None, t).dgram();
    assert!(extra.is_none());

    // The server now sends two packets; dropping the first makes the second
    // arrive out of order, which makes the client want to acknowledge it.
    // Note: The server can't ACK anything here, because nothing the client
    // has sent so far has been delivered.
    mem::drop(send_something(&mut server, t));
    let reordered = send_something(&mut server, t);

    // Although the client is past its PTO, a packet that demands
    // acknowledgment makes it send just the ACK.
    let client_ack = client.process(Some(&reordered), t).dgram();
    assert!(client_ack.is_some());

    // The server must see one new frame in total, and it must be an ACK.
    let frames_start = server.stats().frame_rx.all;
    let acks_start = server.stats().frame_rx.ack;
    server.process_input(&client_ack.unwrap(), t);
    let frames_end = server.stats().frame_rx.all;
    assert_eq!(frames_end, frames_start + 1);
    let acks_end = server.stats().frame_rx.ack;
    assert_eq!(acks_end, acks_start + 1);
}

/// When we declare a packet as lost, we keep it around for a while for another loss period.
/// Those packets should not affect how we report the loss recovery timer.
/// As the loss recovery timer is based on RTT, we use that to drive the state.
///
/// The test records the loss recovery timer reported while p1 is missing and
/// checks that the timer reported later, while p3 is missing, is the same.
#[test]
fn lost_but_kept_and_lr_timer() {
    const RTT: Duration = Duration::from_secs(1);
    let mut client = default_client();
    let mut server = default_server();
    let mut now = connect_with_rtt(&mut client, &mut server, now(), RTT);

    // Two packets (p1, p2) are sent at around t=0.  The first is lost.
    let _p1 = send_something(&mut client, now);
    let p2 = send_something(&mut client, now);

    // At t=RTT/2 the server receives the packet and ACKs it.
    now += RTT / 2;
    let ack = server.process(Some(&p2), now).dgram();
    assert!(ack.is_some());
    // The client also sends another two packets (p3, p4), again losing the first.
    let _p3 = send_something(&mut client, now);
    let p4 = send_something(&mut client, now);

    // At t=RTT the client receives the ACK and goes into timed loss recovery.
    // The client doesn't call p1 lost at this stage, but it will soon.
    now += RTT / 2;
    let res = client.process(ack.as_ref(), now);
    // The client should be on a loss recovery timer as p1 is missing.
    let lr_timer = res.callback();
    // Loss recovery timer should be RTT/8, but the assertions only reject
    // a value of 0 or a value >= RTT/2.
    assert_ne!(lr_timer, Duration::from_secs(0));
    assert!(lr_timer < (RTT / 2));
    // The server also receives and acknowledges p4, again sending an ACK.
    let ack = server.process(Some(&p4), now).dgram();
    assert!(ack.is_some());

    // At t=RTT*3/2 the client should declare p1 to be lost.
    now += RTT / 2;
    // So the client will send the data from p1 again.
    let res = client.process(None, now);
    assert!(res.dgram().is_some());
    // When the client processes the ACK, it should engage the
    // loss recovery timer for p3, not p1 (even though it still tracks p1).
    let res = client.process(ack.as_ref(), now);
    let lr_timer2 = res.callback();
    // Same timer value as before: the retained p1 does not change it.
    assert_eq!(lr_timer, lr_timer2);
}

/// We should not be setting the loss recovery timer based on packets
/// that are sent prior to the largest acknowledged.
/// Testing this requires that we construct a case where one packet
/// number space causes the loss recovery timer to be engaged.  At the same time,
/// there is a packet in another space that hasn't been acknowledged AND
/// that packet number space has not received acknowledgments for later packets.
///
/// The final assertions require the client's callback to be non-zero and
/// shorter than half of the 10s RTT used by the test.
#[test]
fn loss_time_past_largest_acked() {
    const RTT: Duration = Duration::from_secs(10);
    const INCR: Duration = Duration::from_millis(1);
    let mut client = default_client();
    let mut server = default_server();

    let mut now = now();

    // Start the handshake.
    let c_in = client.process(None, now).dgram();
    now += RTT / 2;
    let s_hs1 = server.process(c_in.as_ref(), now).dgram();

    // Get some spare server handshake packets for the client to ACK.
    // This involves a time machine, so be a little cautious.
    // This test uses an RTT of 10s, but our server starts
    // with a much lower RTT estimate, so the PTO at this point should
    // be much smaller than an RTT and so the server shouldn't see
    // time go backwards.
    let s_pto = server.process(None, now).callback();
    assert_ne!(s_pto, Duration::from_secs(0));
    assert!(s_pto < RTT);
    // `now` itself is not advanced; the server is only shown a later time.
    let s_hs2 = server.process(None, now + s_pto).dgram();
    assert!(s_hs2.is_some());
    let s_hs3 = server.process(None, now + s_pto).dgram();
    assert!(s_hs3.is_some());

    // Get some Handshake packets from the client.
    // We need one to be left unacknowledged before one that is acknowledged.
    // So that the client engages the loss recovery timer.
    // This is complicated by the fact that it is hard to cause the client
    // to generate an ack-eliciting packet.  For that, we use the Finished message.
    // Reordering delivery ensures that the later packet is also acknowledged.
    now += RTT / 2;
    let c_hs1 = client.process(s_hs1.as_ref(), now).dgram();
    assert!(c_hs1.is_some()); // This comes first, so it's useless.
    maybe_authenticate(&mut client);
    let c_hs2 = client.process(None, now).dgram();
    assert!(c_hs2.is_some()); // This one will elicit an ACK.

    // Then we need the outstanding packet to be sent after the
    // application data packet, so space these out a tiny bit.
    let _p1 = send_something(&mut client, now + INCR);
    let c_hs3 = client.process(s_hs2.as_ref(), now + (INCR * 2)).dgram();
    assert!(c_hs3.is_some()); // This will be left outstanding.
    let c_hs4 = client.process(s_hs3.as_ref(), now + (INCR * 3)).dgram();
    assert!(c_hs4.is_some()); // This will be acknowledged.

    // Process c_hs2 and c_hs4, but skip c_hs3.
    // Then get an ACK for the client.
    now += RTT / 2;
    // Deliver c_hs4 first, but don't generate a packet.
    server.process_input(&c_hs4.unwrap(), now);
    let s_ack = server.process(c_hs2.as_ref(), now).dgram();
    assert!(s_ack.is_some());
    // This includes an ACK, but it also includes HANDSHAKE_DONE,
    // which we need to remove because that will cause the Handshake loss
    // recovery state to be dropped.
    let (s_hs_ack, _s_ap_ack) = split_datagram(&s_ack.unwrap());

    // Now the client should start its loss recovery timer based on the ACK.
    now += RTT / 2;
    let c_ack = client.process(Some(&s_hs_ack), now).dgram();
    assert!(c_ack.is_none());
    // The client should now have the loss recovery timer active.
    let lr_time = client.process(None, now).callback();
    assert_ne!(lr_time, Duration::from_secs(0));
    assert!(lr_time < (RTT / 2));
}

/// `sender` writes one byte at a time on a new unidirectional stream and
/// `receiver` processes each resulting datagram.
/// This repeats until `receiver` has produced `count` datagrams in response.
/// The last of those datagrams is then fed back into `sender`; nothing is returned.
fn trickle(sender: &mut Connection, receiver: &mut Connection, mut count: usize, now: Instant) {
    let stream_id = sender.stream_create(StreamType::UniDi).unwrap();
    let mut last_ack = None;
    while count != 0 {
        qdebug!("trickle: remaining={}", count);
        let written = sender.stream_send(stream_id, &[9]).unwrap();
        assert_eq!(written, 1);
        // Any response from the previous round is handed to the sender here.
        let outbound = sender.process(last_ack.as_ref(), now).dgram();

        last_ack = receiver.process(outbound.as_ref(), now).dgram();
        // Only rounds in which the receiver responded count towards the total.
        if last_ack.is_some() {
            count -= 1;
        }
    }
    sender.process_input(&last_ack.unwrap(), now);
}

/// Checks that the receiver eventually attaches a PING frame to an ACK.
/// With `fast` set to `true`, `MAX_OUTSTANDING_UNACK` responses are
/// collected and the PING is expected after one PTO.
/// With `fast` set to `false`, only `MIN_OUTSTANDING_UNACK` responses are
/// collected, no PING is expected after the first PTO, and the PING is
/// expected after the second.
fn ping_with_ack(fast: bool) {
    const TICK: Duration = Duration::from_micros(1);

    let mut sender = default_client();
    let mut receiver = default_server();
    let mut t = now();
    connect_force_idle(&mut sender, &mut receiver);

    let sender_acks_start = sender.stats().frame_tx.ack;
    let receiver_acks_start = receiver.stats().frame_tx.ack;
    let rounds = if fast {
        MAX_OUTSTANDING_UNACK
    } else {
        MIN_OUTSTANDING_UNACK
    };

    // Build up acknowledgements from the receiver that the sender never acks.
    trickle(&mut sender, &mut receiver, rounds, t);
    let sender_acks = sender.stats().frame_tx.ack;
    assert_eq!(sender_acks, sender_acks_start);
    let receiver_acks = receiver.stats().frame_tx.ack;
    assert_eq!(receiver_acks, receiver_acks_start + rounds);
    assert_eq!(receiver.stats().frame_tx.ping, 0);

    if !fast {
        // Wait at least one PTO, from the receiver's perspective.
        // A receiver that hasn't received MAX_OUTSTANDING_UNACK won't send PING.
        t += receiver.pto() + TICK;
        trickle(&mut sender, &mut receiver, 1, t);
        assert_eq!(receiver.stats().frame_tx.ping, 0);
    }

    // After a second PTO (or the first when `fast`), a new acknowledgement
    // comes with a PING frame, which makes the sender send an ACK of its own.
    t += receiver.pto() + TICK;
    trickle(&mut sender, &mut receiver, 1, t);
    assert_eq!(receiver.stats().frame_tx.ping, 1);
    if let Output::Callback(ack_delay) = sender.process_output(t) {
        assert_eq!(ack_delay, DEFAULT_ACK_DELAY);
        let delayed_ack = sender.process_output(t + ack_delay).dgram();
        assert!(delayed_ack.is_some());
    }
    let sender_acks = sender.stats().frame_tx.ack;
    assert_eq!(sender_acks, sender_acks_start + 1);
}

/// Runs `ping_with_ack` with `fast` set to `true`.
#[test]
fn ping_with_ack_fast() {
    let fast = true;
    ping_with_ack(fast);
}

/// Runs `ping_with_ack` with `fast` set to `false`.
#[test]
fn ping_with_ack_slow() {
    let fast = false;
    ping_with_ack(fast);
}

/// With two fewer than `MIN_OUTSTANDING_UNACK` responses collected, the
/// receiver sends no PING even after three PTOs have elapsed.
#[test]
fn ping_with_ack_min() {
    const COUNT: usize = MIN_OUTSTANDING_UNACK - 2;
    const TICK: Duration = Duration::from_micros(1);

    let mut sender = default_client();
    let mut receiver = default_server();
    let mut t = now();
    connect_force_idle(&mut sender, &mut receiver);

    let sender_acks_start = sender.stats().frame_tx.ack;
    let receiver_acks_start = receiver.stats().frame_tx.ack;
    trickle(&mut sender, &mut receiver, COUNT, t);
    let sender_acks = sender.stats().frame_tx.ack;
    assert_eq!(sender_acks, sender_acks_start);
    let receiver_acks = receiver.stats().frame_tx.ack;
    assert_eq!(receiver_acks, receiver_acks_start + COUNT);
    assert_eq!(receiver.stats().frame_tx.ping, 0);

    // Three PTOs later there is still no PING: too few packets are outstanding.
    t += receiver.pto() * 3 + TICK;
    trickle(&mut sender, &mut receiver, 1, t);
    assert_eq!(receiver.stats().frame_tx.ping, 0);
}

/// Returns the PTO timer value expected immediately after connection
/// establishment for a path with the given `rtt`.
/// The formula relies on the handshake having produced exactly 2 RTT samples.
fn expected_pto(rtt: Duration) -> Duration {
    // The PTO is rtt + 4 * rttvar + ack delay.
    // rttvar should be (rtt + 4 * (rtt / 2) * (3/4)^n + 25ms)/2,
    // where n is the number of round trips; here that term is rtt * 9 / 8.
    let variance_term = rtt * 9 / 8;
    // A 25ms ack delay is used, as the ACK delay extension
    // is negotiated and no ACK_DELAY frame has been received.
    let ack_delay = Duration::from_millis(25);
    rtt + variance_term + ack_delay
}

/// With `fast_pto` configured to half of `FAST_PTO_SCALE`, the PTO callback
/// is half of `expected_pto(DEFAULT_RTT)`, and the packet sent when it
/// expires delivers one STREAM frame to the server.
#[test]
fn fast_pto() {
    let params = ConnectionParameters::default().fast_pto(FAST_PTO_SCALE / 2);
    let mut client = new_client(params);
    let mut server = default_server();
    let mut t = connect_rtt_idle(&mut client, &mut server, DEFAULT_RTT);

    // While idle, the callback is the idle timeout less half an RTT.
    let idle = client.process(None, t);
    let idle_timeout = ConnectionParameters::default().get_idle_timeout() - (DEFAULT_RTT / 2);
    assert_eq!(idle, Output::Callback(idle_timeout));

    // Queue data on a single stream.
    let stream_id = client.stream_create(StreamType::UniDi).unwrap();
    let written = client.stream_send(stream_id, DEFAULT_STREAM_DATA).unwrap();
    assert_eq!(written, DEFAULT_STREAM_DATA.len());

    // Send a packet after some time; it is never delivered.
    t += idle_timeout / 2;
    let lost = client.process_output(t).dgram();
    assert!(lost.is_some());

    // Nothing else to do: the callback is the halved PTO.
    let pto = client.process_output(t).callback();
    assert_eq!(expected_pto(DEFAULT_RTT) / 2, pto);

    // Once the PTO timer expires, a PTO packet should be sent.
    t += pto;
    let pto_pkt = client.process(None, t).dgram();

    // That packet delivers exactly one STREAM frame.
    let streams_start = server.stats().frame_rx.stream;
    server.process_input(&pto_pkt.unwrap(), t);
    let streams_end = server.stats().frame_rx.stream;
    assert_eq!(streams_end, streams_start + 1);
}

/// Even when `fast_pto` stretches the PTO timer (here to twice its normal
/// value), persistent congestion is declared based on the "true" value of
/// the timer: the test ends with the congestion window at `CWND_MIN`.
#[test]
fn fast_pto_persistent_congestion() {
    let params = ConnectionParameters::default().fast_pto(FAST_PTO_SCALE * 2);
    let mut client = new_client(params);
    let mut server = default_server();
    let mut t = connect_rtt_idle(&mut client, &mut server, DEFAULT_RTT);

    // While idle, the callback is the idle timeout less half an RTT.
    let idle = client.process(None, t);
    let idle_timeout = ConnectionParameters::default().get_idle_timeout() - (DEFAULT_RTT / 2);
    assert_eq!(idle, Output::Callback(idle_timeout));

    // Send packets spaced by the PTO timer.  And lose them.
    // Note: This timing is a tiny bit higher than the client will use
    // to determine persistent congestion. The ACK below adds another RTT
    // estimate, which will reduce rttvar by 3/4, so persistent congestion
    // will occur at `rtt + rtt*27/32 + 25ms`.
    // That is OK as we're still showing that this interval is less than
    // six times the PTO, which is what would be used if the scaling
    // applied to the PTO used to determine persistent congestion.
    let pc_interval = expected_pto(DEFAULT_RTT) * 3;
    println!("pc_interval {pc_interval:?}");
    let _lost_first = send_something(&mut client, t);

    // The reported PTO is double the unscaled expectation.
    let pto = client.process_output(t).callback();
    assert_eq!(expected_pto(DEFAULT_RTT) * 2, pto);

    // After the interval, three more packets are lost and a fourth is kept.
    t += pc_interval;
    let _lost_second = send_something(&mut client, t);
    let _lost_third = send_something(&mut client, t);
    let _lost_fourth = send_something(&mut client, t);
    let tail = send_something(&mut client, t);

    // Now acknowledge the tail packet and enter persistent congestion.
    t += DEFAULT_RTT / 2;
    let tail_ack = server.process(Some(&tail), t).dgram();
    t += DEFAULT_RTT / 2;
    client.process_input(&tail_ack.unwrap(), t);
    assert_eq!(cwnd(&client), CWND_MIN);
}