summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/rx
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/rx')
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/BUILD.gn2
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc19
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc51
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h7
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc27
5 files changed, 21 insertions, 85 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn b/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn
index f5f5b7ed81..15b9f60f3d 100644
--- a/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn
+++ b/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn
@@ -95,10 +95,10 @@ rtc_library("reassembly_queue") {
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base:stringutils",
"../../../rtc_base/containers:flat_set",
"../common:internal_types",
"../common:sequence_numbers",
- "../common:str_join",
"../packet:chunk",
"../packet:data",
"../packet:parameter",
diff --git a/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc b/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc
index 07192fda54..0e9e4fcb60 100644
--- a/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc
@@ -29,6 +29,8 @@ using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
constexpr size_t kArwnd = 10000;
constexpr TSN kInitialTSN(11);
@@ -42,8 +44,8 @@ class DataTrackerTest : public testing::Test {
}),
timer_(timer_manager_.CreateTimer(
"test/delayed_ack",
- []() { return absl::nullopt; },
- TimerOptions(DurationMs(0)))),
+ []() { return TimeDelta::Zero(); },
+ TimerOptions(TimeDelta::Zero()))),
tracker_(
std::make_unique<DataTracker>("log: ", timer_.get(), kInitialTSN)) {
}
@@ -71,7 +73,7 @@ class DataTrackerTest : public testing::Test {
tracker_->RestoreFromState(state);
}
- TimeMs now_ = TimeMs(0);
+ Timestamp now_ = Timestamp::Zero();
FakeTimeoutManager timeout_manager_;
TimerManager timer_manager_;
std::unique_ptr<Timer> timer_;
@@ -784,5 +786,16 @@ TEST_F(DataTrackerTest, DoesNotAcceptGapsWithDuplicateData) {
EXPECT_FALSE(tracker_->Observe(TSN(12)));
}
+TEST_F(DataTrackerTest, NotReadyForHandoverWhenHavingTsnGaps) {
+ tracker_->Observe(TSN(10));
+ tracker_->Observe(TSN(12));
+ EXPECT_EQ(tracker_->GetHandoverReadiness(),
+ HandoverReadinessStatus().Add(
+ HandoverUnreadinessReason::kDataTrackerTsnBlocksPending));
+
+ tracker_->Observe(TSN(11));
+ EXPECT_EQ(tracker_->GetHandoverReadiness(), HandoverReadinessStatus());
+}
+
} // namespace
} // namespace dcsctp
diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
index 573443635c..4c223d0532 100644
--- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
+++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
@@ -23,7 +23,6 @@
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
@@ -34,6 +33,7 @@
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"
+#include "rtc_base/strings/str_join.h"
namespace dcsctp {
namespace {
@@ -57,8 +57,6 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
: log_prefix_(log_prefix),
max_size_bytes_(max_size_bytes),
watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
- last_assembled_tsn_watermark_(
- tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
streams_(CreateStreams(
log_prefix_,
@@ -180,33 +178,9 @@ void ReassemblyQueue::AddReassembledMessage(
<< ", ppid=" << *message.ppid()
<< ", payload=" << message.payload().size() << " bytes";
- for (const UnwrappedTSN tsn : tsns) {
- if (tsn == last_assembled_tsn_watermark_.next_value()) {
- // Update watermark, or insert into delivered_tsns_
- last_assembled_tsn_watermark_.Increment();
- } else {
- delivered_tsns_.insert(tsn);
- }
- }
-
- // With new TSNs in delivered_tsns, gaps might be filled.
- MaybeMoveLastAssembledWatermarkFurther();
-
reassembled_messages_.emplace_back(std::move(message));
}
-void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
- // `delivered_tsns_` contain TSNS when there is a gap between ranges of
- // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
- // that list, because if so, it can be moved.
- while (!delivered_tsns_.empty() &&
- *delivered_tsns_.begin() ==
- last_assembled_tsn_watermark_.next_value()) {
- last_assembled_tsn_watermark_.Increment();
- delivered_tsns_.erase(delivered_tsns_.begin());
- }
-}
-
void ReassemblyQueue::HandleForwardTsn(
TSN new_cumulative_tsn,
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
@@ -228,33 +202,19 @@ void ReassemblyQueue::HandleForwardTsn(
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
<< " - performing.";
- last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
- delivered_tsns_.erase(delivered_tsns_.begin(),
- delivered_tsns_.upper_bound(tsn));
- MaybeMoveLastAssembledWatermarkFurther();
queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
RTC_DCHECK(IsConsistent());
}
bool ReassemblyQueue::IsConsistent() const {
- // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
- // adjacent.
- if (!delivered_tsns_.empty() &&
- last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
- return false;
- }
-
// Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
- // enforced in this class. This comparison will still trigger if queued_bytes_
- // became "negative".
- return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
+ // enforced in this class. But in case it wraps around (becomes negative, but
+ // as it's unsigned, that would wrap to very big), this would trigger.
+ return (queued_bytes_ <= 2 * max_size_bytes_);
}
HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status = streams_->GetHandoverReadiness();
- if (!delivered_tsns_.empty()) {
- status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
- }
if (deferred_reset_streams_.has_value()) {
status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
}
@@ -262,7 +222,6 @@ HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
}
void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
- state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
state.rx.last_completed_deferred_reset_req_sn =
last_completed_reset_req_seq_nbr_.value();
streams_->AddHandoverState(state);
@@ -272,8 +231,6 @@ void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
// Validate that the component is in pristine state.
RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
- last_assembled_tsn_watermark_ =
- tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
last_completed_reset_req_seq_nbr_ =
ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
streams_->RestoreFromState(state);
diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h
index 761ec3556c..82a66d089c 100644
--- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h
+++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h
@@ -142,19 +142,12 @@ class ReassemblyQueue {
bool IsConsistent() const;
void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
DcSctpMessage message);
- void MaybeMoveLastAssembledWatermarkFurther();
const absl::string_view log_prefix_;
const size_t max_size_bytes_;
const size_t watermark_bytes_;
UnwrappedTSN::Unwrapper tsn_unwrapper_;
- // Whenever a message has been assembled, either increase
- // `last_assembled_tsn_watermark_` or - if there are gaps - add the message's
- // TSNs into delivered_tsns_ so that messages are not re-delivered on
- // duplicate chunks.
- UnwrappedTSN last_assembled_tsn_watermark_;
- std::set<UnwrappedTSN> delivered_tsns_;
// Messages that have been reassembled, and will be returned by
// `FlushMessages`.
std::vector<DcSctpMessage> reassembled_messages_;
diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc
index fd8c423a5f..81bb7af963 100644
--- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc
@@ -252,33 +252,6 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
}
-TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) {
- ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
- reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
- EXPECT_FALSE(reasm.HasMessages());
-
- reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
- EXPECT_TRUE(reasm.HasMessages());
- EXPECT_EQ(
- reasm.GetHandoverReadiness(),
- HandoverReadinessStatus()
- .Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
- .Add(
- HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
-
- EXPECT_THAT(reasm.FlushMessages(),
- ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
- EXPECT_EQ(
- reasm.GetHandoverReadiness(),
- HandoverReadinessStatus()
- .Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap)
- .Add(
- HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
-
- reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>());
- EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
-}
-
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));