diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:14:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:14:29 +0000 |
commit | fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 (patch) | |
tree | 4c1ccaf5486d4f2009f9a338a98a83e886e29c97 /third_party/libwebrtc/net/dcsctp/tx | |
parent | Releasing progress-linux version 124.0.1-1~progress7.99u1. (diff) | |
download | firefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.tar.xz firefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.zip |
Merging upstream version 125.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx')
18 files changed, 605 insertions, 552 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn index 5547ffa870..d1fd8ab3d5 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn @@ -11,6 +11,7 @@ import("../../../webrtc.gni") rtc_source_set("send_queue") { deps = [ "../../../api:array_view", + "../../../api/units:timestamp", "../common:internal_types", "../packet:chunk", "../packet:data", @@ -28,9 +29,9 @@ rtc_library("rr_send_queue") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_map", "../common:internal_types", - "../common:str_join", "../packet:data", "../public:socket", "../public:types", @@ -53,9 +54,9 @@ rtc_library("stream_scheduler") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_set", - "../common:str_join", "../packet:chunk", "../packet:data", "../packet:sctp_packet", @@ -89,6 +90,7 @@ rtc_library("retransmission_error_counter") { rtc_library("retransmission_timeout") { deps = [ + "../../../api/units:time_delta", "../../../rtc_base:checks", "../public:types", ] @@ -103,13 +105,15 @@ rtc_library("outstanding_data") { ":retransmission_timeout", ":send_queue", "../../../api:array_view", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_set", "../common:internal_types", "../common:math", "../common:sequence_numbers", - "../common:str_join", "../packet:chunk", "../packet:data", "../public:socket", @@ -138,7 +142,6 @@ rtc_library("retransmission_queue") { "../../../rtc_base:stringutils", "../common:math", "../common:sequence_numbers", - "../common:str_join", "../packet:chunk", "../packet:data", "../public:socket", @@ -162,6 +165,7 @@ if (rtc_include_tests) { deps = [ ":send_queue", "../../../api:array_view", + "../../../api/units:timestamp", "../../../test:test_support", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] diff --git a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h index 04921866ae..3511403eab 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h @@ -15,6 +15,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/tx/send_queue.h" #include "test/gmock.h" @@ -23,14 +24,15 @@ namespace dcsctp { class MockSendQueue : public SendQueue { public: MockSendQueue() { - ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) { - return absl::nullopt; - }); + ON_CALL(*this, Produce) + .WillByDefault([](webrtc::Timestamp now, size_t max_size) { + return absl::nullopt; + }); } MOCK_METHOD(absl::optional<SendQueue::DataToSend>, Produce, - (TimeMs now, size_t max_size), + (webrtc::Timestamp now, size_t max_size), (override)); MOCK_METHOD(bool, Discard, diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc index c2706bd0d2..ca639abc54 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc @@ -14,12 +14,16 @@ #include <utility> #include <vector> +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/public/types.h" +#include "rtc_base/checks.h" #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::Timestamp; // The number of times a packet must be NACKed before it's retransmitted. // See https://tools.ietf.org/html/rfc4960#section-7.2.4 @@ -63,18 +67,18 @@ void OutstandingData::Item::MarkAsRetransmitted() { } void OutstandingData::Item::Abandon() { - RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() || + RTC_DCHECK(!expires_at_.IsPlusInfinity() || max_retransmissions_ != MaxRetransmits::NoLimit()); lifecycle_ = Lifecycle::kAbandoned; } -bool OutstandingData::Item::has_expired(TimeMs now) const { +bool OutstandingData::Item::has_expired(Timestamp now) const { return expires_at_ <= now; } bool OutstandingData::IsConsistent() const { - size_t actual_outstanding_bytes = 0; - size_t actual_outstanding_items = 0; + size_t actual_unacked_bytes = 0; + size_t actual_unacked_items = 0; std::set<UnwrappedTSN> combined_to_be_retransmitted; combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(), @@ -83,10 +87,12 @@ bool OutstandingData::IsConsistent() const { to_be_fast_retransmitted_.end()); std::set<UnwrappedTSN> actual_combined_to_be_retransmitted; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (item.is_outstanding()) { - actual_outstanding_bytes += GetSerializedChunkSize(item.data()); - ++actual_outstanding_items; + actual_unacked_bytes += GetSerializedChunkSize(item.data()); + ++actual_unacked_items; } if (item.should_be_retransmitted()) { @@ -94,33 +100,28 @@ bool OutstandingData::IsConsistent() const { } } - if (outstanding_data_.empty() && - next_tsn_ != last_cumulative_tsn_ack_.next_value()) { - return false; - } - - return actual_outstanding_bytes == outstanding_bytes_ && - actual_outstanding_items == outstanding_items_ && + return actual_unacked_bytes == unacked_bytes_ && + actual_unacked_items == unacked_items_ && actual_combined_to_be_retransmitted == combined_to_be_retransmitted; } void OutstandingData::AckChunk(AckInfo& ack_info, - std::map<UnwrappedTSN, Item>::iterator iter) { - if (!iter->second.is_acked()) { - size_t serialized_size = GetSerializedChunkSize(iter->second.data()); + UnwrappedTSN tsn, + Item& item) { + if (!item.is_acked()) { + size_t serialized_size = GetSerializedChunkSize(item.data()); ack_info.bytes_acked += serialized_size; - if (iter->second.is_outstanding()) { - outstanding_bytes_ -= serialized_size; - --outstanding_items_; + if (item.is_outstanding()) { + unacked_bytes_ -= serialized_size; + --unacked_items_; } - if (iter->second.should_be_retransmitted()) { - RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) == + if (item.should_be_retransmitted()) { + RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) == to_be_fast_retransmitted_.end()); - to_be_retransmitted_.erase(iter->first); + to_be_retransmitted_.erase(tsn); } - iter->second.Ack(); - ack_info.highest_tsn_acked = - std::max(ack_info.highest_tsn_acked, iter->first); + item.Ack(); + ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn); } } @@ -143,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack( return ack_info; } +OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + +const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info) { - auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); - - for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { - AckChunk(ack_info, iter); - if (iter->second.lifecycle_id().IsSet()) { - RTC_DCHECK(iter->second.data().is_end); - if (iter->second.is_abandoned()) { - ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id()); + while (!outstanding_data_.empty() && + last_cumulative_tsn_ack_ < cumulative_tsn_ack) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value(); + Item& item = outstanding_data_.front(); + AckChunk(ack_info, tsn, item); + if (item.lifecycle_id().IsSet()) { + RTC_DCHECK(item.data().is_end); + if (item.is_abandoned()) { + ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id()); } else { - ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id()); + ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id()); } } + outstanding_data_.pop_front(); + last_cumulative_tsn_ack_.Increment(); } - outstanding_data_.erase(outstanding_data_.begin(), first_unacked); - last_cumulative_tsn_ack_ = cumulative_tsn_ack; stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(), stream_reset_breakpoint_tsns_.upper_bound( cumulative_tsn_ack.next_value())); @@ -176,12 +196,13 @@ void OutstandingData::AckGapBlocks( // handled differently. for (auto& block : gap_ack_blocks) { - auto start = outstanding_data_.lower_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); - auto end = outstanding_data_.upper_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); - for (auto iter = start; iter != end; ++iter) { - AckChunk(ack_info, iter); + UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + Item& item = GetItem(tsn); + AckChunk(ack_info, tsn, item); + } } } } @@ -216,13 +237,12 @@ void OutstandingData::NackBetweenAckBlocks( for (auto& block : gap_ack_blocks) { UnwrappedTSN cur_block_first_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); - for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); - iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { - if (iter->first <= max_tsn_to_nack) { - ack_info.has_packet_loss |= - NackItem(iter->first, iter->second, /*retransmit_now=*/false, - /*do_fast_retransmit=*/!is_in_fast_recovery); - } + for (UnwrappedTSN tsn = prev_block_last_acked.next_value(); + tsn < cur_block_first_acked && tsn <= max_tsn_to_nack; + tsn = tsn.next_value()) { + ack_info.has_packet_loss |= + NackItem(tsn, /*retransmit_now=*/false, + /*do_fast_retransmit=*/!is_in_fast_recovery); } prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); } @@ -234,12 +254,12 @@ void OutstandingData::NackBetweenAckBlocks( } bool OutstandingData::NackItem(UnwrappedTSN tsn, - Item& item, bool retransmit_now, bool do_fast_retransmit) { + Item& item = GetItem(tsn); if (item.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(item.data()); - --outstanding_items_; + unacked_bytes_ -= GetSerializedChunkSize(item.data()); + --unacked_items_; } switch (item.Nack(retransmit_now)) { @@ -272,28 +292,25 @@ void OutstandingData::AbandonAllFor(const Item& item) { // skipped over). So create a new fragment, representing the end, that the // received will never see as it is abandoned immediately and used as cum // TSN in the sent FORWARD-TSN. - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); Data message_end(item.data().stream_id, item.data().ssn, item.data().mid, item.data().fsn, item.data().ppid, std::vector<uint8_t>(), Data::IsBeginning(false), Data::IsEnd(true), item.data().is_unordered); - Item& added_item = - outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple( - item.message_id(), std::move(message_end), TimeMs(0), - MaxRetransmits(0), TimeMs::InfiniteFuture(), - LifecycleId::NotSet())) - .first->second; - // The added chunk shouldn't be included in `outstanding_bytes`, so set it + UnwrappedTSN tsn = next_tsn(); + Item& added_item = outstanding_data_.emplace_back( + item.message_id(), std::move(message_end), Timestamp::Zero(), + MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet()); + + // The added chunk shouldn't be included in `unacked_bytes`, so set it // as acked. added_item.Ack(); RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn=" << *tsn.Wrap(); } - for (auto& [tsn, other] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (Item& other : outstanding_data_) { + tsn.Increment(); if (!other.is_abandoned() && other.data().stream_id == item.data().stream_id && other.message_id() == item.message_id()) { @@ -315,9 +332,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( for (auto it = chunks.begin(); it != chunks.end();) { UnwrappedTSN tsn = *it; - auto elem = outstanding_data_.find(tsn); - RTC_DCHECK(elem != outstanding_data_.end()); - Item& item = elem->second; + Item& item = GetItem(tsn); RTC_DCHECK(item.should_be_retransmitted()); RTC_DCHECK(!item.is_outstanding()); RTC_DCHECK(!item.is_abandoned()); @@ -328,8 +343,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( item.MarkAsRetransmitted(); result.emplace_back(tsn.Wrap(), item.data().Clone()); max_size -= serialized_size; - outstanding_bytes_ += serialized_size; - ++outstanding_items_; + unacked_bytes_ += serialized_size; + ++unacked_items_; it = chunks.erase(it); } else { ++it; @@ -370,8 +385,10 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted( return ExtractChunksThatCanFit(to_be_retransmitted_, max_size); } -void OutstandingData::ExpireOutstandingChunks(TimeMs now) { - for (const auto& [tsn, item] : outstanding_data_) { +void OutstandingData::ExpireOutstandingChunks(Timestamp now) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); // Chunks that are nacked can be expired. Care should be taken not to expire // unacked (in-flight) chunks as they might have been received, but the SACK // is either delayed or in-flight and may be received later. @@ -391,38 +408,33 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) { } UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { - return outstanding_data_.empty() ? last_cumulative_tsn_ack_ - : outstanding_data_.rbegin()->first; + return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_, + outstanding_data_.size()); } absl::optional<UnwrappedTSN> OutstandingData::Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + Timestamp expires_at, LifecycleId lifecycle_id) { - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); - // All chunks are always padded to be even divisible by 4. size_t chunk_size = GetSerializedChunkSize(data); - outstanding_bytes_ += chunk_size; - ++outstanding_items_; - auto it = outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(message_id, data.Clone(), - time_sent, max_retransmissions, - expires_at, lifecycle_id)) - .first; - - if (it->second.has_expired(time_sent)) { + unacked_bytes_ += chunk_size; + ++unacked_items_; + UnwrappedTSN tsn = next_tsn(); + Item& item = outstanding_data_.emplace_back(message_id, data.Clone(), + time_sent, max_retransmissions, + expires_at, lifecycle_id); + + if (item.has_expired(time_sent)) { // No need to send it - it was expired when it was in the send // queue. - RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " - << *it->first.Wrap() << " and message " - << *it->second.data().mid << " as expired"; - AbandonAllFor(it->second); + RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap() + << " and message " << *item.data().mid + << " as expired"; + AbandonAllFor(item); RTC_DCHECK(IsConsistent()); return absl::nullopt; } @@ -432,34 +444,47 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert( } void OutstandingData::NackAll() { - for (auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + // A two-pass algorithm is needed, as NackItem will invalidate iterators. + std::vector<UnwrappedTSN> tsns_to_nack; + for (Item& item : outstanding_data_) { + tsn.Increment(); if (!item.is_acked()) { - NackItem(tsn, item, /*retransmit_now=*/true, - /*do_fast_retransmit=*/false); + tsns_to_nack.push_back(tsn); } } + + for (UnwrappedTSN tsn : tsns_to_nack) { + NackItem(tsn, /*retransmit_now=*/true, + /*do_fast_retransmit=*/false); + } + RTC_DCHECK(IsConsistent()); } -absl::optional<DurationMs> OutstandingData::MeasureRTT(TimeMs now, - UnwrappedTSN tsn) const { - auto it = outstanding_data_.find(tsn); - if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { - // https://tools.ietf.org/html/rfc4960#section-6.3.1 - // "Karn's algorithm: RTT measurements MUST NOT be made using - // packets that were retransmitted (and thus for which it is ambiguous - // whether the reply was for the first instance of the chunk or for a - // later instance)" - return now - it->second.time_sent(); +webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now, + UnwrappedTSN tsn) const { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + const Item& item = GetItem(tsn); + if (!item.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + return now - item.time_sent(); + } } - return absl::nullopt; + return webrtc::TimeDelta::PlusInfinity(); } std::vector<std::pair<TSN, OutstandingData::State>> OutstandingData::GetChunkStatesForTesting() const { std::vector<std::pair<TSN, State>> states; states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); State state; if (item.is_abandoned()) { state = State::kAbandoned; @@ -480,9 +505,7 @@ OutstandingData::GetChunkStatesForTesting() const { bool OutstandingData::ShouldSendForwardTsn() const { if (!outstanding_data_.empty()) { - auto it = outstanding_data_.begin(); - return it->first == last_cumulative_tsn_ack_.next_value() && - it->second.is_abandoned(); + return outstanding_data_.front().is_abandoned(); } return false; } @@ -491,7 +514,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const { std::map<StreamID, SSN> skipped_per_ordered_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -515,7 +540,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -539,16 +566,12 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::move(skipped_streams)); } -void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn, - UnwrappedTSN last_cumulative_tsn) { +void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) { RTC_DCHECK(outstanding_data_.empty()); - RTC_DCHECK(next_tsn_ == last_cumulative_tsn_ack_.next_value()); - RTC_DCHECK(next_tsn == last_cumulative_tsn.next_value()); - next_tsn_ = next_tsn; last_cumulative_tsn_ack_ = last_cumulative_tsn; } void OutstandingData::BeginResetStreams() { - stream_reset_breakpoint_tsns_.insert(next_tsn_); + stream_reset_breakpoint_tsns_.insert(next_tsn()); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h index f8e939661d..2a214975e6 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h @@ -10,12 +10,14 @@ #ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_ #define NET_DCSCTP_TX_OUTSTANDING_DATA_H_ +#include <deque> #include <map> #include <set> #include <utility> #include <vector> #include "absl/types/optional.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" @@ -29,6 +31,9 @@ namespace dcsctp { // This class keeps track of outstanding data chunks (sent, not yet acked) and // handles acking, nacking, rescheduling and abandoning. +// +// Items are added to this queue as they are sent and will be removed when the +// peer acks them using the cumulative TSN ack. class OutstandingData { public: // State for DATA chunks (message fragments) in the queue - used in tests. @@ -74,11 +79,9 @@ class OutstandingData { OutstandingData( size_t data_chunk_header_size, - UnwrappedTSN next_tsn, UnwrappedTSN last_cumulative_tsn_ack, std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue) : data_chunk_header_size_(data_chunk_header_size), - next_tsn_(next_tsn), last_cumulative_tsn_ack_(last_cumulative_tsn_ack), discard_from_send_queue_(std::move(discard_from_send_queue)) {} @@ -98,14 +101,14 @@ class OutstandingData { // it? std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size); - size_t outstanding_bytes() const { return outstanding_bytes_; } + size_t unacked_bytes() const { return unacked_bytes_; } - // Returns the number of DATA chunks that are in-flight. - size_t outstanding_items() const { return outstanding_items_; } + // Returns the number of DATA chunks that are in-flight (not acked or nacked). + size_t unacked_items() const { return unacked_items_; } // Given the current time `now_ms`, expire and abandon outstanding (sent at // least once) chunks that have a limited lifetime. - void ExpireOutstandingChunks(TimeMs now); + void ExpireOutstandingChunks(webrtc::Timestamp now); bool empty() const { return outstanding_data_.empty(); } @@ -121,7 +124,9 @@ class OutstandingData { return last_cumulative_tsn_ack_; } - UnwrappedTSN next_tsn() const { return next_tsn_; } + UnwrappedTSN next_tsn() const { + return highest_outstanding_tsn().next_value(); + } UnwrappedTSN highest_outstanding_tsn() const; @@ -131,9 +136,9 @@ class OutstandingData { absl::optional<UnwrappedTSN> Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(), - TimeMs expires_at = TimeMs::InfiniteFuture(), + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(), LifecycleId lifecycle_id = LifecycleId::NotSet()); // Nacks all outstanding data. @@ -147,8 +152,8 @@ class OutstandingData { // Given the current time and a TSN, it returns the measured RTT between when // the chunk was sent and now. It takes into acccount Karn's algorithm, so if - // the chunk has ever been retransmitted, it will return absl::nullopt. - absl::optional<DurationMs> MeasureRTT(TimeMs now, UnwrappedTSN tsn) const; + // the chunk has ever been retransmitted, it will return `PlusInfinity()`. + webrtc::TimeDelta MeasureRTT(webrtc::Timestamp now, UnwrappedTSN tsn) const; // Returns the internal state of all queued chunks. This is only used in // unit-tests. @@ -159,8 +164,7 @@ class OutstandingData { bool ShouldSendForwardTsn() const; // Sets the next TSN to be used. This is used in handover. - void ResetSequenceNumbers(UnwrappedTSN next_tsn, - UnwrappedTSN last_cumulative_tsn); + void ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn); // Called when an outgoing stream reset is sent, marking the last assigned TSN // as a breakpoint that a FORWARD-TSN shouldn't cross. @@ -179,9 +183,9 @@ class OutstandingData { Item(OutgoingMessageId message_id, Data data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + webrtc::Timestamp expires_at, LifecycleId lifecycle_id) : message_id_(message_id), time_sent_(time_sent), @@ -195,7 +199,7 @@ class OutstandingData { OutgoingMessageId message_id() const { return message_id_; } - TimeMs time_sent() const { return time_sent_; } + webrtc::Timestamp time_sent() const { return time_sent_; } const Data& data() const { return data_; } @@ -229,7 +233,7 @@ class OutstandingData { // Given the current time, and the current state of this DATA chunk, it will // indicate if it has expired (SCTP Partial Reliability Extension). - bool has_expired(TimeMs now) const; + bool has_expired(webrtc::Timestamp now) const; LifecycleId lifecycle_id() const { return lifecycle_id_; } @@ -258,7 +262,7 @@ class OutstandingData { const OutgoingMessageId message_id_; // When the packet was sent, and placed in this queue. - const TimeMs time_sent_; + const webrtc::Timestamp time_sent_; // If the message was sent with a maximum number of retransmissions, this is // set to that number. The value zero (0) means that it will never be // retransmitted. @@ -278,7 +282,7 @@ class OutstandingData { // At this exact millisecond, the item is considered expired. If the message // is not to be expired, this is set to the infinite future. - const TimeMs expires_at_; + const webrtc::Timestamp expires_at_; // An optional lifecycle id, which may only be set for the last fragment. const LifecycleId lifecycle_id_; @@ -290,6 +294,9 @@ class OutstandingData { // Returns how large a chunk will be, serialized, carrying the data size_t GetSerializedChunkSize(const Data& data) const; + Item& GetItem(UnwrappedTSN tsn); + const Item& GetItem(UnwrappedTSN tsn) const; + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items // in the retransmission queue up until this value and will update `ack_info` // by setting `bytes_acked_by_cumulative_tsn_ack`. @@ -313,7 +320,7 @@ class OutstandingData { // Process the acknowledgement of the chunk referenced by `iter` and updates // state in `ack_info` and the object's state. - void AckChunk(AckInfo& ack_info, std::map<UnwrappedTSN, Item>::iterator iter); + void AckChunk(AckInfo& ack_info, UnwrappedTSN tsn, Item& item); // Helper method to process an incoming nack of an item and perform the // correct operations given the action indicated when nacking an item (e.g. @@ -323,10 +330,11 @@ class OutstandingData { // many times so that it should be retransmitted, this will schedule it to be // "fast retransmitted". This is only done just before going into fast // recovery. - bool NackItem(UnwrappedTSN tsn, - Item& item, - bool retransmit_now, - bool do_fast_retransmit); + // + // Note that since nacking an item may result in it becoming abandoned, which + // in turn could alter `outstanding_data_`, any iterators are invalidated + // after having called this method. + bool NackItem(UnwrappedTSN tsn, bool retransmit_now, bool do_fast_retransmit); // Given that a message fragment, `item` has been abandoned, abandon all other // fragments that share the same message - both never-before-sent fragments @@ -341,19 +349,20 @@ class OutstandingData { // The size of the data chunk (DATA/I-DATA) header that is used. const size_t data_chunk_header_size_; - // Next TSN to used. - UnwrappedTSN next_tsn_; // The last cumulative TSN ack number. UnwrappedTSN last_cumulative_tsn_ack_; // Callback when to discard items from the send queue. std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_; - std::map<UnwrappedTSN, Item> outstanding_data_; + // Outstanding items. If non-empty, the first element has + // `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict + // increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`. + std::deque<Item> outstanding_data_; // The number of bytes that are in-flight (sent but not yet acked or nacked). - size_t outstanding_bytes_ = 0; + size_t unacked_bytes_ = 0; // The number of DATA chunks that are in-flight (sent but not yet acked or // nacked). - size_t outstanding_items_ = 0; + size_t unacked_items_ = 0; // Data chunks that are eligible for fast retransmission. std::set<UnwrappedTSN> to_be_fast_retransmitted_; // Data chunks that are to be retransmitted. diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc index b8c2e593a1..e4bdb7ce7e 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc @@ -37,8 +37,10 @@ using ::testing::Property; using ::testing::Return; using ::testing::StrictMock; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow(42); +constexpr Timestamp kNow = Timestamp::Millis(42); constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17); class OutstandingDataTest : public testing::Test { @@ -46,7 +48,6 @@ class OutstandingDataTest : public testing::Test { OutstandingDataTest() : gen_(MID(42)), buf_(DataChunk::kHeaderSize, - unwrapper_.Unwrap(TSN(10)), unwrapper_.Unwrap(TSN(9)), on_discard_.AsStdFunction()) {} @@ -58,8 +59,8 @@ class OutstandingDataTest : public testing::Test { TEST_F(OutstandingDataTest, HasInitialState) { EXPECT_TRUE(buf_.empty()); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(10)); @@ -75,8 +76,8 @@ TEST_F(OutstandingDataTest, InsertChunk) { EXPECT_EQ(tsn.Wrap(), TSN(10)); - EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); - EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -95,8 +96,8 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) { EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10)); EXPECT_FALSE(ack.has_packet_loss); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -109,8 +110,8 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) { buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false); - EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); - EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -131,8 +132,8 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) { EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11)); EXPECT_FALSE(ack.has_packet_loss); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(12)); @@ -277,20 +278,20 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { } TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { - static constexpr TimeMs kExpiresAt = kNow + DurationMs(1); + static constexpr Timestamp kExpiresAt = kNow + TimeDelta::Millis(1); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""), - kNow + DurationMs(0), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(0), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), - kNow + DurationMs(1), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(1), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); @@ -362,15 +363,14 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { TEST_F(OutstandingDataTest, MeasureRTT) { buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1)); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(1)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(2)); - static constexpr DurationMs kDuration(123); - ASSERT_HAS_VALUE_AND_ASSIGN( - DurationMs duration, - buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11)))); + static constexpr TimeDelta kDuration = TimeDelta::Millis(123); + TimeDelta duration = + buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11))); - EXPECT_EQ(duration, kDuration - DurationMs(1)); + EXPECT_EQ(duration, kDuration - TimeDelta::Millis(1)); } TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { @@ -453,13 +453,13 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) { buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(42)); buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(43)); buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(44)); OutstandingData::AckInfo ack1 = @@ -479,7 +479,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -515,7 +515,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); EXPECT_THAT(buf_.GetChunkStatesForTesting(), testing::ElementsAre(Pair(TSN(9), State::kAcked), // diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc index 2b9843f4a7..8c0d227a36 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc @@ -25,7 +25,6 @@ #include "api/array_view.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" @@ -40,10 +39,13 @@ #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" #include "rtc_base/strings/string_builder.h" namespace dcsctp { namespace { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // Allow sending only slightly less than an MTU, to account for headers. constexpr float kMinBytesRequiredToSendFactor = 0.9; @@ -55,7 +57,7 @@ RetransmissionQueue::RetransmissionQueue( TSN my_initial_tsn, size_t a_rwnd, SendQueue& send_queue, - std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void(TimeDelta rtt)> on_new_rtt, std::function<void()> on_clear_retransmission_counter, Timer& t3_rtx, const DcSctpOptions& options, @@ -84,7 +86,6 @@ RetransmissionQueue::RetransmissionQueue( send_queue_(send_queue), outstanding_data_( data_chunk_header_size_, - tsn_unwrapper_.Unwrap(my_initial_tsn), tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)), [this](StreamID stream_id, OutgoingMessageId message_id) { return send_queue_.Discard(stream_id, message_id); @@ -114,12 +115,12 @@ void RetransmissionQueue::MaybeExitFastRecovery( } void RetransmissionQueue::HandleIncreasedCumulativeTsnAck( - size_t outstanding_bytes, + size_t unacked_bytes, size_t total_bytes_acked) { // Allow some margin for classifying as fully utilized, due to e.g. that too // small packets (less than kMinimumFragmentedPayload) are not sent + // overhead. - bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_; + bool is_fully_utilized = unacked_bytes + options_.mtu >= cwnd_; size_t old_cwnd = cwnd_; if (phase() == CongestionAlgorithmPhase::kSlowStart) { if (is_fully_utilized && !is_in_fast_recovery()) { @@ -204,13 +205,13 @@ void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { } void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { - rwnd_ = outstanding_data_.outstanding_bytes() >= a_rwnd + rwnd_ = outstanding_data_.unacked_bytes() >= a_rwnd ? 0 - : a_rwnd - outstanding_data_.outstanding_bytes(); + : a_rwnd - outstanding_data_.unacked_bytes(); } void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { - // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to + // Note: Can't use `unacked_bytes()` as that one doesn't count chunks to // be retransmitted. if (outstanding_data_.empty()) { // https://tools.ietf.org/html/rfc4960#section-6.3.2 @@ -257,14 +258,14 @@ bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { return true; } -bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { +bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) { if (!IsSackValid(sack)) { return false; } UnwrappedTSN old_last_cumulative_tsn_ack = outstanding_data_.last_cumulative_tsn_ack(); - size_t old_outstanding_bytes = outstanding_data_.outstanding_bytes(); + size_t old_unacked_bytes = outstanding_data_.unacked_bytes(); size_t old_rwnd = rwnd_; UnwrappedTSN cumulative_tsn_ack = tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); @@ -301,9 +302,9 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " (" << *old_last_cumulative_tsn_ack.Wrap() - << "), outstanding_bytes=" - << outstanding_data_.outstanding_bytes() << " (" - << old_outstanding_bytes << "), rwnd=" << rwnd_ << " (" + << "), unacked_bytes=" + << outstanding_data_.unacked_bytes() << " (" + << old_unacked_bytes << "), rwnd=" << rwnd_ << " (" << old_rwnd << ")"; if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) { @@ -315,8 +316,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { // Note: It may be started again in a bit further down. t3_rtx_.Stop(); - HandleIncreasedCumulativeTsnAck(old_outstanding_bytes, - ack_info.bytes_acked); + HandleIncreasedCumulativeTsnAck(old_unacked_bytes, ack_info.bytes_acked); } if (ack_info.has_packet_loss) { @@ -335,7 +335,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { return true; } -void RetransmissionQueue::UpdateRTT(TimeMs now, +void RetransmissionQueue::UpdateRTT(Timestamp now, UnwrappedTSN cumulative_tsn_ack) { // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C, // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin @@ -345,17 +345,16 @@ void RetransmissionQueue::UpdateRTT(TimeMs now, // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and // use only those packets for measurement. - absl::optional<DurationMs> rtt = - outstanding_data_.MeasureRTT(now, cumulative_tsn_ack); + TimeDelta rtt = outstanding_data_.MeasureRTT(now, cumulative_tsn_ack); - if (rtt.has_value()) { - on_new_rtt_(*rtt); + if (rtt.IsFinite()) { + on_new_rtt_(rtt); } } void RetransmissionQueue::HandleT3RtxTimerExpiry() { size_t old_cwnd = cwnd_; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); // https://tools.ietf.org/html/rfc4960#section-6.3.3 // "For the destination address for which the timer expires, adjust // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." @@ -392,8 +391,8 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ << " (" << old_cwnd << "), ssthresh=" << ssthresh_ - << ", outstanding_bytes " << outstanding_bytes() << " (" - << old_outstanding_bytes << ")"; + << ", unacked_bytes " << unacked_bytes() << " (" + << old_unacked_bytes << ")"; RTC_DCHECK(IsConsistent()); } @@ -402,7 +401,7 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) { RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted()); RTC_DCHECK(IsDivisibleBy4(bytes_in_packet)); std::vector<std::pair<TSN, Data>> to_be_sent; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); to_be_sent = outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet); @@ -441,21 +440,21 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) { sb << *c.first; }) << " - " << bytes_retransmitted - << " bytes. outstanding_bytes=" << outstanding_bytes() - << " (" << old_outstanding_bytes << ")"; + << " bytes. unacked_bytes=" << unacked_bytes() << " (" + << old_unacked_bytes << ")"; RTC_DCHECK(IsConsistent()); return to_be_sent; } std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( - TimeMs now, + Timestamp now, size_t bytes_remaining_in_packet) { // Chunks are always padded to even divisible by four. RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); std::vector<std::pair<TSN, Data>> to_be_sent; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); size_t old_rwnd = rwnd_; // Calculate the bandwidth budget (how many bytes that is @@ -494,7 +493,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( chunk_opt->message_id, chunk_opt->data, now, partial_reliability_ ? chunk_opt->max_retransmissions : MaxRetransmits::NoLimit(), - partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(), + partial_reliability_ ? chunk_opt->expires_at + : Timestamp::PlusInfinity(), chunk_opt->lifecycle_id); if (tsn.has_value()) { @@ -526,8 +526,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( [&](size_t r, const std::pair<TSN, Data>& d) { return r + GetSerializedChunkSize(d.second); }) - << " bytes. outstanding_bytes=" << outstanding_bytes() - << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_ + << " bytes. unacked_bytes=" << unacked_bytes() << " (" + << old_unacked_bytes << "), cwnd=" << cwnd_ << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; } RTC_DCHECK(IsConsistent()); @@ -539,7 +539,7 @@ bool RetransmissionQueue::can_send_data() const { max_bytes_to_send() >= min_bytes_required_to_send_; } -bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { +bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) { if (!partial_reliability_) { return false; } @@ -550,9 +550,9 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { } size_t RetransmissionQueue::max_bytes_to_send() const { - size_t left = outstanding_bytes() >= cwnd_ ? 0 : cwnd_ - outstanding_bytes(); + size_t left = unacked_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_bytes(); - if (outstanding_bytes() == 0) { + if (unacked_bytes() == 0) { // https://datatracker.ietf.org/doc/html/rfc4960#section-6.1 // ... However, regardless of the value of rwnd (including if it is 0), the // data sender can always have one DATA chunk in flight to the receiver if @@ -619,7 +619,6 @@ void RetransmissionQueue::RestoreFromState( partial_bytes_acked_ = state.tx.partial_bytes_acked; outstanding_data_.ResetSequenceNumbers( - tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn)), tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn - 1))); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h index b44db2a9a0..a0fbb33c47 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h @@ -60,7 +60,7 @@ class RetransmissionQueue { TSN my_initial_tsn, size_t a_rwnd, SendQueue& send_queue, - std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void(webrtc::TimeDelta rtt)> on_new_rtt, std::function<void()> on_clear_retransmission_counter, Timer& t3_rtx, const DcSctpOptions& options, @@ -69,7 +69,7 @@ class RetransmissionQueue { // Handles a received SACK. Returns true if the `sack` was processed and // false if it was discarded due to received out-of-order and not relevant. - bool HandleSack(TimeMs now, const SackChunk& sack); + bool HandleSack(webrtc::Timestamp now, const SackChunk& sack); // Handles an expired retransmission timer. void HandleT3RtxTimerExpiry(); @@ -90,7 +90,7 @@ class RetransmissionQueue { // called prior to this method, to abandon expired chunks, as this method will // not expire any chunks. std::vector<std::pair<TSN, Data>> GetChunksToSend( - TimeMs now, + webrtc::Timestamp now, size_t bytes_remaining_in_packet); // Returns the internal state of all queued chunks. This is only used in @@ -121,14 +121,10 @@ class RetransmissionQueue { uint64_t rtx_bytes_count() const { return rtx_bytes_count_; } // Returns the number of bytes of packets that are in-flight. - size_t outstanding_bytes() const { - return outstanding_data_.outstanding_bytes(); - } + size_t unacked_bytes() const { return outstanding_data_.unacked_bytes(); } // Returns the number of DATA chunks that are in-flight. - size_t outstanding_items() const { - return outstanding_data_.outstanding_items(); - } + size_t unacked_items() const { return outstanding_data_.unacked_items(); } // Indicates if the congestion control algorithm allows data to be sent. bool can_send_data() const; @@ -136,7 +132,7 @@ class RetransmissionQueue { // Given the current time `now`, it will evaluate if there are chunks that // have expired and that need to be discarded. It returns true if a // FORWARD-TSN should be sent. - bool ShouldSendForwardTsn(TimeMs now); + bool ShouldSendForwardTsn(webrtc::Timestamp now); // Creates a FORWARD-TSN chunk. ForwardTsnChunk CreateForwardTsn() const { @@ -185,7 +181,7 @@ class RetransmissionQueue { // When a SACK chunk is received, this method will be called which _may_ call // into the `RetransmissionTimeout` to update the RTO. - void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); + void UpdateRTT(webrtc::Timestamp now, UnwrappedTSN cumulative_tsn_ack); // If the congestion control is in "fast recovery mode", this may be exited // now. @@ -197,7 +193,7 @@ class RetransmissionQueue { // Update the congestion control algorithm given as the cumulative ack TSN // value has increased, as reported in an incoming SACK chunk. - void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes, + void HandleIncreasedCumulativeTsnAck(size_t unacked_bytes, size_t total_bytes_acked); // Update the congestion control algorithm, given as packet loss has been // detected, as reported in an incoming SACK chunk. @@ -230,7 +226,7 @@ class RetransmissionQueue { // The size of the data chunk (DATA/I-DATA) header that is used. const size_t data_chunk_header_size_; // Called when a new RTT measurement has been done - const std::function<void(DurationMs rtt)> on_new_rtt_; + const std::function<void(webrtc::TimeDelta rtt)> on_new_rtt_; // Called when a SACK has been seen that cleared the retransmission counter. const std::function<void()> on_clear_retransmission_counter_; // The retransmission counter. diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc index d50494f084..eb1e04a5bb 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc @@ -52,6 +52,8 @@ using ::testing::Pair; using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; @@ -74,12 +76,12 @@ class RetransmissionQueueTest : public testing::Test { }), timer_(timer_manager_.CreateTimer( "test/t3_rtx", - []() { return absl::nullopt; }, - TimerOptions(options_.rto_initial))) {} + []() { return TimeDelta::Zero(); }, + TimerOptions(options_.rto_initial.ToTimeDelta()))) {} - std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk( + std::function<SendQueue::DataToSend(Timestamp, size_t)> CreateChunk( OutgoingMessageId message_id) { - return [this, message_id](TimeMs now, size_t max_size) { + return [this, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend(message_id, gen_.Ordered({1, 2, 3, 4}, "BE")); }; @@ -127,10 +129,10 @@ class RetransmissionQueueTest : public testing::Test { MockDcSctpSocketCallbacks callbacks_; DcSctpOptions options_; DataGenerator gen_; - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; - NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_; + NiceMock<MockFunction<void(TimeDelta rtt_ms)>> on_rtt_; NiceMock<MockFunction<void()>> on_clear_retransmission_counter_; NiceMock<MockSendQueue> producer_; std::unique_ptr<Timer> timer_; @@ -146,7 +148,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -159,7 +161,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -175,7 +177,7 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); @@ -198,7 +200,7 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -229,7 +231,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -241,7 +243,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); // Ack 12, 14-15, 17-18 @@ -262,7 +264,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); // Ack 12, 14-15, 17-19 @@ -274,7 +276,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); // Ack 12, 14-15, 17-20 @@ -321,16 +323,16 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); - static constexpr TimeMs kStartTime(100000); + static constexpr Timestamp kStartTime = Timestamp::Seconds(100); now_ = kStartTime; EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); // Ack 10, 12, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); @@ -342,22 +344,22 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Send 13 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(3))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13))); // Ack 10, 12-13, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); // Send 14 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14))); // Ack 10, 12-14, after 100 ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); @@ -383,11 +385,11 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Verify that the timer was really restarted when fast-retransmitting. The // timeout is `options_.rto_initial`, so advance the time just before that. - now_ += options_.rto_initial - DurationMs(1); + now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1); EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value()); // And ensure it really is running. - now_ += DurationMs(1); + now_ += TimeDelta::Millis(1); ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout, timeout_manager_.GetNextExpiredTimeout()); // An expired timeout has to be handled (asserts validate this). @@ -397,15 +399,15 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(1), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -420,11 +422,11 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -459,12 +461,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { RetransmissionQueue queue = CreateQueue(/*supports_partial_reliability=*/false); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -488,12 +490,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -529,12 +531,12 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(3); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -577,16 +579,16 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { static constexpr size_t kCwnd = 1200; queue.set_cwnd(kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); std::vector<uint8_t> payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1500); @@ -594,8 +596,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); - EXPECT_EQ(queue.outstanding_items(), 1u); + EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); + EXPECT_EQ(queue.unacked_items(), 1u); // Will force chunks to be retransmitted queue.HandleT3RtxTimerExpiry(); @@ -603,8 +605,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); std::vector<std::pair<TSN, Data>> chunks_to_rtx = queue.GetChunksToSend(now_, 1500); @@ -612,30 +614,30 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); - EXPECT_EQ(queue.outstanding_items(), 1u); + EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); + EXPECT_EQ(queue.unacked_items(), 1u); } TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -675,23 +677,23 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "E")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -729,7 +731,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(1); SendQueue::DataToSend dts(OutgoingMessageId(42), @@ -737,7 +739,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(2); SendQueue::DataToSend dts(OutgoingMessageId(43), @@ -745,7 +747,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(3); SendQueue::DataToSend dts(OutgoingMessageId(44), @@ -753,7 +755,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(4); SendQueue::DataToSend dts(OutgoingMessageId(45), @@ -761,7 +763,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -850,21 +852,21 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { TEST_F(RetransmissionQueueTest, MeasureRTT) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); - now_ = now_ + DurationMs(123); + now_ = now_ + TimeDelta::Millis(123); - EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1); + EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); } @@ -888,7 +890,7 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -918,7 +920,7 @@ TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -965,7 +967,7 @@ TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -995,14 +997,14 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the // magic numbers in this test. EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); std::vector<uint8_t> payload(183); return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); std::vector<uint8_t> payload(957); @@ -1018,23 +1020,23 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1046,8 +1048,8 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); - EXPECT_EQ(queue.outstanding_items(), 3u); + EXPECT_EQ(queue.unacked_bytes(), (16 + 4) * 3u); + EXPECT_EQ(queue.unacked_items(), 3u); // Mark the message as lost. EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); @@ -1060,21 +1062,21 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned))); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); // Now ACK those, one at a time. queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); } TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { @@ -1082,21 +1084,21 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { DataGeneratorOptions options; options.stream_id = StreamID(17); options.mid = MID(42); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 24); @@ -1104,7 +1106,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId)) .WillOnce(Return(true)); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); @@ -1118,38 +1120,38 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { RetransmissionQueue queue = CreateQueue(); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(42), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(43), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) // Stream reset - MID reset to zero again. - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44))) .WillOnce(Return(true)); @@ -1160,7 +1162,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0))))); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); EXPECT_THAT( @@ -1176,7 +1178,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; @@ -1184,7 +1186,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1246,7 +1248,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { // This is a fairly long test. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(2); return dts; @@ -1260,7 +1262,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1386,17 +1388,17 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { std::vector<uint8_t> payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1500); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); size_t serialized_size = payload.size() + DataChunk::kHeaderSize; - EXPECT_EQ(queue.outstanding_bytes(), serialized_size); + EXPECT_EQ(queue.unacked_bytes(), serialized_size); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); @@ -1414,12 +1416,12 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1433,7 +1435,7 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_TRUE(queue.can_send_data()); - EXPECT_EQ(queue.outstanding_bytes(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu); } @@ -1447,12 +1449,12 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1467,7 +1469,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); EXPECT_EQ( @@ -1490,7 +1492,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8)); EXPECT_EQ( queue.GetHandoverReadiness(), @@ -1503,7 +1505,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-18 @@ -1515,7 +1517,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-19 @@ -1527,7 +1529,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-20 @@ -1563,7 +1565,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2)); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); @@ -1574,7 +1576,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(*handedover_queue), testing::ElementsAre(TSN(12), TSN(13), TSN(14))); @@ -1592,27 +1594,27 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) { std::vector<uint8_t> payload(mtu - 100); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "B")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "E")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Produce all chunks and put them in the retransmission queue. std::vector<std::pair<TSN, Data>> chunks_to_send = diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc index 7d8fb9761c..8af77041a5 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc @@ -12,28 +12,29 @@ #include <algorithm> #include <cstdint> +#include "api/units/time_delta.h" #include "net/dcsctp/public/dcsctp_options.h" namespace dcsctp { RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options) - : min_rto_(*options.rto_min), - max_rto_(*options.rto_max), - max_rtt_(*options.rtt_max), + : min_rto_(options.rto_min.ToTimeDelta()), + max_rto_(options.rto_max.ToTimeDelta()), + max_rtt_(options.rtt_max.ToTimeDelta()), min_rtt_variance_(*options.min_rtt_variance), scaled_srtt_(*options.rto_initial << kRttShift), rto_(*options.rto_initial) {} -void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { - const int32_t rtt = *measured_rtt; - +void RetransmissionTimeout::ObserveRTT(webrtc::TimeDelta measured_rtt) { // Unrealistic values will be skipped. If a wrongly measured (or otherwise // corrupt) value was processed, it could change the state in a way that would // take a very long time to recover. - if (rtt < 0 || rtt > max_rtt_) { + if (measured_rtt < webrtc::TimeDelta::Zero() || measured_rtt > max_rtt_) { return; } + const int64_t rtt = measured_rtt.ms(); + // From https://tools.ietf.org/html/rfc4960#section-6.3.1, but avoiding // floating point math by implementing algorithm from "V. Jacobson: Congestion // avoidance and control", but adapted for SCTP. @@ -42,7 +43,7 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { scaled_rtt_var_ = (rtt / 2) << kRttVarShift; first_measurement_ = false; } else { - int32_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift); + int64_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift); scaled_srtt_ += rtt_diff; if (rtt_diff < 0) { rtt_diff = -rtt_diff; @@ -58,6 +59,6 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { rto_ = (scaled_srtt_ >> kRttShift) + scaled_rtt_var_; // Clamp RTO between min and max. - rto_ = std::min(std::max(rto_, min_rto_), max_rto_); + rto_ = std::min(std::max(rto_, min_rto_.ms()), max_rto_.ms()); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h index 01530cb3b5..b4b0fd7fef 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h @@ -32,27 +32,29 @@ class RetransmissionTimeout { explicit RetransmissionTimeout(const DcSctpOptions& options); // To be called when a RTT has been measured, to update the RTO value. - void ObserveRTT(DurationMs measured_rtt); + void ObserveRTT(webrtc::TimeDelta measured_rtt); // Returns the Retransmission Timeout (RTO) value, in milliseconds. - DurationMs rto() const { return DurationMs(rto_); } + webrtc::TimeDelta rto() const { return webrtc::TimeDelta::Millis(rto_); } // Returns the smoothed RTT value, in milliseconds. - DurationMs srtt() const { return DurationMs(scaled_srtt_ >> kRttShift); } + webrtc::TimeDelta srtt() const { + return webrtc::TimeDelta::Millis(scaled_srtt_ >> kRttShift); + } private: - const int32_t min_rto_; - const int32_t max_rto_; - const int32_t max_rtt_; - const int32_t min_rtt_variance_; + const webrtc::TimeDelta min_rto_; + const webrtc::TimeDelta max_rto_; + const webrtc::TimeDelta max_rtt_; + const int64_t min_rtt_variance_; // If this is the first measurement bool first_measurement_ = true; // Smoothed Round-Trip Time, shifted by kRttShift - int32_t scaled_srtt_; + int64_t scaled_srtt_; // Round-Trip Time Variation, shifted by kRttVarShift - int32_t scaled_rtt_var_ = 0; + int64_t scaled_rtt_var_ = 0; // Retransmission Timeout - int32_t rto_; + int64_t rto_; }; } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc index b901995e97..7754578f32 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc @@ -15,20 +15,21 @@ namespace dcsctp { namespace { +using ::webrtc::TimeDelta; -constexpr DurationMs kMaxRtt = DurationMs(8'000); -constexpr DurationMs kInitialRto = DurationMs(200); -constexpr DurationMs kMaxRto = DurationMs(800); -constexpr DurationMs kMinRto = DurationMs(120); -constexpr DurationMs kMinRttVariance = DurationMs(220); +constexpr TimeDelta kMaxRtt = TimeDelta::Millis(8'000); +constexpr TimeDelta kInitialRto = TimeDelta::Millis(200); +constexpr TimeDelta kMaxRto = TimeDelta::Millis(800); +constexpr TimeDelta kMinRto = TimeDelta::Millis(120); +constexpr TimeDelta kMinRttVariance = TimeDelta::Millis(220); DcSctpOptions MakeOptions() { DcSctpOptions options; - options.rtt_max = kMaxRtt; - options.rto_initial = kInitialRto; - options.rto_max = kMaxRto; - options.rto_min = kMinRto; - options.min_rtt_variance = kMinRttVariance; + options.rtt_max = DurationMs(kMaxRtt); + options.rto_initial = DurationMs(kInitialRto); + options.rto_max = DurationMs(kMaxRto); + options.rto_min = DurationMs(kMinRto); + options.min_rtt_variance = DurationMs(kMinRttVariance); return options; } @@ -45,31 +46,31 @@ TEST(RetransmissionTimeoutTest, HasValidInitialSrtt) { TEST(RetransmissionTimeoutTest, NegativeValuesDoNotAffectRTO) { RetransmissionTimeout rto_(MakeOptions()); // Initial negative value - rto_.ObserveRTT(DurationMs(-10)); + rto_.ObserveRTT(TimeDelta::Millis(-10)); EXPECT_EQ(rto_.rto(), kInitialRto); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); // Subsequent negative value - rto_.ObserveRTT(DurationMs(-10)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(-10)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); } TEST(RetransmissionTimeoutTest, TooLargeValuesDoNotAffectRTO) { RetransmissionTimeout rto_(MakeOptions()); // Initial too large value - rto_.ObserveRTT(kMaxRtt + DurationMs(100)); + rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100)); EXPECT_EQ(rto_.rto(), kInitialRto); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); // Subsequent too large value - rto_.ObserveRTT(kMaxRtt + DurationMs(100)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); } TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(1)); + rto_.ObserveRTT(TimeDelta::Millis(1)); } EXPECT_GE(rto_.rto(), kMinRto); } @@ -77,67 +78,67 @@ TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) { TEST(RetransmissionTimeoutTest, WillNeverGoAboveMaximumRto) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(kMaxRtt - DurationMs(1)); + rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(1)); // Adding jitter, which would make it RTO be well above RTT. - rto_.ObserveRTT(kMaxRtt - DurationMs(100)); + rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(100)); } EXPECT_LE(rto_.rto(), kMaxRto); } TEST(RetransmissionTimeoutTest, CalculatesRtoForStableRtt) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(128)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(123)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(125)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(127)); - EXPECT_EQ(*rto_.rto(), 344); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(128)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(123)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(125)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(127)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); } TEST(RetransmissionTimeoutTest, CalculatesRtoForUnstableRtt) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(402)); - EXPECT_EQ(*rto_.rto(), 622); - rto_.ObserveRTT(DurationMs(728)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(89)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(126)); - EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(402)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(622)); + rto_.ObserveRTT(TimeDelta::Millis(728)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(89)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(126)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); } TEST(RetransmissionTimeoutTest, WillStabilizeAfterAWhile) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - rto_.ObserveRTT(DurationMs(402)); - rto_.ObserveRTT(DurationMs(728)); - rto_.ObserveRTT(DurationMs(89)); - rto_.ObserveRTT(DurationMs(126)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(122)); - EXPECT_EQ(*rto_.rto(), 710); - rto_.ObserveRTT(DurationMs(123)); - EXPECT_EQ(*rto_.rto(), 631); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 562); - rto_.ObserveRTT(DurationMs(122)); - EXPECT_EQ(*rto_.rto(), 505); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 454); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 410); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 367); + rto_.ObserveRTT(TimeDelta::Millis(124)); + rto_.ObserveRTT(TimeDelta::Millis(402)); + rto_.ObserveRTT(TimeDelta::Millis(728)); + rto_.ObserveRTT(TimeDelta::Millis(89)); + rto_.ObserveRTT(TimeDelta::Millis(126)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(122)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(710)); + rto_.ObserveRTT(TimeDelta::Millis(123)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(631)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(562)); + rto_.ObserveRTT(TimeDelta::Millis(122)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(505)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(454)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(410)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(367)); } TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) { @@ -149,31 +150,33 @@ TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 344); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); } TEST(RetransmissionTimeoutTest, CanSpecifySmallerMinimumRttVariance) { DcSctpOptions options = MakeOptions(); - options.min_rtt_variance = kMinRttVariance - DurationMs(100); + options.min_rtt_variance = + DurationMs(kMinRttVariance - TimeDelta::Millis(100)); RetransmissionTimeout rto_(options); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 244); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(244)); } TEST(RetransmissionTimeoutTest, CanSpecifyLargerMinimumRttVariance) { DcSctpOptions options = MakeOptions(); - options.min_rtt_variance = kMinRttVariance + DurationMs(100); + options.min_rtt_variance = + DurationMs(kMinRttVariance + TimeDelta::Millis(100)); RetransmissionTimeout rto_(options); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 444); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(444)); } } // namespace diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc index facb432c59..7cbead296c 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc @@ -21,15 +21,17 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/common/internal_types.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, @@ -137,7 +139,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, } absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce( - TimeMs now, + Timestamp now, size_t max_size) { RTC_DCHECK(pause_state_ != PauseState::kPaused && pause_state_ != PauseState::kResetting); @@ -349,7 +351,7 @@ bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { return items_.front().mid.has_value(); } -void RRSendQueue::Add(TimeMs now, +void RRSendQueue::Add(Timestamp now, DcSctpMessage message, const SendOptions& send_options) { RTC_DCHECK(!message.payload().empty()); @@ -366,8 +368,9 @@ void RRSendQueue::Add(TimeMs now, ? MaxRetransmits(send_options.max_retransmissions.value()) : MaxRetransmits::NoLimit(), .expires_at = send_options.lifetime.has_value() - ? now + *send_options.lifetime + DurationMs(1) - : TimeMs::InfiniteFuture(), + ? now + send_options.lifetime->ToTimeDelta() + + TimeDelta::Millis(1) + : Timestamp::PlusInfinity(), .lifecycle_id = send_options.lifecycle_id, }; GetOrCreateStreamInfo(message.stream_id()) @@ -383,7 +386,7 @@ bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } -absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now, +absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(Timestamp now, size_t max_size) { return scheduler_.Produce(now, max_size); } diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h index bef5fe437d..b6c359dc1e 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h @@ -71,12 +71,13 @@ class RRSendQueue : public SendQueue { // time should be in `now`. Note that it's the responsibility of the caller to // ensure that the buffer is not full (by calling `IsFull`) before adding // messages to it. - void Add(TimeMs now, + void Add(webrtc::Timestamp now, DcSctpMessage message, const SendOptions& send_options = {}); // Implementation of `SendQueue`. - absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override; + absl::optional<DataToSend> Produce(webrtc::Timestamp now, + size_t max_size) override; bool Discard(StreamID stream_id, OutgoingMessageId message_id) override; void PrepareResetStream(StreamID streams) override; bool HasStreamsReadyToBeReset() const override; @@ -104,7 +105,7 @@ class RRSendQueue : public SendQueue { struct MessageAttributes { IsUnordered unordered; MaxRetransmits max_retransmissions; - TimeMs expires_at; + webrtc::Timestamp expires_at; LifecycleId lifecycle_id; }; @@ -154,7 +155,7 @@ class RRSendQueue : public SendQueue { void Add(DcSctpMessage message, MessageAttributes attributes); // Implementing `StreamScheduler::StreamProducer`. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, size_t max_size) override; size_t bytes_to_send_in_next_message() const override; @@ -265,7 +266,7 @@ class RRSendQueue : public SendQueue { OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); absl::optional<DataToSend> Produce( std::map<StreamID, OutgoingStream>::iterator it, - TimeMs now, + webrtc::Timestamp now, size_t max_size); const absl::string_view log_prefix_; diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc index 9d6da7bdff..632cd8fc19 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc @@ -29,8 +29,10 @@ namespace dcsctp { namespace { using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow = TimeMs(0); +constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); constexpr size_t kMaxQueueSize = 1000; @@ -181,9 +183,9 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { std::vector<uint8_t> payload(20); // Default is no expiry - TimeMs now = kNow; + Timestamp now = kNow; buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); SendOptions expires_2_seconds; @@ -191,17 +193,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { // Add and consume within lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2000); + now += TimeDelta::Millis(2000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); // Add and consume just outside lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // A long time after expiry buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // Expire one message, but produce the second that is not expired. @@ -211,7 +213,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { expires_4_seconds.lifetime = DurationMs(4000); buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); @@ -846,8 +848,9 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) { EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize) - .has_value()); + EXPECT_FALSE( + buf_.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize) + .has_value()); } TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) { diff --git a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h index 48eaefaf6a..d0d834c901 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h @@ -17,6 +17,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/types.h" @@ -37,7 +38,7 @@ class SendQueue { // Partial reliability - RFC3758 MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(); - TimeMs expires_at = TimeMs::InfiniteFuture(); + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(); // Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for // all other fragments. @@ -55,7 +56,8 @@ class SendQueue { // // `max_size` refers to how many payload bytes that may be produced, not // including any headers. - virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0; + virtual absl::optional<DataToSend> Produce(webrtc::Timestamp now, + size_t max_size) = 0; // Discards a partially sent message identified by the parameters // `stream_id` and `message_id`. The `message_id` comes from the returned diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc index c1d220aaa2..66c4457481 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc @@ -14,7 +14,6 @@ #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" @@ -22,6 +21,7 @@ #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { @@ -31,7 +31,7 @@ void StreamScheduler::Stream::SetPriority(StreamPriority priority) { } absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { // For non-interleaved streams, avoid rescheduling while still sending a // message as it needs to be sent in full. For interleaved messaging, @@ -127,7 +127,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( } absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size); diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h index ce836a5826..9d76fc6f56 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h @@ -87,7 +87,7 @@ class StreamScheduler { // The parameter `max_size` specifies the maximum amount of actual payload // that may be returned. If these constraints prevents the stream from // sending some data, `absl::nullopt` should be returned. - virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now, + virtual absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, size_t max_size) = 0; // Returns the number of payload bytes that is scheduled to be sent in the @@ -132,7 +132,8 @@ class StreamScheduler { // Produces a message from this stream. This will only be called on streams // that have data. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, + size_t max_size); void MakeActive(size_t bytes_to_send_next); void ForceMarkInactive(); @@ -180,7 +181,8 @@ class StreamScheduler { // `now` and will be used to skip chunks with expired limited lifetime. The // parameter `max_size` specifies the maximum amount of actual payload that // may be returned. If no data can be produced, `absl::nullopt` is returned. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, + size_t max_size); std::set<StreamID> ActiveStreamsForTesting() const; diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc index 4f5fb0fb84..42d0b3cd35 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc @@ -19,9 +19,11 @@ namespace dcsctp { namespace { using ::testing::Return; using ::testing::StrictMock; +using ::webrtc::Timestamp; constexpr size_t kMtu = 1000; constexpr size_t kPayloadSize = 4; +constexpr Timestamp kNow = Timestamp::Zero(); MATCHER_P(HasDataWithMid, mid, "") { if (!arg.has_value()) { @@ -38,12 +40,12 @@ MATCHER_P(HasDataWithMid, mid, "") { return true; } -std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)> +std::function<absl::optional<SendQueue::DataToSend>(Timestamp, size_t)> CreateChunk(OutgoingMessageId message_id, StreamID sid, MID mid, size_t payload_size = kPayloadSize) { - return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) { + return [sid, mid, payload_size, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend( message_id, Data(sid, SSN(0), mid, FSN(0), PPID(42), @@ -56,8 +58,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler, size_t packets_to_generate) { std::map<StreamID, size_t> packet_counts; for (size_t i = 0; i < packets_to_generate; ++i) { - absl::optional<SendQueue::DataToSend> data = - scheduler.Produce(TimeMs(0), kMtu); + absl::optional<SendQueue::DataToSend> data = scheduler.Produce(kNow, kMtu); if (data.has_value()) { ++packet_counts[data->data.stream_id]; } @@ -69,7 +70,7 @@ class MockStreamProducer : public StreamScheduler::StreamProducer { public: MOCK_METHOD(absl::optional<SendQueue::DataToSend>, Produce, - (TimeMs, size_t), + (Timestamp, size_t), (override)); MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); }; @@ -100,7 +101,7 @@ class TestStream { TEST(StreamSchedulerTest, HasNoActiveStreams) { StreamScheduler scheduler("", kMtu); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Stream properties can be set and retrieved @@ -132,8 +133,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); stream->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet. @@ -168,13 +169,13 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet, but keeps producing from the @@ -232,15 +233,15 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Deactivates a stream before it has finished producing all packets. @@ -259,12 +260,12 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // ... but the stream is made inactive before it can be produced. stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Resumes a paused stream - makes a stream active after inactivating it. @@ -287,14 +288,14 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Iterates between streams, where one is suddenly paused and later resumed. @@ -330,15 +331,15 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); stream1->MakeInactive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Verifies that packet counts are evenly distributed in round robin scheduling. @@ -427,18 +428,18 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { stream2->MaybeMakeActive(); // t = 30 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t = 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t = 70 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t = 90 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); // t = 140 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t = 210 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority. @@ -492,24 +493,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { stream3->MaybeMakeActive(); // t ~= 20 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 40 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 50 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 80 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 100 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 150 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 160 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 240 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority @@ -586,24 +587,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { stream3->MaybeMakeActive(); // t ~= 400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 1400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 2500 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 2800 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 4000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 5600 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 6000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 7000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 11200 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { // A simple test with two streams of different priority, but sending packets @@ -723,11 +724,11 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Sending large messages with large MTU will not fragment messages and will @@ -756,9 +757,9 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } } // namespace |