/* * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "net/dcsctp/tx/retransmission_queue.h" #include #include #include #include #include #include #include "absl/types/optional.h" #include "api/array_view.h" #include "api/task_queue/task_queue_base.h" #include "net/dcsctp/common/handover_testing.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/sack_chunk.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_options.h" #include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h" #include "net/dcsctp/testing/data_generator.h" #include "net/dcsctp/testing/testing_macros.h" #include "net/dcsctp/timer/fake_timeout.h" #include "net/dcsctp/timer/timer.h" #include "net/dcsctp/tx/mock_send_queue.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/gunit.h" #include "test/gmock.h" namespace dcsctp { namespace { using ::testing::MockFunction; using State = ::dcsctp::RetransmissionQueue::State; using ::testing::_; using ::testing::ElementsAre; using ::testing::Field; using ::testing::IsEmpty; using ::testing::NiceMock; using ::testing::Pair; using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; using ::webrtc::TimeDelta; using ::webrtc::Timestamp; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; constexpr OutgoingMessageId kMessageId = OutgoingMessageId(42); DcSctpOptions MakeOptions() { DcSctpOptions options; options.mtu = kMaxMtu; return options; } class RetransmissionQueueTest : public testing::Test { protected: RetransmissionQueueTest() : options_(MakeOptions()), gen_(MID(42)), timeout_manager_([this]() { return now_; }), timer_manager_([this](webrtc::TaskQueueBase::DelayPrecision precision) { return timeout_manager_.CreateTimeout(precision); }), timer_(timer_manager_.CreateTimer( "test/t3_rtx", []() { return TimeDelta::Zero(); }, TimerOptions(options_.rto_initial.ToTimeDelta()))) {} std::function CreateChunk( OutgoingMessageId message_id) { return [this, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend(message_id, gen_.Ordered({1, 2, 3, 4}, "BE")); }; } std::vector GetTSNsForFastRetransmit(RetransmissionQueue& queue) { std::vector tsns; for (const auto& elem : queue.GetChunksForFastRetransmit(10000)) { tsns.push_back(elem.first); } return tsns; } std::vector GetSentPacketTSNs(RetransmissionQueue& queue) { std::vector tsns; for (const auto& elem : queue.GetChunksToSend(now_, 10000)) { tsns.push_back(elem.first); } return tsns; } RetransmissionQueue CreateQueue(bool supports_partial_reliability = true, bool use_message_interleaving = false) { return RetransmissionQueue( "", &callbacks_, TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(), on_clear_retransmission_counter_.AsStdFunction(), *timer_, options_, supports_partial_reliability, use_message_interleaving); } std::unique_ptr CreateQueueByHandover( RetransmissionQueue& queue) { EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus()); DcSctpSocketHandoverState state; queue.AddHandoverState(state); g_handover_state_transformer_for_test(&state); auto queue2 = std::make_unique( "", &callbacks_, TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(), on_clear_retransmission_counter_.AsStdFunction(), *timer_, options_, /*supports_partial_reliability=*/true, /*use_message_interleaving=*/false); queue2->RestoreFromState(state); return queue2; } MockDcSctpSocketCallbacks callbacks_; DcSctpOptions options_; DataGenerator gen_; Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; NiceMock> on_rtt_; NiceMock> on_clear_retransmission_counter_; NiceMock producer_; std::unique_ptr timer_; }; TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked))); } TEST_F(RetransmissionQueueTest, SendOneChunk) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); } TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked))); } TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kInFlight))); } TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15), TSN(16), TSN(17))); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 5)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kNacked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kNacked), // Pair(TSN(17), State::kAcked))); } TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15), TSN(16), TSN(17))); // Send more chunks, but leave some as gaps to force retransmission after // three NACKs. // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); // Ack 12, 14-15, 17-18 queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 6)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kNacked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kNacked), // Pair(TSN(17), State::kAcked), // Pair(TSN(18), State::kAcked))); // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); // Ack 12, 14-15, 17-19 queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 7)}, {})); // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); // Ack 12, 14-15, 17-20 queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 8)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kToBeRetransmitted), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kToBeRetransmitted), // Pair(TSN(17), State::kAcked), // Pair(TSN(18), State::kAcked), // Pair(TSN(19), State::kAcked), // Pair(TSN(20), State::kAcked))); // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be // resent right now. The send queue will not even be queried. EXPECT_CALL(producer_, Produce).Times(0); EXPECT_THAT(GetTSNsForFastRetransmit(queue), testing::ElementsAre(TSN(13), TSN(16))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kInFlight), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kInFlight), // Pair(TSN(17), State::kAcked), // Pair(TSN(18), State::kAcked), // Pair(TSN(19), State::kAcked), // Pair(TSN(20), State::kAcked))); } TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Verifies that if fast retransmit is retransmitting the first outstanding // TSN, it will also restart T3-RTX. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); static constexpr Timestamp kStartTime = Timestamp::Seconds(100); now_ = kStartTime; EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); // Ack 10, 12, after 100ms. now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kNacked), // Pair(TSN(12), State::kAcked))); // Send 13 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13))); // Ack 10, 12-13, after 100ms. now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); // Send 14 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14))); // Ack 10, 12-14, after 100 ms. now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kToBeRetransmitted), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked))); // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be // resent right now. The send queue will not even be queried. EXPECT_CALL(producer_, Produce).Times(0); EXPECT_THAT(GetTSNsForFastRetransmit(queue), testing::ElementsAre(TSN(11))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked))); // Verify that the timer was really restarted when fast-retransmitting. The // timeout is `options_.rto_initial`, so advance the time just before that. now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1); EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value()); // And ensure it really is running. now_ += TimeDelta::Millis(1); ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout, timeout_manager_.GetNextExpiredTimeout()); // An expired timeout has to be handled (asserts validate this). timer_manager_.HandleTimeout(timeout); } TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(1), gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight))); } TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); // Will force chunks to be retransmitted queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); std::vector> chunks_to_rtx = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); } TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { RetransmissionQueue queue = CreateQueue(/*supports_partial_reliability=*/false); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); // Will force chunks to be retransmitted queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); } // namespace dcsctp TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); // Will force chunks to be retransmitted EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned))); std::vector> chunks_to_rtx = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_rtx, testing::IsEmpty()); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned))); } TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(3); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); // Retransmission 1 queue.HandleT3RtxTimerExpiry(); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); // Retransmission 2 queue.HandleT3RtxTimerExpiry(); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); // Retransmission 3 queue.HandleT3RtxTimerExpiry(); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1)); // Retransmission 4 - not allowed. EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned))); } TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { RetransmissionQueue queue = CreateQueue(); static constexpr size_t kCwnd = 1200; queue.set_cwnd(kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); std::vector payload(1000); EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1500); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.unacked_items(), 1u); // Will force chunks to be retransmitted queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); std::vector> chunks_to_rtx = queue.GetChunksToSend(now_, 1500); EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.unacked_items(), 1u); } TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight))); // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(true)); queue.HandleT3RtxTimerExpiry(); // NOTE: The TSN=13 represents the end fragment. EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned), // Pair(TSN(13), State::kAbandoned))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13)); EXPECT_THAT(forward_tsn.skipped_streams(), UnorderedElementsAre( ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); } TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "E")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight))); // Chunk 10 is acked, but the remaining are lost queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); EXPECT_THAT(forward_tsn.skipped_streams(), UnorderedElementsAre( ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); } TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(1); SendQueue::DataToSend dts(OutgoingMessageId(42), gen_.Ordered({1, 2, 3, 4}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(2); SendQueue::DataToSend dts(OutgoingMessageId(43), gen_.Unordered({1, 2, 3, 4}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(3); SendQueue::DataToSend dts(OutgoingMessageId(44), gen_.Ordered({9, 10, 11, 12}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(4); SendQueue::DataToSend dts(OutgoingMessageId(45), gen_.Ordered({13, 14, 15, 16}, "B", opts)); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _), Pair(TSN(13), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight), // Pair(TSN(13), State::kInFlight))); // Chunk 13 is acked, but the remaining are lost queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kNacked), // Pair(TSN(11), State::kNacked), // Pair(TSN(12), State::kNacked), // Pair(TSN(13), State::kAcked))); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(42))) .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(StreamID(2), OutgoingMessageId(43))) .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(StreamID(3), OutgoingMessageId(44))) .WillOnce(Return(true)); queue.HandleT3RtxTimerExpiry(); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned), // Pair(TSN(13), State::kAcked), // Representing end fragments of stream 1-3 Pair(TSN(14), State::kAbandoned), // Pair(TSN(15), State::kAbandoned), // Pair(TSN(16), State::kAbandoned))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn(); EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12)); EXPECT_THAT( forward_tsn1.skipped_streams(), UnorderedElementsAre(IForwardTsnChunk::SkippedStream( IsUnordered(false), StreamID(1), MID(42)), IForwardTsnChunk::SkippedStream( IsUnordered(true), StreamID(2), MID(42)), IForwardTsnChunk::SkippedStream( IsUnordered(false), StreamID(3), MID(42)))); // When TSN 13 is acked, the placeholder "end fragments" must be skipped as // well. // A receiver is more likely to ack TSN 13, but do it incrementally. queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); EXPECT_CALL(producer_, Discard).Times(0); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAbandoned), // Pair(TSN(15), State::kAbandoned), // Pair(TSN(16), State::kAbandoned))); IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn(); EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16)); EXPECT_THAT( forward_tsn2.skipped_streams(), UnorderedElementsAre(IForwardTsnChunk::SkippedStream( IsUnordered(false), StreamID(1), MID(42)), IForwardTsnChunk::SkippedStream( IsUnordered(true), StreamID(2), MID(42)), IForwardTsnChunk::SkippedStream( IsUnordered(false), StreamID(3), MID(42)))); } TEST_F(RetransmissionQueueTest, MeasureRTT) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); now_ = now_ + TimeDelta::Millis(123); EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); } TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); } TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15), TSN(16), TSN(17))); EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {}))); EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {}))); EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {}))); } TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15), TSN(16), TSN(17))); // Ack 9, 20-25. This is an invalid SACK, but should still be handled. queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight), // Pair(TSN(13), State::kInFlight), // Pair(TSN(14), State::kInFlight), // Pair(TSN(15), State::kInFlight), // Pair(TSN(16), State::kInFlight), // Pair(TSN(17), State::kInFlight))); } TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) { RetransmissionQueue queue = CreateQueue(); // Nothing produced - nothing in retransmission queue // Ack 9, 12-13 queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {})); // Gap ack blocks are just ignore. EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked))); } TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15), TSN(16), TSN(17))); // Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be // adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the // cum-tsn-ack should not move, as the gap-ack-blocks are just advisory. queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kInFlight), // Pair(TSN(16), State::kInFlight), // Pair(TSN(17), State::kInFlight))); } TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { RetransmissionQueue queue = CreateQueue(); // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the // magic numbers in this test. EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); std::vector payload(183); return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); std::vector payload(957); return SendQueue::DataToSend(OutgoingMessageId(1), gen_.Ordered(payload, "BE")); }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1188 - 12); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _))); } TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight))); EXPECT_EQ(queue.unacked_bytes(), (16 + 4) * 3u); EXPECT_EQ(queue.unacked_items(), 3u); // Mark the message as lost. EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); queue.HandleT3RtxTimerExpiry(); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned))); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); // Now ACK those, one at a time. queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.unacked_items(), 0u); } TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { RetransmissionQueue queue = CreateQueue(); DataGeneratorOptions options; options.stream_id = StreamID(17); options.mid = MID(42); Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B", options)); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "", options)); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 24); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId)) .WillOnce(Return(true)); now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); EXPECT_THAT( queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN Pair(TSN(10), State::kAbandoned), // Produced Pair(TSN(11), State::kAbandoned), // Produced and expired Pair(TSN(12), State::kAbandoned))); // Placeholder end } TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { RetransmissionQueue queue = CreateQueue(); Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(42), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(43), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) // Stream reset - MID reset to zero again. .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)})); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)})); dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44))) .WillOnce(Return(true)); EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(10), Field(&Data::mid, MID(0))))); EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(11), Field(&Data::mid, MID(1))))); EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0))))); now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); EXPECT_THAT( queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN Pair(TSN(10), State::kInFlight), // OutgoingMessageId=42, BE Pair(TSN(11), State::kInFlight), // OutgoingMessageId=43, BE Pair(TSN(12), State::kAbandoned), // OutgoingMessageId=44, B Pair(TSN(13), State::kAbandoned), // Produced and expired Pair(TSN(14), State::kAbandoned))); // Placeholder end } TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _), Pair(TSN(13), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight), // Pair(TSN(13), State::kInFlight))); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kNacked), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kInFlight), // Pair(TSN(13), State::kInFlight))); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kNacked), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kInFlight))); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); } TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { // This is a fairly long test. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(2); return dts; }) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _), Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _), Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _), Pair(TSN(19), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight), // Pair(TSN(13), State::kInFlight), // Pair(TSN(14), State::kInFlight), // Pair(TSN(15), State::kInFlight), // Pair(TSN(16), State::kInFlight), // Pair(TSN(17), State::kInFlight), // Pair(TSN(18), State::kInFlight), // Pair(TSN(19), State::kInFlight))); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(8))).Times(0); // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it. for (int tsn = 11; tsn <= 13; ++tsn) { queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); } EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kInFlight), // Pair(TSN(15), State::kInFlight), // Pair(TSN(16), State::kInFlight), // Pair(TSN(17), State::kInFlight), // Pair(TSN(18), State::kInFlight), // Pair(TSN(19), State::kInFlight))); EXPECT_THAT(queue.GetChunksForFastRetransmit(1000), ElementsAre(Pair(TSN(10), _))); // Ack TSN [14 to 16] - three more nacks - second and last retransmission. for (int tsn = 14; tsn <= 16; ++tsn) { queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); } EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kAcked), // Pair(TSN(17), State::kInFlight), // Pair(TSN(18), State::kInFlight), // Pair(TSN(19), State::kInFlight))); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _))); // Ack TSN [17 to 18] for (int tsn = 17; tsn <= 18; ++tsn) { queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {})); } EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kNacked), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kAcked), // Pair(TSN(17), State::kAcked), // Pair(TSN(18), State::kAcked), // Pair(TSN(19), State::kInFlight))); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions. EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)) .WillOnce(Return(false)); queue.HandleSack( now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {})); EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty()); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAcked), // Pair(TSN(12), State::kAcked), // Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kAcked), // Pair(TSN(15), State::kAcked), // Pair(TSN(16), State::kAcked), // Pair(TSN(17), State::kAcked), // Pair(TSN(18), State::kAcked), // Pair(TSN(19), State::kAcked))); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); } TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { RetransmissionQueue queue = CreateQueue(); static constexpr size_t kCwnd = 1200; queue.set_cwnd(kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd); std::vector payload(1000); EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 1500); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); size_t serialized_size = payload.size() + DataChunk::kHeaderSize; EXPECT_EQ(queue.unacked_bytes(), serialized_size); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_EQ(queue.cwnd(), kCwnd + serialized_size); } // Verifies that it doesn't produce tiny packets, when getting close to // the full congestion window. TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { RetransmissionQueue queue = CreateQueue(); size_t intial_cwnd = options_.avoid_fragmentation_cwnd_mtus * options_.mtu; queue.set_cwnd(intial_cwnd); EXPECT_EQ(queue.cwnd(), intial_cwnd); // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 10000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); // To little space left - will not send more. EXPECT_FALSE(queue.can_send_data()); // But when the first chunk is acked, it will continue. queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_TRUE(queue.can_send_data()); EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu); } TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { RetransmissionQueue queue = CreateQueue(); size_t intial_cwnd = options_.avoid_fragmentation_cwnd_mtus * options_.mtu - 1; queue.set_cwnd(intial_cwnd); EXPECT_EQ(queue.cwnd(), intial_cwnd); // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector(chunk_size), "BE")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector> chunks_to_send = queue.GetChunksToSend(now_, 10000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); // With congestion window under limit, allow small packets to be created. EXPECT_TRUE(queue.can_send_data()); } TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); EXPECT_EQ( queue.GetHandoverReadiness(), HandoverReadinessStatus( HandoverUnreadinessReason::kRetransmissionQueueOutstandingData)); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus()); } TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8)); EXPECT_EQ( queue.GetHandoverReadiness(), HandoverReadinessStatus( HandoverUnreadinessReason::kRetransmissionQueueOutstandingData)); // Send more chunks, but leave some chunks unacked to force retransmission // after three NACKs. // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-18 queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 6)}, {})); // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-19 queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 7)}, {})); // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-20 // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be // resent right now. The send queue will not even be queried. queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {SackChunk::GapAckBlock(2, 3), SackChunk::GapAckBlock(5, 8)}, {})); EXPECT_EQ( queue.GetHandoverReadiness(), HandoverReadinessStatus() .Add(HandoverUnreadinessReason::kRetransmissionQueueOutstandingData) .Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery) .Add(HandoverUnreadinessReason::kRetransmissionQueueNotEmpty)); // Send "fast retransmit" mode chunks EXPECT_CALL(producer_, Produce).Times(0); EXPECT_THAT(GetTSNsForFastRetransmit(queue), SizeIs(2)); EXPECT_EQ( queue.GetHandoverReadiness(), HandoverReadinessStatus() .Add(HandoverUnreadinessReason::kRetransmissionQueueOutstandingData) .Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery)); // Ack 20 to confirm the retransmission queue.HandleSack(now_, SackChunk(TSN(20), kArwnd, {}, {})); EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus()); } TEST_F(RetransmissionQueueTest, HandoverTest) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2)); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); std::unique_ptr handedover_queue = CreateQueueByHandover(queue); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(*handedover_queue), testing::ElementsAre(TSN(12), TSN(13), TSN(14))); handedover_queue->HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})); EXPECT_THAT(handedover_queue->GetChunkStatesForTesting(), ElementsAre(Pair(TSN(13), State::kAcked), // Pair(TSN(14), State::kInFlight))); } TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) { RetransmissionQueue queue = CreateQueue(); // A large payload - enough to not fit two DATA in same packet. size_t mtu = RoundDownTo4(options_.mtu); std::vector payload(mtu - 100); EXPECT_CALL(producer_, Produce) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "B")); }) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "E")); }) .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Produce all chunks and put them in the retransmission queue. std::vector> chunks_to_send = queue.GetChunksToSend(now_, 5 * mtu); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _), Pair(TSN(13), _), Pair(TSN(14), _))); EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight), Pair(TSN(13), State::kInFlight), Pair(TSN(14), State::kInFlight))); // Ack 12, and report an empty receiver window (the peer obviously has a // tiny receive window). queue.HandleSack( now_, SackChunk(TSN(9), /*rwnd=*/0, {SackChunk::GapAckBlock(3, 3)}, {})); // Force TSN 10 to be retransmitted. queue.HandleT3RtxTimerExpiry(); // Even if the receiver window is empty, it will allow TSN 10 to be sent. EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(10), _))); // But not more than that, as there now is outstanding data. EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty()); // Don't ack any new data, and still have receiver window zero. queue.HandleSack( now_, SackChunk(TSN(9), /*rwnd=*/0, {SackChunk::GapAckBlock(3, 3)}, {})); // There is in-flight data, so new data should not be allowed to be send since // the receiver window is full. EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty()); // Ack that packet (no more in-flight data), but still report an empty // receiver window. queue.HandleSack( now_, SackChunk(TSN(10), /*rwnd=*/0, {SackChunk::GapAckBlock(2, 2)}, {})); // Then TSN 11 can be sent, as there is no in-flight data. EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(11), _))); EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty()); // Ack and recover the receiver window queue.HandleSack(now_, SackChunk(TSN(12), /*rwnd=*/5 * mtu, {}, {})); // That will unblock sending remaining chunks. EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(13), _))); EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(14), _))); EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty()); } } // namespace } // namespace dcsctp