diff options
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc')
-rw-r--r-- | third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc | 740 |
1 files changed, 740 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc new file mode 100644 index 0000000000..58f0bc4690 --- /dev/null +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc @@ -0,0 +1,740 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include <vector> + +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/public/types.h" +#include "test/gmock.h" + +namespace dcsctp { +namespace { +using ::testing::Return; +using ::testing::StrictMock; + +constexpr size_t kMtu = 1000; +constexpr size_t kPayloadSize = 4; + +MATCHER_P(HasDataWithMid, mid, "") { + if (!arg.has_value()) { + *result_listener << "There was no produced data"; + return false; + } + + if (arg->data.message_id != mid) { + *result_listener << "the produced data had mid " << *arg->data.message_id + << " and not the expected " << *mid; + return false; + } + + return true; +} + +std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)> +CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) { + return [sid, mid, payload_size](TimeMs now, size_t max_size) { + return SendQueue::DataToSend(Data( + sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size), + Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true))); + }; +} + +std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler, + size_t packets_to_generate) { + std::map<StreamID, size_t> packet_counts; + for (size_t i = 0; i < packets_to_generate; ++i) { + absl::optional<SendQueue::DataToSend> data = + scheduler.Produce(TimeMs(0), kMtu); + if (data.has_value()) { + ++packet_counts[data->data.stream_id]; + } + } + return packet_counts; +} + +class MockStreamProducer : public StreamScheduler::StreamProducer { + public: + MOCK_METHOD(absl::optional<SendQueue::DataToSend>, + Produce, + (TimeMs, size_t), + (override)); + MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); +}; + +class TestStream { + public: + TestStream(StreamScheduler& scheduler, + StreamID stream_id, + StreamPriority priority, + size_t packet_size = kPayloadSize) { + EXPECT_CALL(producer_, Produce) + .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size)); + EXPECT_CALL(producer_, bytes_to_send_in_next_message) + .WillRepeatedly(Return(packet_size)); + stream_ = scheduler.CreateStream(&producer_, stream_id, priority); + stream_->MaybeMakeActive(); + } + + StreamScheduler::Stream& stream() { return *stream_; } + + private: + StrictMock<MockStreamProducer> producer_; + std::unique_ptr<StreamScheduler::Stream> stream_; +}; + +// A scheduler without active streams doesn't produce data. +TEST(StreamSchedulerTest, HasNoActiveStreams) { + StreamScheduler scheduler(kMtu); + + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Stream properties can be set and retrieved +TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer; + auto stream = + scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); + + EXPECT_EQ(stream->stream_id(), StreamID(1)); + EXPECT_EQ(stream->priority(), StreamPriority(2)); + + stream->SetPriority(StreamPriority(0)); + EXPECT_EQ(stream->priority(), StreamPriority(0)); +} + +// A scheduler with a single stream produced packets from it. +TEST(StreamSchedulerTest, CanProduceFromSingleStream) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer; + EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); + EXPECT_CALL(producer, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(0)); + auto stream = + scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); + stream->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet. +TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Switches between two streams after every packet, but keeps producing from the +// same stream when a packet contains of multiple fragments. +TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector<uint8_t>(4), Data::IsBeginning(true), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector<uint8_t>(4), Data::IsBeginning(false), + Data::IsEnd(false), IsUnordered(true))); + }) + .WillOnce([](...) { + return SendQueue::DataToSend( + Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42), + std::vector<uint8_t>(4), Data::IsBeginning(false), + Data::IsEnd(true), IsUnordered(true))); + }) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Deactivates a stream before it has finished producing all packets. +TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming. + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + // ... but the stream is made inactive before it can be produced. + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Resumes a paused stream - makes a stream active after inactivating it. +TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer1; + // Callbacks are setup so that they hint that there is a MID(2) coming... + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active again + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + + stream1->MakeInactive(); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Iterates between streams, where one is suddenly paused and later resumed. +TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { + StreamScheduler scheduler(kMtu); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + stream1->MakeInactive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + stream1->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Verifies that packet counts are evenly distributed in round robin scheduling. +TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { + StreamScheduler scheduler(kMtu); + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); +} + +// Verifies that packet counts are evenly distributed among active streams, +// where a stream is suddenly made inactive, two are added, and then the paused +// stream is resumed. +TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { + StreamScheduler scheduler(kMtu); + TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); + TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 5U); + + stream2.stream().MakeInactive(); + + TestStream stream3(scheduler, StreamID(3), StreamPriority(1)); + TestStream stream4(scheduler, StreamID(4), StreamPriority(1)); + + std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15); + EXPECT_EQ(counts2[StreamID(1)], 5U); + EXPECT_EQ(counts2[StreamID(2)], 0U); + EXPECT_EQ(counts2[StreamID(3)], 5U); + EXPECT_EQ(counts2[StreamID(4)], 5U); + + stream2.stream().MaybeMakeActive(); + + std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20); + EXPECT_EQ(counts3[StreamID(1)], 5U); + EXPECT_EQ(counts3[StreamID(2)], 5U); + EXPECT_EQ(counts3[StreamID(3)], 5U); + EXPECT_EQ(counts3[StreamID(4)], 5U); +} + +// Degrades to fair queuing with streams having identical priority. +TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 30; + constexpr size_t kLargePacket = 70; + + StrictMock<MockStreamProducer> callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kLargePacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + // t = 30 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t = 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t = 70 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t = 90 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + // t = 140 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t = 210 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + StrictMock<MockStreamProducer> callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units. + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units. + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + StrictMock<MockStreamProducer> callback3; + EXPECT_CALL(callback3, Produce) + .WillOnce(CreateChunk(StreamID(3), MID(300))) + .WillOnce(CreateChunk(StreamID(3), MID(301))) + .WillOnce(CreateChunk(StreamID(3), MID(302))); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units. + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 20 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 40 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 50 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 80 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 100 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 150 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 160 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 240 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority +// and sending different payload sizes. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 20; + constexpr size_t kMediumPacket = 50; + constexpr size_t kLargePacket = 70; + + // Stream with priority = 125 -> inverse weight ~=80 + StrictMock<MockStreamProducer> callback1; + EXPECT_CALL(callback1, Produce) + // virtual finish time ~ 0 + 50 * 80 = 4000 + .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket)) + // virtual finish time ~ 4000 + 20 * 80 = 5600 + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + // virtual finish time ~ 5600 + 70 * 80 = 11200 + .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + // Stream with priority = 200 -> inverse weight ~=50 + StrictMock<MockStreamProducer> callback2; + EXPECT_CALL(callback2, Produce) + // virtual finish time ~ 0 + 50 * 50 = 2500 + .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket)) + // virtual finish time ~ 2500 + 70 * 50 = 6000 + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + // virtual finish time ~ 6000 + 20 * 50 = 7000 + .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + // Stream with priority = 500 -> inverse weight ~=20 + StrictMock<MockStreamProducer> callback3; + EXPECT_CALL(callback3, Produce) + // virtual finish time ~ 0 + 20 * 20 = 400 + .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket)) + // virtual finish time ~ 400 + 50 * 20 = 1400 + .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket)) + // virtual finish time ~ 1400 + 70 * 20 = 2800 + .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket)); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kMediumPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 1400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 2500 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 2800 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 4000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 5600 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 6000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 7000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 11200 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { + // A simple test with two streams of different priority, but sending packets + // of identical size. Verifies that the ratio of sent packets represent their + // priority. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); +} + +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) { + // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more + // streams. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) { + // A simple test with two streams of different priority, but sending packets + // of different size. Verifies that the ratio of total packet payload + // represent their priority. + // In this example, + // * stream1 has priority 100 and sends packets of size 8 + // * stream2 has priority 400 and sends packets of size 4 + // With round robin, stream1 would get twice as many payload bytes on the wire + // as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as + // many payload bytes on the wire. That translates to stream2 getting 8x as + // many packets on the wire as they are half as large. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/8); + TestStream stream2(scheduler, StreamID(2), StreamPriority(400), + /*packet_size=*/4); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90); + EXPECT_EQ(packet_counts[StreamID(1)], 10U); + EXPECT_EQ(packet_counts[StreamID(2)], 80U); +} + +TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) { + // Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more + // complicated. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/10); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), + /*packet_size=*/10); + TestStream stream3(scheduler, StreamID(3), StreamPriority(200), + /*packet_size=*/20); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), + /*packet_size=*/30); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80); + // 15 packets * 10 bytes = 150 bytes at priority 100. + EXPECT_EQ(packet_counts[StreamID(1)], 15U); + // 30 packets * 10 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(2)], 30U); + // 15 packets * 20 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + // 20 packets * 30 bytes = 600 bytes at priority 400. + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +// Sending large messages with small MTU will fragment the messages and produce +// a first fragment not larger than the MTU, and will then not first send from +// the stream with the smallest message, as their first fragment will be equally +// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but +// with a larger MTU. +TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { + StreamScheduler scheduler(100 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(100)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 100)) + .WillOnce(CreateChunk(StreamID(2), MID(1), 50)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(50)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Sending large messages with large MTU will not fragment messages and will +// send the message first from the stream that has the smallest message. +TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { + StreamScheduler scheduler(200 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock<MockStreamProducer> producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 200)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock<MockStreamProducer> producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 150)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +} // namespace +} // namespace dcsctp |