diff options
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc')
-rw-r--r-- | third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc | 261 |
1 files changed, 142 insertions, 119 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc index c2706bd0d2..ca639abc54 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc @@ -14,12 +14,16 @@ #include <utility> #include <vector> +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/public/types.h" +#include "rtc_base/checks.h" #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::Timestamp; // The number of times a packet must be NACKed before it's retransmitted. // See https://tools.ietf.org/html/rfc4960#section-7.2.4 @@ -63,18 +67,18 @@ void OutstandingData::Item::MarkAsRetransmitted() { } void OutstandingData::Item::Abandon() { - RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() || + RTC_DCHECK(!expires_at_.IsPlusInfinity() || max_retransmissions_ != MaxRetransmits::NoLimit()); lifecycle_ = Lifecycle::kAbandoned; } -bool OutstandingData::Item::has_expired(TimeMs now) const { +bool OutstandingData::Item::has_expired(Timestamp now) const { return expires_at_ <= now; } bool OutstandingData::IsConsistent() const { - size_t actual_outstanding_bytes = 0; - size_t actual_outstanding_items = 0; + size_t actual_unacked_bytes = 0; + size_t actual_unacked_items = 0; std::set<UnwrappedTSN> combined_to_be_retransmitted; combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(), @@ -83,10 +87,12 @@ bool OutstandingData::IsConsistent() const { to_be_fast_retransmitted_.end()); std::set<UnwrappedTSN> actual_combined_to_be_retransmitted; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (item.is_outstanding()) { - actual_outstanding_bytes += GetSerializedChunkSize(item.data()); - ++actual_outstanding_items; + actual_unacked_bytes += GetSerializedChunkSize(item.data()); + ++actual_unacked_items; } if (item.should_be_retransmitted()) { @@ -94,33 +100,28 @@ bool OutstandingData::IsConsistent() const { } } - if (outstanding_data_.empty() && - next_tsn_ != last_cumulative_tsn_ack_.next_value()) { - return false; - } - - return actual_outstanding_bytes == outstanding_bytes_ && - actual_outstanding_items == outstanding_items_ && + return actual_unacked_bytes == unacked_bytes_ && + actual_unacked_items == unacked_items_ && actual_combined_to_be_retransmitted == combined_to_be_retransmitted; } void OutstandingData::AckChunk(AckInfo& ack_info, - std::map<UnwrappedTSN, Item>::iterator iter) { - if (!iter->second.is_acked()) { - size_t serialized_size = GetSerializedChunkSize(iter->second.data()); + UnwrappedTSN tsn, + Item& item) { + if (!item.is_acked()) { + size_t serialized_size = GetSerializedChunkSize(item.data()); ack_info.bytes_acked += serialized_size; - if (iter->second.is_outstanding()) { - outstanding_bytes_ -= serialized_size; - --outstanding_items_; + if (item.is_outstanding()) { + unacked_bytes_ -= serialized_size; + --unacked_items_; } - if (iter->second.should_be_retransmitted()) { - RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) == + if (item.should_be_retransmitted()) { + RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) == to_be_fast_retransmitted_.end()); - to_be_retransmitted_.erase(iter->first); + to_be_retransmitted_.erase(tsn); } - iter->second.Ack(); - ack_info.highest_tsn_acked = - std::max(ack_info.highest_tsn_acked, iter->first); + item.Ack(); + ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn); } } @@ -143,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack( return ack_info; } +OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + +const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info) { - auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); - - for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { - AckChunk(ack_info, iter); - if (iter->second.lifecycle_id().IsSet()) { - RTC_DCHECK(iter->second.data().is_end); - if (iter->second.is_abandoned()) { - ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id()); + while (!outstanding_data_.empty() && + last_cumulative_tsn_ack_ < cumulative_tsn_ack) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value(); + Item& item = outstanding_data_.front(); + AckChunk(ack_info, tsn, item); + if (item.lifecycle_id().IsSet()) { + RTC_DCHECK(item.data().is_end); + if (item.is_abandoned()) { + ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id()); } else { - ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id()); + ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id()); } } + outstanding_data_.pop_front(); + last_cumulative_tsn_ack_.Increment(); } - outstanding_data_.erase(outstanding_data_.begin(), first_unacked); - last_cumulative_tsn_ack_ = cumulative_tsn_ack; stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(), stream_reset_breakpoint_tsns_.upper_bound( cumulative_tsn_ack.next_value())); @@ -176,12 +196,13 @@ void OutstandingData::AckGapBlocks( // handled differently. for (auto& block : gap_ack_blocks) { - auto start = outstanding_data_.lower_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); - auto end = outstanding_data_.upper_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); - for (auto iter = start; iter != end; ++iter) { - AckChunk(ack_info, iter); + UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + Item& item = GetItem(tsn); + AckChunk(ack_info, tsn, item); + } } } } @@ -216,13 +237,12 @@ void OutstandingData::NackBetweenAckBlocks( for (auto& block : gap_ack_blocks) { UnwrappedTSN cur_block_first_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); - for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); - iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { - if (iter->first <= max_tsn_to_nack) { - ack_info.has_packet_loss |= - NackItem(iter->first, iter->second, /*retransmit_now=*/false, - /*do_fast_retransmit=*/!is_in_fast_recovery); - } + for (UnwrappedTSN tsn = prev_block_last_acked.next_value(); + tsn < cur_block_first_acked && tsn <= max_tsn_to_nack; + tsn = tsn.next_value()) { + ack_info.has_packet_loss |= + NackItem(tsn, /*retransmit_now=*/false, + /*do_fast_retransmit=*/!is_in_fast_recovery); } prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); } @@ -234,12 +254,12 @@ void OutstandingData::NackBetweenAckBlocks( } bool OutstandingData::NackItem(UnwrappedTSN tsn, - Item& item, bool retransmit_now, bool do_fast_retransmit) { + Item& item = GetItem(tsn); if (item.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(item.data()); - --outstanding_items_; + unacked_bytes_ -= GetSerializedChunkSize(item.data()); + --unacked_items_; } switch (item.Nack(retransmit_now)) { @@ -272,28 +292,25 @@ void OutstandingData::AbandonAllFor(const Item& item) { // skipped over). So create a new fragment, representing the end, that the // received will never see as it is abandoned immediately and used as cum // TSN in the sent FORWARD-TSN. - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); Data message_end(item.data().stream_id, item.data().ssn, item.data().mid, item.data().fsn, item.data().ppid, std::vector<uint8_t>(), Data::IsBeginning(false), Data::IsEnd(true), item.data().is_unordered); - Item& added_item = - outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple( - item.message_id(), std::move(message_end), TimeMs(0), - MaxRetransmits(0), TimeMs::InfiniteFuture(), - LifecycleId::NotSet())) - .first->second; - // The added chunk shouldn't be included in `outstanding_bytes`, so set it + UnwrappedTSN tsn = next_tsn(); + Item& added_item = outstanding_data_.emplace_back( + item.message_id(), std::move(message_end), Timestamp::Zero(), + MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet()); + + // The added chunk shouldn't be included in `unacked_bytes`, so set it // as acked. added_item.Ack(); RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn=" << *tsn.Wrap(); } - for (auto& [tsn, other] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (Item& other : outstanding_data_) { + tsn.Increment(); if (!other.is_abandoned() && other.data().stream_id == item.data().stream_id && other.message_id() == item.message_id()) { @@ -315,9 +332,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( for (auto it = chunks.begin(); it != chunks.end();) { UnwrappedTSN tsn = *it; - auto elem = outstanding_data_.find(tsn); - RTC_DCHECK(elem != outstanding_data_.end()); - Item& item = elem->second; + Item& item = GetItem(tsn); RTC_DCHECK(item.should_be_retransmitted()); RTC_DCHECK(!item.is_outstanding()); RTC_DCHECK(!item.is_abandoned()); @@ -328,8 +343,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( item.MarkAsRetransmitted(); result.emplace_back(tsn.Wrap(), item.data().Clone()); max_size -= serialized_size; - outstanding_bytes_ += serialized_size; - ++outstanding_items_; + unacked_bytes_ += serialized_size; + ++unacked_items_; it = chunks.erase(it); } else { ++it; @@ -370,8 +385,10 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted( return ExtractChunksThatCanFit(to_be_retransmitted_, max_size); } -void OutstandingData::ExpireOutstandingChunks(TimeMs now) { - for (const auto& [tsn, item] : outstanding_data_) { +void OutstandingData::ExpireOutstandingChunks(Timestamp now) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); // Chunks that are nacked can be expired. Care should be taken not to expire // unacked (in-flight) chunks as they might have been received, but the SACK // is either delayed or in-flight and may be received later. @@ -391,38 +408,33 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) { } UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { - return outstanding_data_.empty() ? last_cumulative_tsn_ack_ - : outstanding_data_.rbegin()->first; + return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_, + outstanding_data_.size()); } absl::optional<UnwrappedTSN> OutstandingData::Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + Timestamp expires_at, LifecycleId lifecycle_id) { - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); - // All chunks are always padded to be even divisible by 4. size_t chunk_size = GetSerializedChunkSize(data); - outstanding_bytes_ += chunk_size; - ++outstanding_items_; - auto it = outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(message_id, data.Clone(), - time_sent, max_retransmissions, - expires_at, lifecycle_id)) - .first; - - if (it->second.has_expired(time_sent)) { + unacked_bytes_ += chunk_size; + ++unacked_items_; + UnwrappedTSN tsn = next_tsn(); + Item& item = outstanding_data_.emplace_back(message_id, data.Clone(), + time_sent, max_retransmissions, + expires_at, lifecycle_id); + + if (item.has_expired(time_sent)) { // No need to send it - it was expired when it was in the send // queue. - RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " - << *it->first.Wrap() << " and message " - << *it->second.data().mid << " as expired"; - AbandonAllFor(it->second); + RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap() + << " and message " << *item.data().mid + << " as expired"; + AbandonAllFor(item); RTC_DCHECK(IsConsistent()); return absl::nullopt; } @@ -432,34 +444,47 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert( } void OutstandingData::NackAll() { - for (auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + // A two-pass algorithm is needed, as NackItem will invalidate iterators. + std::vector<UnwrappedTSN> tsns_to_nack; + for (Item& item : outstanding_data_) { + tsn.Increment(); if (!item.is_acked()) { - NackItem(tsn, item, /*retransmit_now=*/true, - /*do_fast_retransmit=*/false); + tsns_to_nack.push_back(tsn); } } + + for (UnwrappedTSN tsn : tsns_to_nack) { + NackItem(tsn, /*retransmit_now=*/true, + /*do_fast_retransmit=*/false); + } + RTC_DCHECK(IsConsistent()); } -absl::optional<DurationMs> OutstandingData::MeasureRTT(TimeMs now, - UnwrappedTSN tsn) const { - auto it = outstanding_data_.find(tsn); - if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { - // https://tools.ietf.org/html/rfc4960#section-6.3.1 - // "Karn's algorithm: RTT measurements MUST NOT be made using - // packets that were retransmitted (and thus for which it is ambiguous - // whether the reply was for the first instance of the chunk or for a - // later instance)" - return now - it->second.time_sent(); +webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now, + UnwrappedTSN tsn) const { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + const Item& item = GetItem(tsn); + if (!item.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + return now - item.time_sent(); + } } - return absl::nullopt; + return webrtc::TimeDelta::PlusInfinity(); } std::vector<std::pair<TSN, OutstandingData::State>> OutstandingData::GetChunkStatesForTesting() const { std::vector<std::pair<TSN, State>> states; states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); State state; if (item.is_abandoned()) { state = State::kAbandoned; @@ -480,9 +505,7 @@ OutstandingData::GetChunkStatesForTesting() const { bool OutstandingData::ShouldSendForwardTsn() const { if (!outstanding_data_.empty()) { - auto it = outstanding_data_.begin(); - return it->first == last_cumulative_tsn_ack_.next_value() && - it->second.is_abandoned(); + return outstanding_data_.front().is_abandoned(); } return false; } @@ -491,7 +514,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const { std::map<StreamID, SSN> skipped_per_ordered_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -515,7 +540,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -539,16 +566,12 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::move(skipped_streams)); } -void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn, - UnwrappedTSN last_cumulative_tsn) { +void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) { RTC_DCHECK(outstanding_data_.empty()); - RTC_DCHECK(next_tsn_ == last_cumulative_tsn_ack_.next_value()); - RTC_DCHECK(next_tsn == last_cumulative_tsn.next_value()); - next_tsn_ = next_tsn; last_cumulative_tsn_ack_ = last_cumulative_tsn; } void OutstandingData::BeginResetStreams() { - stream_reset_breakpoint_tsns_.insert(next_tsn_); + stream_reset_breakpoint_tsns_.insert(next_tsn()); } } // namespace dcsctp |