summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc')
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc261
1 files changed, 142 insertions, 119 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
index c2706bd0d2..ca639abc54 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
@@ -14,12 +14,16 @@
#include <utility>
#include <vector>
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/public/types.h"
+#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace dcsctp {
+using ::webrtc::Timestamp;
// The number of times a packet must be NACKed before it's retransmitted.
// See https://tools.ietf.org/html/rfc4960#section-7.2.4
@@ -63,18 +67,18 @@ void OutstandingData::Item::MarkAsRetransmitted() {
}
void OutstandingData::Item::Abandon() {
- RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() ||
+ RTC_DCHECK(!expires_at_.IsPlusInfinity() ||
max_retransmissions_ != MaxRetransmits::NoLimit());
lifecycle_ = Lifecycle::kAbandoned;
}
-bool OutstandingData::Item::has_expired(TimeMs now) const {
+bool OutstandingData::Item::has_expired(Timestamp now) const {
return expires_at_ <= now;
}
bool OutstandingData::IsConsistent() const {
- size_t actual_outstanding_bytes = 0;
- size_t actual_outstanding_items = 0;
+ size_t actual_unacked_bytes = 0;
+ size_t actual_unacked_items = 0;
std::set<UnwrappedTSN> combined_to_be_retransmitted;
combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(),
@@ -83,10 +87,12 @@ bool OutstandingData::IsConsistent() const {
to_be_fast_retransmitted_.end());
std::set<UnwrappedTSN> actual_combined_to_be_retransmitted;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (item.is_outstanding()) {
- actual_outstanding_bytes += GetSerializedChunkSize(item.data());
- ++actual_outstanding_items;
+ actual_unacked_bytes += GetSerializedChunkSize(item.data());
+ ++actual_unacked_items;
}
if (item.should_be_retransmitted()) {
@@ -94,33 +100,28 @@ bool OutstandingData::IsConsistent() const {
}
}
- if (outstanding_data_.empty() &&
- next_tsn_ != last_cumulative_tsn_ack_.next_value()) {
- return false;
- }
-
- return actual_outstanding_bytes == outstanding_bytes_ &&
- actual_outstanding_items == outstanding_items_ &&
+ return actual_unacked_bytes == unacked_bytes_ &&
+ actual_unacked_items == unacked_items_ &&
actual_combined_to_be_retransmitted == combined_to_be_retransmitted;
}
void OutstandingData::AckChunk(AckInfo& ack_info,
- std::map<UnwrappedTSN, Item>::iterator iter) {
- if (!iter->second.is_acked()) {
- size_t serialized_size = GetSerializedChunkSize(iter->second.data());
+ UnwrappedTSN tsn,
+ Item& item) {
+ if (!item.is_acked()) {
+ size_t serialized_size = GetSerializedChunkSize(item.data());
ack_info.bytes_acked += serialized_size;
- if (iter->second.is_outstanding()) {
- outstanding_bytes_ -= serialized_size;
- --outstanding_items_;
+ if (item.is_outstanding()) {
+ unacked_bytes_ -= serialized_size;
+ --unacked_items_;
}
- if (iter->second.should_be_retransmitted()) {
- RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) ==
+ if (item.should_be_retransmitted()) {
+ RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) ==
to_be_fast_retransmitted_.end());
- to_be_retransmitted_.erase(iter->first);
+ to_be_retransmitted_.erase(tsn);
}
- iter->second.Ack();
- ack_info.highest_tsn_acked =
- std::max(ack_info.highest_tsn_acked, iter->first);
+ item.Ack();
+ ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn);
}
}
@@ -143,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack(
return ack_info;
}
+OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) {
+ RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
+ RTC_DCHECK(tsn < next_tsn());
+ int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
+ RTC_DCHECK(index >= 0);
+ RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
+ return outstanding_data_[index];
+}
+
+const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const {
+ RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
+ RTC_DCHECK(tsn < next_tsn());
+ int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
+ RTC_DCHECK(index >= 0);
+ RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
+ return outstanding_data_[index];
+}
+
void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
AckInfo& ack_info) {
- auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack);
-
- for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) {
- AckChunk(ack_info, iter);
- if (iter->second.lifecycle_id().IsSet()) {
- RTC_DCHECK(iter->second.data().is_end);
- if (iter->second.is_abandoned()) {
- ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id());
+ while (!outstanding_data_.empty() &&
+ last_cumulative_tsn_ack_ < cumulative_tsn_ack) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value();
+ Item& item = outstanding_data_.front();
+ AckChunk(ack_info, tsn, item);
+ if (item.lifecycle_id().IsSet()) {
+ RTC_DCHECK(item.data().is_end);
+ if (item.is_abandoned()) {
+ ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id());
} else {
- ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id());
+ ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id());
}
}
+ outstanding_data_.pop_front();
+ last_cumulative_tsn_ack_.Increment();
}
- outstanding_data_.erase(outstanding_data_.begin(), first_unacked);
- last_cumulative_tsn_ack_ = cumulative_tsn_ack;
stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(),
stream_reset_breakpoint_tsns_.upper_bound(
cumulative_tsn_ack.next_value()));
@@ -176,12 +196,13 @@ void OutstandingData::AckGapBlocks(
// handled differently.
for (auto& block : gap_ack_blocks) {
- auto start = outstanding_data_.lower_bound(
- UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start));
- auto end = outstanding_data_.upper_bound(
- UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end));
- for (auto iter = start; iter != end; ++iter) {
- AckChunk(ack_info, iter);
+ UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
+ UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
+ for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) {
+ if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
+ Item& item = GetItem(tsn);
+ AckChunk(ack_info, tsn, item);
+ }
}
}
}
@@ -216,13 +237,12 @@ void OutstandingData::NackBetweenAckBlocks(
for (auto& block : gap_ack_blocks) {
UnwrappedTSN cur_block_first_acked =
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
- for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
- iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
- if (iter->first <= max_tsn_to_nack) {
- ack_info.has_packet_loss |=
- NackItem(iter->first, iter->second, /*retransmit_now=*/false,
- /*do_fast_retransmit=*/!is_in_fast_recovery);
- }
+ for (UnwrappedTSN tsn = prev_block_last_acked.next_value();
+ tsn < cur_block_first_acked && tsn <= max_tsn_to_nack;
+ tsn = tsn.next_value()) {
+ ack_info.has_packet_loss |=
+ NackItem(tsn, /*retransmit_now=*/false,
+ /*do_fast_retransmit=*/!is_in_fast_recovery);
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
}
@@ -234,12 +254,12 @@ void OutstandingData::NackBetweenAckBlocks(
}
bool OutstandingData::NackItem(UnwrappedTSN tsn,
- Item& item,
bool retransmit_now,
bool do_fast_retransmit) {
+ Item& item = GetItem(tsn);
if (item.is_outstanding()) {
- outstanding_bytes_ -= GetSerializedChunkSize(item.data());
- --outstanding_items_;
+ unacked_bytes_ -= GetSerializedChunkSize(item.data());
+ --unacked_items_;
}
switch (item.Nack(retransmit_now)) {
@@ -272,28 +292,25 @@ void OutstandingData::AbandonAllFor(const Item& item) {
// skipped over). So create a new fragment, representing the end, that the
// received will never see as it is abandoned immediately and used as cum
// TSN in the sent FORWARD-TSN.
- UnwrappedTSN tsn = next_tsn_;
- next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn, item.data().mid,
item.data().fsn, item.data().ppid, std::vector<uint8_t>(),
Data::IsBeginning(false), Data::IsEnd(true),
item.data().is_unordered);
- Item& added_item =
- outstanding_data_
- .emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
- std::forward_as_tuple(
- item.message_id(), std::move(message_end), TimeMs(0),
- MaxRetransmits(0), TimeMs::InfiniteFuture(),
- LifecycleId::NotSet()))
- .first->second;
- // The added chunk shouldn't be included in `outstanding_bytes`, so set it
+ UnwrappedTSN tsn = next_tsn();
+ Item& added_item = outstanding_data_.emplace_back(
+ item.message_id(), std::move(message_end), Timestamp::Zero(),
+ MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet());
+
+ // The added chunk shouldn't be included in `unacked_bytes`, so set it
// as acked.
added_item.Ack();
RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn="
<< *tsn.Wrap();
}
- for (auto& [tsn, other] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (Item& other : outstanding_data_) {
+ tsn.Increment();
if (!other.is_abandoned() &&
other.data().stream_id == item.data().stream_id &&
other.message_id() == item.message_id()) {
@@ -315,9 +332,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
for (auto it = chunks.begin(); it != chunks.end();) {
UnwrappedTSN tsn = *it;
- auto elem = outstanding_data_.find(tsn);
- RTC_DCHECK(elem != outstanding_data_.end());
- Item& item = elem->second;
+ Item& item = GetItem(tsn);
RTC_DCHECK(item.should_be_retransmitted());
RTC_DCHECK(!item.is_outstanding());
RTC_DCHECK(!item.is_abandoned());
@@ -328,8 +343,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
item.MarkAsRetransmitted();
result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size;
- outstanding_bytes_ += serialized_size;
- ++outstanding_items_;
+ unacked_bytes_ += serialized_size;
+ ++unacked_items_;
it = chunks.erase(it);
} else {
++it;
@@ -370,8 +385,10 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
return ExtractChunksThatCanFit(to_be_retransmitted_, max_size);
}
-void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
- for (const auto& [tsn, item] : outstanding_data_) {
+void OutstandingData::ExpireOutstandingChunks(Timestamp now) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
// Chunks that are nacked can be expired. Care should be taken not to expire
// unacked (in-flight) chunks as they might have been received, but the SACK
// is either delayed or in-flight and may be received later.
@@ -391,38 +408,33 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
}
UnwrappedTSN OutstandingData::highest_outstanding_tsn() const {
- return outstanding_data_.empty() ? last_cumulative_tsn_ack_
- : outstanding_data_.rbegin()->first;
+ return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_,
+ outstanding_data_.size());
}
absl::optional<UnwrappedTSN> OutstandingData::Insert(
OutgoingMessageId message_id,
const Data& data,
- TimeMs time_sent,
+ Timestamp time_sent,
MaxRetransmits max_retransmissions,
- TimeMs expires_at,
+ Timestamp expires_at,
LifecycleId lifecycle_id) {
- UnwrappedTSN tsn = next_tsn_;
- next_tsn_.Increment();
-
// All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(data);
- outstanding_bytes_ += chunk_size;
- ++outstanding_items_;
- auto it = outstanding_data_
- .emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
- std::forward_as_tuple(message_id, data.Clone(),
- time_sent, max_retransmissions,
- expires_at, lifecycle_id))
- .first;
-
- if (it->second.has_expired(time_sent)) {
+ unacked_bytes_ += chunk_size;
+ ++unacked_items_;
+ UnwrappedTSN tsn = next_tsn();
+ Item& item = outstanding_data_.emplace_back(message_id, data.Clone(),
+ time_sent, max_retransmissions,
+ expires_at, lifecycle_id);
+
+ if (item.has_expired(time_sent)) {
// No need to send it - it was expired when it was in the send
// queue.
- RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk "
- << *it->first.Wrap() << " and message "
- << *it->second.data().mid << " as expired";
- AbandonAllFor(it->second);
+ RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap()
+ << " and message " << *item.data().mid
+ << " as expired";
+ AbandonAllFor(item);
RTC_DCHECK(IsConsistent());
return absl::nullopt;
}
@@ -432,34 +444,47 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
}
void OutstandingData::NackAll() {
- for (auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ // A two-pass algorithm is needed, as NackItem will invalidate iterators.
+ std::vector<UnwrappedTSN> tsns_to_nack;
+ for (Item& item : outstanding_data_) {
+ tsn.Increment();
if (!item.is_acked()) {
- NackItem(tsn, item, /*retransmit_now=*/true,
- /*do_fast_retransmit=*/false);
+ tsns_to_nack.push_back(tsn);
}
}
+
+ for (UnwrappedTSN tsn : tsns_to_nack) {
+ NackItem(tsn, /*retransmit_now=*/true,
+ /*do_fast_retransmit=*/false);
+ }
+
RTC_DCHECK(IsConsistent());
}
-absl::optional<DurationMs> OutstandingData::MeasureRTT(TimeMs now,
- UnwrappedTSN tsn) const {
- auto it = outstanding_data_.find(tsn);
- if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) {
- // https://tools.ietf.org/html/rfc4960#section-6.3.1
- // "Karn's algorithm: RTT measurements MUST NOT be made using
- // packets that were retransmitted (and thus for which it is ambiguous
- // whether the reply was for the first instance of the chunk or for a
- // later instance)"
- return now - it->second.time_sent();
+webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now,
+ UnwrappedTSN tsn) const {
+ if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
+ const Item& item = GetItem(tsn);
+ if (!item.has_been_retransmitted()) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.1
+ // "Karn's algorithm: RTT measurements MUST NOT be made using
+ // packets that were retransmitted (and thus for which it is ambiguous
+ // whether the reply was for the first instance of the chunk or for a
+ // later instance)"
+ return now - item.time_sent();
+ }
}
- return absl::nullopt;
+ return webrtc::TimeDelta::PlusInfinity();
}
std::vector<std::pair<TSN, OutstandingData::State>>
OutstandingData::GetChunkStatesForTesting() const {
std::vector<std::pair<TSN, State>> states;
states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked);
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
State state;
if (item.is_abandoned()) {
state = State::kAbandoned;
@@ -480,9 +505,7 @@ OutstandingData::GetChunkStatesForTesting() const {
bool OutstandingData::ShouldSendForwardTsn() const {
if (!outstanding_data_.empty()) {
- auto it = outstanding_data_.begin();
- return it->first == last_cumulative_tsn_ack_.next_value() &&
- it->second.is_abandoned();
+ return outstanding_data_.front().is_abandoned();
}
return false;
}
@@ -491,7 +514,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const {
std::map<StreamID, SSN> skipped_per_ordered_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
@@ -515,7 +540,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
@@ -539,16 +566,12 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::move(skipped_streams));
}
-void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn,
- UnwrappedTSN last_cumulative_tsn) {
+void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) {
RTC_DCHECK(outstanding_data_.empty());
- RTC_DCHECK(next_tsn_ == last_cumulative_tsn_ack_.next_value());
- RTC_DCHECK(next_tsn == last_cumulative_tsn.next_value());
- next_tsn_ = next_tsn;
last_cumulative_tsn_ack_ = last_cumulative_tsn;
}
void OutstandingData::BeginResetStreams() {
- stream_reset_breakpoint_tsns_.insert(next_tsn_);
+ stream_reset_breakpoint_tsns_.insert(next_tsn());
}
} // namespace dcsctp