Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc')
-rw-r--r--  third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc  740
1 file changed, 740 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc
new file mode 100644
index 0000000000..58f0bc4690
--- /dev/null
+++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc
@@ -0,0 +1,740 @@
+/*
+ * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/stream_scheduler.h"
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "net/dcsctp/packet/chunk/idata_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/packet/sctp_packet.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::Return;
+using ::testing::StrictMock;
+
+constexpr size_t kMtu = 1000;
+constexpr size_t kPayloadSize = 4;
+
+MATCHER_P(HasDataWithMid, mid, "") {
+ if (!arg.has_value()) {
+ *result_listener << "There was no produced data";
+ return false;
+ }
+
+ if (arg->data.message_id != mid) {
+ *result_listener << "the produced data had mid " << *arg->data.message_id
+ << " and not the expected " << *mid;
+ return false;
+ }
+
+ return true;
+}
+
+std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
+CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
+ return [sid, mid, payload_size](TimeMs now, size_t max_size) {
+ return SendQueue::DataToSend(Data(
+ sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
+ Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
+ };
+}
+
+std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
+ size_t packets_to_generate) {
+ std::map<StreamID, size_t> packet_counts;
+ for (size_t i = 0; i < packets_to_generate; ++i) {
+ absl::optional<SendQueue::DataToSend> data =
+ scheduler.Produce(TimeMs(0), kMtu);
+ if (data.has_value()) {
+ ++packet_counts[data->data.stream_id];
+ }
+ }
+ return packet_counts;
+}
+
+class MockStreamProducer : public StreamScheduler::StreamProducer {
+ public:
+ MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
+ Produce,
+ (TimeMs, size_t),
+ (override));
+ MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
+};
+
+class TestStream {
+ public:
+ TestStream(StreamScheduler& scheduler,
+ StreamID stream_id,
+ StreamPriority priority,
+ size_t packet_size = kPayloadSize) {
+ EXPECT_CALL(producer_, Produce)
+ .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
+ EXPECT_CALL(producer_, bytes_to_send_in_next_message)
+ .WillRepeatedly(Return(packet_size));
+ stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
+ stream_->MaybeMakeActive();
+ }
+
+ StreamScheduler::Stream& stream() { return *stream_; }
+
+ private:
+ StrictMock<MockStreamProducer> producer_;
+ std::unique_ptr<StreamScheduler::Stream> stream_;
+};
+
+// A scheduler without active streams doesn't produce data.
+TEST(StreamSchedulerTest, HasNoActiveStreams) {
+ StreamScheduler scheduler(kMtu);
+
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Stream properties can be set and retrieved.
+TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer;
+ auto stream =
+ scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+
+ EXPECT_EQ(stream->stream_id(), StreamID(1));
+ EXPECT_EQ(stream->priority(), StreamPriority(2));
+
+ stream->SetPriority(StreamPriority(0));
+ EXPECT_EQ(stream->priority(), StreamPriority(0));
+}
+
+// A scheduler with a single stream produces packets from it.
+TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer;
+ EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+ EXPECT_CALL(producer, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(0));
+ auto stream =
+ scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+ stream->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Switches between two streams after every packet.
+TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce(CreateChunk(StreamID(1), MID(101)))
+ .WillOnce(CreateChunk(StreamID(1), MID(102)));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(200)))
+ .WillOnce(CreateChunk(StreamID(2), MID(201)))
+ .WillOnce(CreateChunk(StreamID(2), MID(202)));
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ stream2->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Switches between two streams after every packet, but keeps producing from
+// the same stream while a message spans multiple fragments.
+TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce([](...) {
+ return SendQueue::DataToSend(
+ Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
+ std::vector<uint8_t>(4), Data::IsBeginning(true),
+ Data::IsEnd(false), IsUnordered(true)));
+ })
+ .WillOnce([](...) {
+ return SendQueue::DataToSend(
+ Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
+ std::vector<uint8_t>(4), Data::IsBeginning(false),
+ Data::IsEnd(false), IsUnordered(true)));
+ })
+ .WillOnce([](...) {
+ return SendQueue::DataToSend(
+ Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
+ std::vector<uint8_t>(4), Data::IsBeginning(false),
+ Data::IsEnd(true), IsUnordered(true)));
+ })
+ .WillOnce(CreateChunk(StreamID(1), MID(102)));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(200)))
+ .WillOnce(CreateChunk(StreamID(2), MID(201)))
+ .WillOnce(CreateChunk(StreamID(2), MID(202)));
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ stream2->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
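+
+// A hypothetical helper (a sketch of what the inline lambdas above return, not
+// something the original test defines): fragments of one message share the
+// same MID and differ only in their (IsBeginning, IsEnd) flags, and the
+// scheduler keeps producing from the same stream until it gets a fragment with
+// IsEnd(true).
+[[maybe_unused]] SendQueue::DataToSend CreateFragment(
+    StreamID sid,
+    MID mid,
+    Data::IsBeginning is_beginning,
+    Data::IsEnd is_end,
+    size_t payload_size = kPayloadSize) {
+  return SendQueue::DataToSend(
+      Data(sid, SSN(0), mid, FSN(0), PPID(42),
+           std::vector<uint8_t>(payload_size), is_beginning, is_end,
+           IsUnordered(true)));
+}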
+
+// Deactivates a stream before it has finished producing all packets.
+TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce(CreateChunk(StreamID(1), MID(101)));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize)); // hints that there is a MID(102) coming.
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+
+ // ... but the stream is made inactive before MID(102) can be produced.
+ stream1->MakeInactive();
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Resumes a paused stream: makes a stream active again after deactivating it.
+TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer1;
+ // Callbacks are set up so that they hint that there is a MID(102) coming...
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce(CreateChunk(StreamID(1), MID(101)))
+ .WillOnce(CreateChunk(StreamID(1), MID(102)));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize)) // When making active again
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+
+ stream1->MakeInactive();
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ stream1->MaybeMakeActive();
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Iterates between streams, where one is suddenly paused and later resumed.
+TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
+ StreamScheduler scheduler(kMtu);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce(CreateChunk(StreamID(1), MID(101)))
+ .WillOnce(CreateChunk(StreamID(1), MID(102)));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(200)))
+ .WillOnce(CreateChunk(StreamID(2), MID(201)))
+ .WillOnce(CreateChunk(StreamID(2), MID(202)));
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ stream2->MaybeMakeActive();
+
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ stream1->MakeInactive();
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ stream1->MaybeMakeActive();
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Verifies that packet counts are evenly distributed in round robin scheduling.
+TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
+ StreamScheduler scheduler(kMtu);
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
+ EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+ EXPECT_EQ(packet_counts[StreamID(2)], 5U);
+}
+
+// Verifies that packet counts are evenly distributed among active streams,
+// where a stream is suddenly made inactive, two are added, and then the paused
+// stream is resumed.
+TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
+ StreamScheduler scheduler(kMtu);
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
+ EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+ EXPECT_EQ(packet_counts[StreamID(2)], 5U);
+
+ stream2.stream().MakeInactive();
+
+ TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
+ TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
+
+ std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
+ EXPECT_EQ(counts2[StreamID(1)], 5U);
+ EXPECT_EQ(counts2[StreamID(2)], 0U);
+ EXPECT_EQ(counts2[StreamID(3)], 5U);
+ EXPECT_EQ(counts2[StreamID(4)], 5U);
+
+ stream2.stream().MaybeMakeActive();
+
+ std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
+ EXPECT_EQ(counts3[StreamID(1)], 5U);
+ EXPECT_EQ(counts3[StreamID(2)], 5U);
+ EXPECT_EQ(counts3[StreamID(3)], 5U);
+ EXPECT_EQ(counts3[StreamID(4)], 5U);
+}
+
+// Degrades to fair queuing with streams having identical priority.
+TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
+ StreamScheduler scheduler(kMtu);
+ scheduler.EnableMessageInterleaving(true);
+
+ constexpr size_t kSmallPacket = 30;
+ constexpr size_t kLargePacket = 70;
+
+ StrictMock<MockStreamProducer> callback1;
+ EXPECT_CALL(callback1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
+ .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
+ .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kSmallPacket)) // When making active
+ .WillOnce(Return(kSmallPacket))
+ .WillOnce(Return(kSmallPacket))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> callback2;
+ EXPECT_CALL(callback2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
+ .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
+ .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kLargePacket)) // When making active
+ .WillOnce(Return(kLargePacket))
+ .WillOnce(Return(kLargePacket))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+ stream2->MaybeMakeActive();
+
+ // t = 30
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ // t = 60
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ // t = 70
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ // t = 90
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ // t = 140
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ // t = 210
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
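+
+// Illustration only, restating the timeline comments above rather than the
+// scheduler's internals: with identical priorities a message's virtual finish
+// time is simply the stream's accumulated payload bytes, so stream 1 finishes
+// its messages at 30, 60 and 90 while stream 2 finishes at 70, 140 and 210.
+// Serving the smallest finish time first gives exactly the order asserted.
+static_assert(30 + 30 == 60 && 60 + 30 == 90 && 70 + 70 == 140 &&
+                  140 + 70 == 210,
+              "accumulated bytes per stream match the t values above");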
+
+// Will do weighted fair queuing with three streams having different priorities.
+TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
+ StreamScheduler scheduler(kMtu);
+ scheduler.EnableMessageInterleaving(true);
+
+ StrictMock<MockStreamProducer> callback1;
+ EXPECT_CALL(callback1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(100)))
+ .WillOnce(CreateChunk(StreamID(1), MID(101)))
+ .WillOnce(CreateChunk(StreamID(1), MID(102)));
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ // Priority 125 -> allowed to produce every 10000/125 ~= 80 time units.
+ auto stream1 =
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> callback2;
+ EXPECT_CALL(callback2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(200)))
+ .WillOnce(CreateChunk(StreamID(2), MID(201)))
+ .WillOnce(CreateChunk(StreamID(2), MID(202)));
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ // Priority 200 -> allowed to produce every 10000/200 ~= 50 time units.
+ auto stream2 =
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
+ stream2->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> callback3;
+ EXPECT_CALL(callback3, Produce)
+ .WillOnce(CreateChunk(StreamID(3), MID(300)))
+ .WillOnce(CreateChunk(StreamID(3), MID(301)))
+ .WillOnce(CreateChunk(StreamID(3), MID(302)));
+ EXPECT_CALL(callback3, bytes_to_send_in_next_message)
+ .WillOnce(Return(kPayloadSize)) // When making active
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(kPayloadSize))
+ .WillOnce(Return(0));
+ // Priority 500 -> allowed to produce every 10000/500 ~= 20 time units.
+ auto stream3 =
+ scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
+ stream3->MaybeMakeActive();
+
+ // t ~= 20
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+ // t ~= 40
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+ // t ~= 50
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ // t ~= 60
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+ // t ~= 80
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ // t ~= 100
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ // t ~= 150
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ // t ~= 160
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ // t ~= 240
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
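+
+// Illustration only: with equal message sizes each stream advances its virtual
+// time by roughly its inverse weight per message (~20, ~50 and ~80 for
+// priorities 500, 200 and 125), giving per-stream finish times of 20/40/60,
+// 50/100/150 and 80/160/240; the timeline above is those sequences merged in
+// ascending order. A sketch of that arithmetic, assuming the 10000 scaling
+// used by the comments (the function name is ours, not the scheduler's):
+constexpr int ApproxInverseWeight(int priority) {
+  return 10000 / priority;
+}
+static_assert(ApproxInverseWeight(500) == 20 &&
+                  ApproxInverseWeight(200) == 50 &&
+                  ApproxInverseWeight(125) == 80,
+              "inverse weights assumed by the timeline comments above");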
+
+// Will do weighted fair queuing with three streams having different priorities
+// and sending different payload sizes.
+TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
+ StreamScheduler scheduler(kMtu);
+ scheduler.EnableMessageInterleaving(true);
+
+ constexpr size_t kSmallPacket = 20;
+ constexpr size_t kMediumPacket = 50;
+ constexpr size_t kLargePacket = 70;
+
+ // Stream with priority = 125 -> inverse weight ~=80
+ StrictMock<MockStreamProducer> callback1;
+ EXPECT_CALL(callback1, Produce)
+ // virtual finish time ~ 0 + 50 * 80 = 4000
+ .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
+ // virtual finish time ~ 4000 + 20 * 80 = 5600
+ .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
+ // virtual finish time ~ 5600 + 70 * 80 = 11200
+ .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ .WillOnce(Return(kMediumPacket)) // When making active
+ .WillOnce(Return(kSmallPacket))
+ .WillOnce(Return(kLargePacket))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
+ stream1->MaybeMakeActive();
+
+ // Stream with priority = 200 -> inverse weight ~=50
+ StrictMock<MockStreamProducer> callback2;
+ EXPECT_CALL(callback2, Produce)
+ // virtual finish time ~ 0 + 50 * 50 = 2500
+ .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
+ // virtual finish time ~ 2500 + 70 * 50 = 6000
+ .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
+ // virtual finish time ~ 6000 + 20 * 50 = 7000
+ .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ .WillOnce(Return(kMediumPacket)) // When making active
+ .WillOnce(Return(kLargePacket))
+ .WillOnce(Return(kSmallPacket))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
+ stream2->MaybeMakeActive();
+
+ // Stream with priority = 500 -> inverse weight ~=20
+ StrictMock<MockStreamProducer> callback3;
+ EXPECT_CALL(callback3, Produce)
+ // virtual finish time ~ 0 + 20 * 20 = 400
+ .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
+ // virtual finish time ~ 400 + 50 * 20 = 1400
+ .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
+ // virtual finish time ~ 1400 + 70 * 20 = 2800
+ .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
+ EXPECT_CALL(callback3, bytes_to_send_in_next_message)
+ .WillOnce(Return(kSmallPacket)) // When making active
+ .WillOnce(Return(kMediumPacket))
+ .WillOnce(Return(kLargePacket))
+ .WillOnce(Return(0));
+ auto stream3 =
+ scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
+ stream3->MaybeMakeActive();
+
+ // t ~= 400
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+ // t ~= 1400
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+ // t ~= 2500
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ // t ~= 2800
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+ // t ~= 4000
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ // t ~= 5600
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ // t ~= 6000
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ // t ~= 7000
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ // t ~= 11200
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
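+
+// Illustration only: the virtual finish times in the comments above follow
+// "previous finish time + message bytes * inverse weight", and the scheduler
+// serves whichever active stream has the smallest finish time next. A minimal
+// sketch of that bookkeeping (the function is ours, not part of the API):
+constexpr int NextVirtualFinishTime(int previous_finish_time,
+                                    int message_bytes,
+                                    int inverse_weight) {
+  return previous_finish_time + message_bytes * inverse_weight;
+}
+// Stream 3 above (inverse weight ~20) finishes at 400, 1400 and 2800.
+static_assert(NextVirtualFinishTime(0, 20, 20) == 400 &&
+                  NextVirtualFinishTime(400, 50, 20) == 1400 &&
+                  NextVirtualFinishTime(1400, 70, 20) == 2800,
+              "virtual finish times of stream 3 in the test above");
+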
+TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
+ // A simple test with two streams of different priority, but sending packets
+ // of identical size. Verifies that the ratio of sent packets represents
+ // their priorities.
+ StreamScheduler scheduler(kMtu);
+ scheduler.EnableMessageInterleaving(true);
+
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15);
+ EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+ EXPECT_EQ(packet_counts[StreamID(2)], 10U);
+}
+
+TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
+ // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more
+ // streams.
+ StreamScheduler scheduler(kMtu);
+ scheduler.EnableMessageInterleaving(true);
+
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
+ TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
+ TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50);
+ EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+ EXPECT_EQ(packet_counts[StreamID(2)], 10U);
+ EXPECT_EQ(packet_counts[StreamID(3)], 15U);
+ EXPECT_EQ(packet_counts[StreamID(4)], 20U);
+}
+
+TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
+ // A simple test with two streams of different priority, sending packets of
+ // different sizes. Verifies that the ratio of total packet payload reflects
+ // their priorities.
+ // In this example,
+ // * stream1 has priority 100 and sends packets of size 8
+ // * stream2 has priority 400 and sends packets of size 4
+ // With round robin, stream1 would get twice as many payload bytes on the
+ // wire as stream2, but with WFQ and a 4x priority increase, stream2 should
+ // get 4x as many payload bytes on the wire. That translates to stream2
+ // getting 8x as many packets on the wire, as its packets are half as large.
+ StreamScheduler scheduler(kMtu);
+ // Enable WFQ scheduler.
+ scheduler.EnableMessageInterleaving(true);
+
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
+ /*packet_size=*/8);
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(400),
+ /*packet_size=*/4);
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90);
+ EXPECT_EQ(packet_counts[StreamID(1)], 10U);
+ EXPECT_EQ(packet_counts[StreamID(2)], 80U);
+}
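+
+// The arithmetic from the comment above as a compile-time check (the constants
+// restate the test parameters): 4x the priority gives 4x the payload bytes,
+// and half-sized packets double the packet count again, i.e. 8x the packets.
+static_assert((400 / 100) * (8 / 4) == 80 / 10,
+              "4x priority and half-size packets give 8x the packets");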
+
+TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
+ // Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more
+ // complicated.
+ StreamScheduler scheduler(kMtu);
+ // Enable WFQ scheduler.
+ scheduler.EnableMessageInterleaving(true);
+
+ TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
+ /*packet_size=*/10);
+ TestStream stream2(scheduler, StreamID(2), StreamPriority(200),
+ /*packet_size=*/10);
+ TestStream stream3(scheduler, StreamID(3), StreamPriority(200),
+ /*packet_size=*/20);
+ TestStream stream4(scheduler, StreamID(4), StreamPriority(400),
+ /*packet_size=*/30);
+
+ std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80);
+ // 15 packets * 10 bytes = 150 bytes at priority 100.
+ EXPECT_EQ(packet_counts[StreamID(1)], 15U);
+ // 30 packets * 10 bytes = 300 bytes at priority 200.
+ EXPECT_EQ(packet_counts[StreamID(2)], 30U);
+ // 15 packets * 20 bytes = 300 bytes at priority 200.
+ EXPECT_EQ(packet_counts[StreamID(3)], 15U);
+ // 20 packets * 30 bytes = 600 bytes at priority 400.
+ EXPECT_EQ(packet_counts[StreamID(4)], 20U);
+}
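+
+// Compile-time restatement of the byte counts in the comments above: each
+// stream's payload bytes divided by its priority is the same (150/100 ==
+// 300/200 == 600/400), which is the weighted-fairness property that the
+// packet-count expectations encode.
+static_assert(150 * 200 == 300 * 100 && 300 * 400 == 600 * 200,
+              "payload bytes scale linearly with stream priority");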
+
+// Sending large messages with a small MTU will fragment the messages, producing
+// first fragments no larger than the MTU. The scheduler will therefore not
+// necessarily send from the stream with the smallest message first, as the
+// first fragments are equally large for both streams. See
+// `SendLargeMessageWithLargeMtu` for the same test, but with a larger MTU.
+TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
+ StreamScheduler scheduler(100 + SctpPacket::kHeaderSize +
+ IDataChunk::kHeaderSize);
+ scheduler.EnableMessageInterleaving(true);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(0), 100))
+ .WillOnce(CreateChunk(StreamID(1), MID(0), 100));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(200)) // When making active
+ .WillOnce(Return(100))
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(1), 100))
+ .WillOnce(CreateChunk(StreamID(2), MID(1), 50));
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ .WillOnce(Return(150)) // When making active
+ .WillOnce(Return(50))
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
+ stream2->MaybeMakeActive();
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
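+
+// Illustration only: the scheduler above gets an MTU that leaves room for 100
+// payload bytes per fragment, so the 200-byte and 150-byte messages both start
+// with a 100-byte fragment and neither stream initially looks "smaller". A
+// sketch of that payload budget, assuming it is the MTU minus the packet and
+// I-DATA chunk headers:
+[[maybe_unused]] size_t MaxPayloadPerFragment(size_t scheduler_mtu) {
+  return scheduler_mtu - SctpPacket::kHeaderSize - IDataChunk::kHeaderSize;
+}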
+
+// Sending large messages with a large MTU will not fragment them, and the
+// scheduler will first send from the stream that has the smallest message.
+TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
+ StreamScheduler scheduler(200 + SctpPacket::kHeaderSize +
+ IDataChunk::kHeaderSize);
+ scheduler.EnableMessageInterleaving(true);
+
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
+ .WillOnce(CreateChunk(StreamID(1), MID(0), 200));
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ .WillOnce(Return(200)) // When making active
+ .WillOnce(Return(0));
+ auto stream1 =
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
+ stream1->MaybeMakeActive();
+
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
+ .WillOnce(CreateChunk(StreamID(2), MID(1), 150));
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ .WillOnce(Return(150)) // When making active
+ .WillOnce(Return(0));
+ auto stream2 =
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
+ stream2->MaybeMakeActive();
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+} // namespace
+} // namespace dcsctp