summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc')
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc890
1 files changed, 890 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
new file mode 100644
index 0000000000..9d6da7bdff
--- /dev/null
+++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
@@ -0,0 +1,890 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/tx/rr_send_queue.h"
+
+#include <cstdint>
+#include <type_traits>
+#include <vector>
+
+#include "net/dcsctp/common/internal_types.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/dcsctp_options.h"
+#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h"
+#include "net/dcsctp/testing/testing_macros.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::SizeIs;
+using ::testing::UnorderedElementsAre;
+
+constexpr TimeMs kNow = TimeMs(0);
+constexpr StreamID kStreamID(1);
+constexpr PPID kPPID(53);
+constexpr size_t kMaxQueueSize = 1000;
+constexpr StreamPriority kDefaultPriority(10);
+constexpr size_t kBufferedAmountLowThreshold = 500;
+constexpr size_t kOneFragmentPacketSize = 100;
+constexpr size_t kTwoFragmentPacketSize = 101;
+constexpr size_t kMtu = 1100;
+
+class RRSendQueueTest : public testing::Test {
+ protected:
+ RRSendQueueTest()
+ : buf_("log: ",
+ &callbacks_,
+ kMaxQueueSize,
+ kMtu,
+ kDefaultPriority,
+ kBufferedAmountLowThreshold) {}
+
+ testing::NiceMock<MockDcSctpSocketCallbacks> callbacks_;
+ const DcSctpOptions options_;
+ RRSendQueue buf_;
+};
+
+TEST_F(RRSendQueueTest, EmptyBuffer) {
+ EXPECT_TRUE(buf_.IsEmpty());
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+ EXPECT_FALSE(buf_.IsFull());
+}
+
+TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
+
+ EXPECT_FALSE(buf_.IsEmpty());
+ EXPECT_FALSE(buf_.IsFull());
+ absl::optional<SendQueue::DataToSend> chunk_opt =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_opt.has_value());
+ EXPECT_TRUE(chunk_opt->data.is_beginning);
+ EXPECT_TRUE(chunk_opt->data.is_end);
+}
+
+TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
+ std::vector<uint8_t> payload(60);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_beg =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_beg.has_value());
+ EXPECT_TRUE(chunk_beg->data.is_beginning);
+ EXPECT_FALSE(chunk_beg->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_mid =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_mid.has_value());
+ EXPECT_FALSE(chunk_mid->data.is_beginning);
+ EXPECT_FALSE(chunk_mid->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_end =
+ buf_.Produce(kNow, /*max_size=*/20);
+ ASSERT_TRUE(chunk_end.has_value());
+ EXPECT_FALSE(chunk_end->data.is_beginning);
+ EXPECT_TRUE(chunk_end->data.is_end);
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+}
+
+TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
+ std::vector<uint8_t> payload(60);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_one->data.ppid, kPPID);
+ EXPECT_TRUE(chunk_one->data.is_beginning);
+ EXPECT_TRUE(chunk_one->data.is_end);
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_two->data.ppid, PPID(54));
+ EXPECT_TRUE(chunk_two->data.is_beginning);
+ EXPECT_TRUE(chunk_two->data.is_end);
+}
+
+TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
+ std::vector<uint8_t> payload(600);
+ EXPECT_FALSE(buf_.IsFull());
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_FALSE(buf_.IsFull());
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
+ EXPECT_TRUE(buf_.IsFull());
+ // However, it's still possible to add messages. It's a soft limit, and it
+ // might be necessary to forcefully add messages due to e.g. external
+ // fragmentation.
+ buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
+ EXPECT_TRUE(buf_.IsFull());
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(chunk_one->data.ppid, kPPID);
+
+ EXPECT_TRUE(buf_.IsFull());
+
+ absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_two->data.ppid, PPID(54));
+
+ EXPECT_FALSE(buf_.IsFull());
+ EXPECT_FALSE(buf_.IsEmpty());
+
+ absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
+ EXPECT_EQ(chunk_three->data.ppid, PPID(55));
+
+ EXPECT_FALSE(buf_.IsFull());
+ EXPECT_TRUE(buf_.IsEmpty());
+}
+
+TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
+ std::vector<uint8_t> payload(20);
+
+ // Default is ordered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_FALSE(chunk_one->data.is_unordered);
+
+ // Explicitly unordered.
+ SendOptions opts;
+ opts.unordered = IsUnordered(true);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_TRUE(chunk_two->data.is_unordered);
+}
+
+TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
+ std::vector<uint8_t> payload(20);
+
+ // Default is no expiry
+ TimeMs now = kNow;
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
+ now += DurationMs(1000000);
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ SendOptions expires_2_seconds;
+ expires_2_seconds.lifetime = DurationMs(2000);
+
+ // Add and consume within lifetime
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(2000);
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // Add and consume just outside lifetime
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(2001);
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // A long time after expiry
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+ now += DurationMs(1000000);
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+
+ // Expire one message, but produce the second that is not expired.
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
+
+ SendOptions expires_4_seconds;
+ expires_4_seconds.lifetime = DurationMs(4000);
+
+ buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
+ now += DurationMs(2001);
+
+ ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
+ ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, DiscardPartialPackets) {
+ std::vector<uint8_t> payload(120);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_FALSE(chunk_one->data.is_end);
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_FALSE(chunk_two->data.is_end);
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_TRUE(chunk_three->data.is_end);
+ EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
+
+ // Calling it again shouldn't cause issues.
+ buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
+ ASSERT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
+ EXPECT_EQ(buf_.total_buffered_amount(), 8u);
+
+ buf_.PrepareResetStream(StreamID(1));
+ EXPECT_EQ(buf_.total_buffered_amount(), 5u);
+
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+ UnorderedElementsAre(StreamID(1)));
+ buf_.CommitResetStreams();
+ buf_.PrepareResetStream(StreamID(2));
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+}
+
+TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
+ std::vector<uint8_t> payload(120);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
+
+ buf_.PrepareResetStream(StreamID(1));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
+}
+
+TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.PrepareResetStream(StreamID(1));
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+
+ EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+ UnorderedElementsAre(StreamID(1)));
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+
+ buf_.CommitResetStreams();
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 0u);
+}
+
+TEST_F(RRSendQueueTest, PausedStreamsStillSendPartialMessagesUntilEnd) {
+ constexpr size_t kPayloadSize = 100;
+ constexpr size_t kFragmentSize = 50;
+ std::vector<uint8_t> payload(kPayloadSize);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kFragmentSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize);
+
+ // This will stop the second message from being sent.
+ buf_.PrepareResetStream(StreamID(1));
+ EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize);
+
+ // Should still produce fragments until end of message.
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kFragmentSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 0ul);
+
+ // But shouldn't produce any more messages as the stream is paused.
+ EXPECT_FALSE(buf_.Produce(kNow, kFragmentSize).has_value());
+}
+
+TEST_F(RRSendQueueTest, CommittingResetsSSN) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.ssn, SSN(1));
+
+ buf_.PrepareResetStream(StreamID(1));
+
+ // Buffered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+ UnorderedElementsAre(StreamID(1)));
+ buf_.CommitResetStreams();
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.ssn, SSN(0));
+}
+
+TEST_F(RRSendQueueTest, CommittingDoesNotResetMessageId) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.ssn, SSN(0));
+ EXPECT_EQ(chunk1.message_id, OutgoingMessageId(0));
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.ssn, SSN(1));
+ EXPECT_EQ(chunk2.message_id, OutgoingMessageId(1));
+
+ buf_.PrepareResetStream(kStreamID);
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(), UnorderedElementsAre(kStreamID));
+ buf_.CommitResetStreams();
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.ssn, SSN(0));
+ EXPECT_EQ(chunk3.message_id, OutgoingMessageId(2));
+}
+
+TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, StreamID(1));
+ EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_two->data.ssn, SSN(0));
+
+ buf_.PrepareResetStream(StreamID(3));
+
+ // Send two more messages - SID 3 will buffer, SID 1 will send.
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
+
+ EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+ UnorderedElementsAre(StreamID(3)));
+
+ buf_.CommitResetStreams();
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.stream_id, StreamID(1));
+ EXPECT_EQ(chunk_three->data.ssn, SSN(1));
+
+ absl::optional<SendQueue::DataToSend> chunk_four =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_four.has_value());
+ EXPECT_EQ(chunk_four->data.stream_id, StreamID(3));
+ EXPECT_EQ(chunk_four->data.ssn, SSN(0));
+}
+
+TEST_F(RRSendQueueTest, RollBackResumesSSN) {
+ std::vector<uint8_t> payload(50);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_two.has_value());
+ EXPECT_EQ(chunk_two->data.ssn, SSN(1));
+
+ buf_.PrepareResetStream(StreamID(1));
+
+ // Buffered
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+
+ EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+ EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+ UnorderedElementsAre(StreamID(1)));
+ buf_.RollbackResetStreams();
+
+ absl::optional<SendQueue::DataToSend> chunk_three =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_three.has_value());
+ EXPECT_EQ(chunk_three->data.ssn, SSN(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsFragmentsForOneMessageBeforeMovingToNext) {
+ std::vector<uint8_t> payload(200);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+}
+
+TEST_F(RRSendQueueTest, ReturnsAlsoSmallFragmentsBeforeMovingToNext) {
+ std::vector<uint8_t> payload(kTwoFragmentPacketSize);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk4.data.payload,
+ SizeIs(kTwoFragmentPacketSize - kOneFragmentPacketSize));
+}
+
+TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(2)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(3)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(4)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(5)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(6)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(7)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(4), kPPID, std::vector<uint8_t>(8)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(3));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(5));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk4.data.payload, SizeIs(7));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk5,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk5.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk5.data.payload, SizeIs(2));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk6,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk6.data.stream_id, StreamID(2));
+ EXPECT_THAT(chunk6.data.payload, SizeIs(4));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk7,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk7.data.stream_id, StreamID(3));
+ EXPECT_THAT(chunk7.data.payload, SizeIs(6));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk8,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
+ EXPECT_THAT(chunk8.data.payload, SizeIs(8));
+}
+
+TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) {
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u);
+}
+
+TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+}
+
+TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
+
+ // Should now trigger again, as buffer_amount went above the threshold.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(1));
+}
+
+TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) {
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(10)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u);
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(10));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(20)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(20));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
+}
+
+TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) {
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
+
+ std::vector<uint8_t> payload(1000);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u);
+
+ // Doesn't trigger when reducing even further.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+}
+
+TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) {
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1000)));
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, 400));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(400));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(200)));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
+
+ // Will trigger again, as it went above the limit.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, 200));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(200));
+ EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
+}
+
+TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) {
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(100)));
+
+ // Modifying the threshold, still under buffered_amount, should not trigger.
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 99);
+
+ // When the threshold reaches buffered_amount, it will trigger.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 100);
+
+ // But not when it's set low again.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
+
+ // But it will trigger when it overshoots.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow(StreamID(1)));
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 150);
+
+ // But not when it's set low again.
+ EXPECT_CALL(callbacks_, OnBufferedAmountLow).Times(0);
+ buf_.SetBufferedAmountLowThreshold(StreamID(1), 0);
+}
+
+TEST_F(RRSendQueueTest,
+ OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) {
+ EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);
+ std::vector<uint8_t> payload(kBufferedAmountLowThreshold - 1);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ // Will not trigger if going above but never below.
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize)));
+}
+
+TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) {
+ EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(0);
+ std::vector<uint8_t> payload(kBufferedAmountLowThreshold);
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
+
+ // Reaches it.
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector<uint8_t>(1)));
+
+ // Drain it a bit - will trigger.
+ EXPECT_CALL(callbacks_, OnTotalBufferedAmountLow).Times(1);
+ absl::optional<SendQueue::DataToSend> chunk_two =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+}
+
+TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
+ buf_.Add(kNow, DcSctpMessage(StreamID(5), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(1));
+
+ // Next, it should pick a different stream.
+
+ buf_.Add(kNow,
+ DcSctpMessage(StreamID(1), kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // It should still stay on the Stream1 now, even if might be tempted to switch
+ // to this stream, as it's the stream following 5.
+ buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // After stream id 1 is complete, it's time to do stream 6.
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk4.data.stream_id, StreamID(6));
+ EXPECT_THAT(chunk4.data.payload, SizeIs(1));
+
+ EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+}
+
+TEST_F(RRSendQueueTest, StreamsHaveInitialPriority) {
+ EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), kDefaultPriority);
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
+ EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), kDefaultPriority);
+}
+
+TEST_F(RRSendQueueTest, CanChangeStreamPriority) {
+ buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
+ EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), StreamPriority(42));
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
+ buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
+ EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), StreamPriority(42));
+}
+
+TEST_F(RRSendQueueTest, WillHandoverPriority) {
+ buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
+ buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
+
+ DcSctpSocketHandoverState state;
+ buf_.AddHandoverState(state);
+
+ RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
+ kBufferedAmountLowThreshold);
+ q2.RestoreFromState(state);
+ EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));
+ EXPECT_EQ(q2.GetStreamPriority(StreamID(2)), StreamPriority(42));
+}
+
+TEST_F(RRSendQueueTest, WillSendMessagesByPrio) {
+ buf_.EnableMessageInterleaving(true);
+ buf_.SetStreamPriority(StreamID(1), StreamPriority(10));
+ buf_.SetStreamPriority(StreamID(2), StreamPriority(20));
+ buf_.SetStreamPriority(StreamID(3), StreamPriority(30));
+
+ buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(40)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(20)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, std::vector<uint8_t>(10)));
+ std::vector<uint16_t> expected_streams = {3, 2, 2, 1, 1, 1, 1};
+
+ for (uint16_t stream_num : expected_streams) {
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk,
+ buf_.Produce(kNow, 10));
+ EXPECT_EQ(chunk.data.stream_id, StreamID(stream_num));
+ }
+ EXPECT_FALSE(buf_.Produce(kNow, 1).has_value());
+}
+
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
+ std::vector<uint8_t> payload(kOneFragmentPacketSize);
+ buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload),
+ SendOptions{.lifetime = DurationMs(1000),
+ .lifecycle_id = LifecycleId(1)});
+
+ EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
+ /*maybe_delivered=*/false));
+ EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
+ EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize)
+ .has_value());
+}
+
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
+ std::vector<uint8_t> payload(120);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+ SendOptions{.lifecycle_id = LifecycleId(1)});
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+ SendOptions{.lifecycle_id = LifecycleId(2)});
+
+ absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
+
+ EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2),
+ /*maybe_delivered=*/false));
+ EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2)));
+ buf_.PrepareResetStream(StreamID(1));
+ EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
+}
+
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
+ std::vector<uint8_t> payload(kOneFragmentPacketSize + 20);
+
+ buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+ SendOptions{.lifecycle_id = LifecycleId(1)});
+
+ absl::optional<SendQueue::DataToSend> chunk_one =
+ buf_.Produce(kNow, kOneFragmentPacketSize);
+ ASSERT_TRUE(chunk_one.has_value());
+ EXPECT_FALSE(chunk_one->data.is_end);
+ EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+ EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
+ /*maybe_delivered=*/false));
+ EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
+ buf_.Discard(chunk_one->data.stream_id, chunk_one->message_id);
+}
+} // namespace
+} // namespace dcsctp