summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/tx
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 01:14:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 01:14:29 +0000
commitfbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 (patch)
tree4c1ccaf5486d4f2009f9a338a98a83e886e29c97 /third_party/libwebrtc/net/dcsctp/tx
parentReleasing progress-linux version 124.0.1-1~progress7.99u1. (diff)
downloadfirefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.tar.xz
firefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.zip
Merging upstream version 125.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx')
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/BUILD.gn12
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h10
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc261
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h67
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc58
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc69
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h22
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc248
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc19
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h22
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc153
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc15
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h11
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc21
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/send_queue.h6
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc6
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h8
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc149
18 files changed, 605 insertions, 552 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn
index 5547ffa870..d1fd8ab3d5 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn
+++ b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn
@@ -11,6 +11,7 @@ import("../../../webrtc.gni")
rtc_source_set("send_queue") {
deps = [
"../../../api:array_view",
+ "../../../api/units:timestamp",
"../common:internal_types",
"../packet:chunk",
"../packet:data",
@@ -28,9 +29,9 @@ rtc_library("rr_send_queue") {
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base:stringutils",
"../../../rtc_base/containers:flat_map",
"../common:internal_types",
- "../common:str_join",
"../packet:data",
"../public:socket",
"../public:types",
@@ -53,9 +54,9 @@ rtc_library("stream_scheduler") {
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base:stringutils",
"../../../rtc_base:strong_alias",
"../../../rtc_base/containers:flat_set",
- "../common:str_join",
"../packet:chunk",
"../packet:data",
"../packet:sctp_packet",
@@ -89,6 +90,7 @@ rtc_library("retransmission_error_counter") {
rtc_library("retransmission_timeout") {
deps = [
+ "../../../api/units:time_delta",
"../../../rtc_base:checks",
"../public:types",
]
@@ -103,13 +105,15 @@ rtc_library("outstanding_data") {
":retransmission_timeout",
":send_queue",
"../../../api:array_view",
+ "../../../api/units:time_delta",
+ "../../../api/units:timestamp",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base:stringutils",
"../../../rtc_base/containers:flat_set",
"../common:internal_types",
"../common:math",
"../common:sequence_numbers",
- "../common:str_join",
"../packet:chunk",
"../packet:data",
"../public:socket",
@@ -138,7 +142,6 @@ rtc_library("retransmission_queue") {
"../../../rtc_base:stringutils",
"../common:math",
"../common:sequence_numbers",
- "../common:str_join",
"../packet:chunk",
"../packet:data",
"../public:socket",
@@ -162,6 +165,7 @@ if (rtc_include_tests) {
deps = [
":send_queue",
"../../../api:array_view",
+ "../../../api/units:timestamp",
"../../../test:test_support",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
diff --git a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h
index 04921866ae..3511403eab 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h
@@ -15,6 +15,7 @@
#include "absl/types/optional.h"
#include "api/array_view.h"
+#include "api/units/timestamp.h"
#include "net/dcsctp/tx/send_queue.h"
#include "test/gmock.h"
@@ -23,14 +24,15 @@ namespace dcsctp {
class MockSendQueue : public SendQueue {
public:
MockSendQueue() {
- ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) {
- return absl::nullopt;
- });
+ ON_CALL(*this, Produce)
+ .WillByDefault([](webrtc::Timestamp now, size_t max_size) {
+ return absl::nullopt;
+ });
}
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
- (TimeMs now, size_t max_size),
+ (webrtc::Timestamp now, size_t max_size),
(override));
MOCK_METHOD(bool,
Discard,
diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
index c2706bd0d2..ca639abc54 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc
@@ -14,12 +14,16 @@
#include <utility>
#include <vector>
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/public/types.h"
+#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace dcsctp {
+using ::webrtc::Timestamp;
// The number of times a packet must be NACKed before it's retransmitted.
// See https://tools.ietf.org/html/rfc4960#section-7.2.4
@@ -63,18 +67,18 @@ void OutstandingData::Item::MarkAsRetransmitted() {
}
void OutstandingData::Item::Abandon() {
- RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() ||
+ RTC_DCHECK(!expires_at_.IsPlusInfinity() ||
max_retransmissions_ != MaxRetransmits::NoLimit());
lifecycle_ = Lifecycle::kAbandoned;
}
-bool OutstandingData::Item::has_expired(TimeMs now) const {
+bool OutstandingData::Item::has_expired(Timestamp now) const {
return expires_at_ <= now;
}
bool OutstandingData::IsConsistent() const {
- size_t actual_outstanding_bytes = 0;
- size_t actual_outstanding_items = 0;
+ size_t actual_unacked_bytes = 0;
+ size_t actual_unacked_items = 0;
std::set<UnwrappedTSN> combined_to_be_retransmitted;
combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(),
@@ -83,10 +87,12 @@ bool OutstandingData::IsConsistent() const {
to_be_fast_retransmitted_.end());
std::set<UnwrappedTSN> actual_combined_to_be_retransmitted;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (item.is_outstanding()) {
- actual_outstanding_bytes += GetSerializedChunkSize(item.data());
- ++actual_outstanding_items;
+ actual_unacked_bytes += GetSerializedChunkSize(item.data());
+ ++actual_unacked_items;
}
if (item.should_be_retransmitted()) {
@@ -94,33 +100,28 @@ bool OutstandingData::IsConsistent() const {
}
}
- if (outstanding_data_.empty() &&
- next_tsn_ != last_cumulative_tsn_ack_.next_value()) {
- return false;
- }
-
- return actual_outstanding_bytes == outstanding_bytes_ &&
- actual_outstanding_items == outstanding_items_ &&
+ return actual_unacked_bytes == unacked_bytes_ &&
+ actual_unacked_items == unacked_items_ &&
actual_combined_to_be_retransmitted == combined_to_be_retransmitted;
}
void OutstandingData::AckChunk(AckInfo& ack_info,
- std::map<UnwrappedTSN, Item>::iterator iter) {
- if (!iter->second.is_acked()) {
- size_t serialized_size = GetSerializedChunkSize(iter->second.data());
+ UnwrappedTSN tsn,
+ Item& item) {
+ if (!item.is_acked()) {
+ size_t serialized_size = GetSerializedChunkSize(item.data());
ack_info.bytes_acked += serialized_size;
- if (iter->second.is_outstanding()) {
- outstanding_bytes_ -= serialized_size;
- --outstanding_items_;
+ if (item.is_outstanding()) {
+ unacked_bytes_ -= serialized_size;
+ --unacked_items_;
}
- if (iter->second.should_be_retransmitted()) {
- RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) ==
+ if (item.should_be_retransmitted()) {
+ RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) ==
to_be_fast_retransmitted_.end());
- to_be_retransmitted_.erase(iter->first);
+ to_be_retransmitted_.erase(tsn);
}
- iter->second.Ack();
- ack_info.highest_tsn_acked =
- std::max(ack_info.highest_tsn_acked, iter->first);
+ item.Ack();
+ ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn);
}
}
@@ -143,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack(
return ack_info;
}
+OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) {
+ RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
+ RTC_DCHECK(tsn < next_tsn());
+ int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
+ RTC_DCHECK(index >= 0);
+ RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
+ return outstanding_data_[index];
+}
+
+const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const {
+ RTC_DCHECK(tsn > last_cumulative_tsn_ack_);
+ RTC_DCHECK(tsn < next_tsn());
+ int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1;
+ RTC_DCHECK(index >= 0);
+ RTC_DCHECK(index < static_cast<int>(outstanding_data_.size()));
+ return outstanding_data_[index];
+}
+
void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
AckInfo& ack_info) {
- auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack);
-
- for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) {
- AckChunk(ack_info, iter);
- if (iter->second.lifecycle_id().IsSet()) {
- RTC_DCHECK(iter->second.data().is_end);
- if (iter->second.is_abandoned()) {
- ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id());
+ while (!outstanding_data_.empty() &&
+ last_cumulative_tsn_ack_ < cumulative_tsn_ack) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value();
+ Item& item = outstanding_data_.front();
+ AckChunk(ack_info, tsn, item);
+ if (item.lifecycle_id().IsSet()) {
+ RTC_DCHECK(item.data().is_end);
+ if (item.is_abandoned()) {
+ ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id());
} else {
- ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id());
+ ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id());
}
}
+ outstanding_data_.pop_front();
+ last_cumulative_tsn_ack_.Increment();
}
- outstanding_data_.erase(outstanding_data_.begin(), first_unacked);
- last_cumulative_tsn_ack_ = cumulative_tsn_ack;
stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(),
stream_reset_breakpoint_tsns_.upper_bound(
cumulative_tsn_ack.next_value()));
@@ -176,12 +196,13 @@ void OutstandingData::AckGapBlocks(
// handled differently.
for (auto& block : gap_ack_blocks) {
- auto start = outstanding_data_.lower_bound(
- UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start));
- auto end = outstanding_data_.upper_bound(
- UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end));
- for (auto iter = start; iter != end; ++iter) {
- AckChunk(ack_info, iter);
+ UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
+ UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
+ for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) {
+ if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
+ Item& item = GetItem(tsn);
+ AckChunk(ack_info, tsn, item);
+ }
}
}
}
@@ -216,13 +237,12 @@ void OutstandingData::NackBetweenAckBlocks(
for (auto& block : gap_ack_blocks) {
UnwrappedTSN cur_block_first_acked =
UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start);
- for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
- iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
- if (iter->first <= max_tsn_to_nack) {
- ack_info.has_packet_loss |=
- NackItem(iter->first, iter->second, /*retransmit_now=*/false,
- /*do_fast_retransmit=*/!is_in_fast_recovery);
- }
+ for (UnwrappedTSN tsn = prev_block_last_acked.next_value();
+ tsn < cur_block_first_acked && tsn <= max_tsn_to_nack;
+ tsn = tsn.next_value()) {
+ ack_info.has_packet_loss |=
+ NackItem(tsn, /*retransmit_now=*/false,
+ /*do_fast_retransmit=*/!is_in_fast_recovery);
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
}
@@ -234,12 +254,12 @@ void OutstandingData::NackBetweenAckBlocks(
}
bool OutstandingData::NackItem(UnwrappedTSN tsn,
- Item& item,
bool retransmit_now,
bool do_fast_retransmit) {
+ Item& item = GetItem(tsn);
if (item.is_outstanding()) {
- outstanding_bytes_ -= GetSerializedChunkSize(item.data());
- --outstanding_items_;
+ unacked_bytes_ -= GetSerializedChunkSize(item.data());
+ --unacked_items_;
}
switch (item.Nack(retransmit_now)) {
@@ -272,28 +292,25 @@ void OutstandingData::AbandonAllFor(const Item& item) {
// skipped over). So create a new fragment, representing the end, that the
// received will never see as it is abandoned immediately and used as cum
// TSN in the sent FORWARD-TSN.
- UnwrappedTSN tsn = next_tsn_;
- next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn, item.data().mid,
item.data().fsn, item.data().ppid, std::vector<uint8_t>(),
Data::IsBeginning(false), Data::IsEnd(true),
item.data().is_unordered);
- Item& added_item =
- outstanding_data_
- .emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
- std::forward_as_tuple(
- item.message_id(), std::move(message_end), TimeMs(0),
- MaxRetransmits(0), TimeMs::InfiniteFuture(),
- LifecycleId::NotSet()))
- .first->second;
- // The added chunk shouldn't be included in `outstanding_bytes`, so set it
+ UnwrappedTSN tsn = next_tsn();
+ Item& added_item = outstanding_data_.emplace_back(
+ item.message_id(), std::move(message_end), Timestamp::Zero(),
+ MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet());
+
+ // The added chunk shouldn't be included in `unacked_bytes`, so set it
// as acked.
added_item.Ack();
RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn="
<< *tsn.Wrap();
}
- for (auto& [tsn, other] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (Item& other : outstanding_data_) {
+ tsn.Increment();
if (!other.is_abandoned() &&
other.data().stream_id == item.data().stream_id &&
other.message_id() == item.message_id()) {
@@ -315,9 +332,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
for (auto it = chunks.begin(); it != chunks.end();) {
UnwrappedTSN tsn = *it;
- auto elem = outstanding_data_.find(tsn);
- RTC_DCHECK(elem != outstanding_data_.end());
- Item& item = elem->second;
+ Item& item = GetItem(tsn);
RTC_DCHECK(item.should_be_retransmitted());
RTC_DCHECK(!item.is_outstanding());
RTC_DCHECK(!item.is_abandoned());
@@ -328,8 +343,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
item.MarkAsRetransmitted();
result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size;
- outstanding_bytes_ += serialized_size;
- ++outstanding_items_;
+ unacked_bytes_ += serialized_size;
+ ++unacked_items_;
it = chunks.erase(it);
} else {
++it;
@@ -370,8 +385,10 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
return ExtractChunksThatCanFit(to_be_retransmitted_, max_size);
}
-void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
- for (const auto& [tsn, item] : outstanding_data_) {
+void OutstandingData::ExpireOutstandingChunks(Timestamp now) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
// Chunks that are nacked can be expired. Care should be taken not to expire
// unacked (in-flight) chunks as they might have been received, but the SACK
// is either delayed or in-flight and may be received later.
@@ -391,38 +408,33 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
}
UnwrappedTSN OutstandingData::highest_outstanding_tsn() const {
- return outstanding_data_.empty() ? last_cumulative_tsn_ack_
- : outstanding_data_.rbegin()->first;
+ return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_,
+ outstanding_data_.size());
}
absl::optional<UnwrappedTSN> OutstandingData::Insert(
OutgoingMessageId message_id,
const Data& data,
- TimeMs time_sent,
+ Timestamp time_sent,
MaxRetransmits max_retransmissions,
- TimeMs expires_at,
+ Timestamp expires_at,
LifecycleId lifecycle_id) {
- UnwrappedTSN tsn = next_tsn_;
- next_tsn_.Increment();
-
// All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(data);
- outstanding_bytes_ += chunk_size;
- ++outstanding_items_;
- auto it = outstanding_data_
- .emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
- std::forward_as_tuple(message_id, data.Clone(),
- time_sent, max_retransmissions,
- expires_at, lifecycle_id))
- .first;
-
- if (it->second.has_expired(time_sent)) {
+ unacked_bytes_ += chunk_size;
+ ++unacked_items_;
+ UnwrappedTSN tsn = next_tsn();
+ Item& item = outstanding_data_.emplace_back(message_id, data.Clone(),
+ time_sent, max_retransmissions,
+ expires_at, lifecycle_id);
+
+ if (item.has_expired(time_sent)) {
// No need to send it - it was expired when it was in the send
// queue.
- RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk "
- << *it->first.Wrap() << " and message "
- << *it->second.data().mid << " as expired";
- AbandonAllFor(it->second);
+ RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap()
+ << " and message " << *item.data().mid
+ << " as expired";
+ AbandonAllFor(item);
RTC_DCHECK(IsConsistent());
return absl::nullopt;
}
@@ -432,34 +444,47 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
}
void OutstandingData::NackAll() {
- for (auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ // A two-pass algorithm is needed, as NackItem will invalidate iterators.
+ std::vector<UnwrappedTSN> tsns_to_nack;
+ for (Item& item : outstanding_data_) {
+ tsn.Increment();
if (!item.is_acked()) {
- NackItem(tsn, item, /*retransmit_now=*/true,
- /*do_fast_retransmit=*/false);
+ tsns_to_nack.push_back(tsn);
}
}
+
+ for (UnwrappedTSN tsn : tsns_to_nack) {
+ NackItem(tsn, /*retransmit_now=*/true,
+ /*do_fast_retransmit=*/false);
+ }
+
RTC_DCHECK(IsConsistent());
}
-absl::optional<DurationMs> OutstandingData::MeasureRTT(TimeMs now,
- UnwrappedTSN tsn) const {
- auto it = outstanding_data_.find(tsn);
- if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) {
- // https://tools.ietf.org/html/rfc4960#section-6.3.1
- // "Karn's algorithm: RTT measurements MUST NOT be made using
- // packets that were retransmitted (and thus for which it is ambiguous
- // whether the reply was for the first instance of the chunk or for a
- // later instance)"
- return now - it->second.time_sent();
+webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now,
+ UnwrappedTSN tsn) const {
+ if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) {
+ const Item& item = GetItem(tsn);
+ if (!item.has_been_retransmitted()) {
+ // https://tools.ietf.org/html/rfc4960#section-6.3.1
+ // "Karn's algorithm: RTT measurements MUST NOT be made using
+ // packets that were retransmitted (and thus for which it is ambiguous
+ // whether the reply was for the first instance of the chunk or for a
+ // later instance)"
+ return now - item.time_sent();
+ }
}
- return absl::nullopt;
+ return webrtc::TimeDelta::PlusInfinity();
}
std::vector<std::pair<TSN, OutstandingData::State>>
OutstandingData::GetChunkStatesForTesting() const {
std::vector<std::pair<TSN, State>> states;
states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked);
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
State state;
if (item.is_abandoned()) {
state = State::kAbandoned;
@@ -480,9 +505,7 @@ OutstandingData::GetChunkStatesForTesting() const {
bool OutstandingData::ShouldSendForwardTsn() const {
if (!outstanding_data_.empty()) {
- auto it = outstanding_data_.begin();
- return it->first == last_cumulative_tsn_ack_.next_value() &&
- it->second.is_abandoned();
+ return outstanding_data_.front().is_abandoned();
}
return false;
}
@@ -491,7 +514,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const {
std::map<StreamID, SSN> skipped_per_ordered_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
@@ -515,7 +540,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream;
UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_;
- for (const auto& [tsn, item] : outstanding_data_) {
+ UnwrappedTSN tsn = last_cumulative_tsn_ack_;
+ for (const Item& item : outstanding_data_) {
+ tsn.Increment();
if (stream_reset_breakpoint_tsns_.contains(tsn) ||
(tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) {
break;
@@ -539,16 +566,12 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const {
std::move(skipped_streams));
}
-void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn,
- UnwrappedTSN last_cumulative_tsn) {
+void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) {
RTC_DCHECK(outstanding_data_.empty());
- RTC_DCHECK(next_tsn_ == last_cumulative_tsn_ack_.next_value());
- RTC_DCHECK(next_tsn == last_cumulative_tsn.next_value());
- next_tsn_ = next_tsn;
last_cumulative_tsn_ack_ = last_cumulative_tsn;
}
void OutstandingData::BeginResetStreams() {
- stream_reset_breakpoint_tsns_.insert(next_tsn_);
+ stream_reset_breakpoint_tsns_.insert(next_tsn());
}
} // namespace dcsctp
diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h
index f8e939661d..2a214975e6 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h
@@ -10,12 +10,14 @@
#ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_
#define NET_DCSCTP_TX_OUTSTANDING_DATA_H_
+#include <deque>
#include <map>
#include <set>
#include <utility>
#include <vector>
#include "absl/types/optional.h"
+#include "api/units/timestamp.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
@@ -29,6 +31,9 @@ namespace dcsctp {
// This class keeps track of outstanding data chunks (sent, not yet acked) and
// handles acking, nacking, rescheduling and abandoning.
+//
+// Items are added to this queue as they are sent and will be removed when the
+// peer acks them using the cumulative TSN ack.
class OutstandingData {
public:
// State for DATA chunks (message fragments) in the queue - used in tests.
@@ -74,11 +79,9 @@ class OutstandingData {
OutstandingData(
size_t data_chunk_header_size,
- UnwrappedTSN next_tsn,
UnwrappedTSN last_cumulative_tsn_ack,
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue)
: data_chunk_header_size_(data_chunk_header_size),
- next_tsn_(next_tsn),
last_cumulative_tsn_ack_(last_cumulative_tsn_ack),
discard_from_send_queue_(std::move(discard_from_send_queue)) {}
@@ -98,14 +101,14 @@ class OutstandingData {
// it?
std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size);
- size_t outstanding_bytes() const { return outstanding_bytes_; }
+ size_t unacked_bytes() const { return unacked_bytes_; }
- // Returns the number of DATA chunks that are in-flight.
- size_t outstanding_items() const { return outstanding_items_; }
+ // Returns the number of DATA chunks that are in-flight (not acked or nacked).
+ size_t unacked_items() const { return unacked_items_; }
// Given the current time `now_ms`, expire and abandon outstanding (sent at
// least once) chunks that have a limited lifetime.
- void ExpireOutstandingChunks(TimeMs now);
+ void ExpireOutstandingChunks(webrtc::Timestamp now);
bool empty() const { return outstanding_data_.empty(); }
@@ -121,7 +124,9 @@ class OutstandingData {
return last_cumulative_tsn_ack_;
}
- UnwrappedTSN next_tsn() const { return next_tsn_; }
+ UnwrappedTSN next_tsn() const {
+ return highest_outstanding_tsn().next_value();
+ }
UnwrappedTSN highest_outstanding_tsn() const;
@@ -131,9 +136,9 @@ class OutstandingData {
absl::optional<UnwrappedTSN> Insert(
OutgoingMessageId message_id,
const Data& data,
- TimeMs time_sent,
+ webrtc::Timestamp time_sent,
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(),
- TimeMs expires_at = TimeMs::InfiniteFuture(),
+ webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(),
LifecycleId lifecycle_id = LifecycleId::NotSet());
// Nacks all outstanding data.
@@ -147,8 +152,8 @@ class OutstandingData {
// Given the current time and a TSN, it returns the measured RTT between when
// the chunk was sent and now. It takes into acccount Karn's algorithm, so if
- // the chunk has ever been retransmitted, it will return absl::nullopt.
- absl::optional<DurationMs> MeasureRTT(TimeMs now, UnwrappedTSN tsn) const;
+ // the chunk has ever been retransmitted, it will return `PlusInfinity()`.
+ webrtc::TimeDelta MeasureRTT(webrtc::Timestamp now, UnwrappedTSN tsn) const;
// Returns the internal state of all queued chunks. This is only used in
// unit-tests.
@@ -159,8 +164,7 @@ class OutstandingData {
bool ShouldSendForwardTsn() const;
// Sets the next TSN to be used. This is used in handover.
- void ResetSequenceNumbers(UnwrappedTSN next_tsn,
- UnwrappedTSN last_cumulative_tsn);
+ void ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn);
// Called when an outgoing stream reset is sent, marking the last assigned TSN
// as a breakpoint that a FORWARD-TSN shouldn't cross.
@@ -179,9 +183,9 @@ class OutstandingData {
Item(OutgoingMessageId message_id,
Data data,
- TimeMs time_sent,
+ webrtc::Timestamp time_sent,
MaxRetransmits max_retransmissions,
- TimeMs expires_at,
+ webrtc::Timestamp expires_at,
LifecycleId lifecycle_id)
: message_id_(message_id),
time_sent_(time_sent),
@@ -195,7 +199,7 @@ class OutstandingData {
OutgoingMessageId message_id() const { return message_id_; }
- TimeMs time_sent() const { return time_sent_; }
+ webrtc::Timestamp time_sent() const { return time_sent_; }
const Data& data() const { return data_; }
@@ -229,7 +233,7 @@ class OutstandingData {
// Given the current time, and the current state of this DATA chunk, it will
// indicate if it has expired (SCTP Partial Reliability Extension).
- bool has_expired(TimeMs now) const;
+ bool has_expired(webrtc::Timestamp now) const;
LifecycleId lifecycle_id() const { return lifecycle_id_; }
@@ -258,7 +262,7 @@ class OutstandingData {
const OutgoingMessageId message_id_;
// When the packet was sent, and placed in this queue.
- const TimeMs time_sent_;
+ const webrtc::Timestamp time_sent_;
// If the message was sent with a maximum number of retransmissions, this is
// set to that number. The value zero (0) means that it will never be
// retransmitted.
@@ -278,7 +282,7 @@ class OutstandingData {
// At this exact millisecond, the item is considered expired. If the message
// is not to be expired, this is set to the infinite future.
- const TimeMs expires_at_;
+ const webrtc::Timestamp expires_at_;
// An optional lifecycle id, which may only be set for the last fragment.
const LifecycleId lifecycle_id_;
@@ -290,6 +294,9 @@ class OutstandingData {
// Returns how large a chunk will be, serialized, carrying the data
size_t GetSerializedChunkSize(const Data& data) const;
+ Item& GetItem(UnwrappedTSN tsn);
+ const Item& GetItem(UnwrappedTSN tsn) const;
+
// Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items
// in the retransmission queue up until this value and will update `ack_info`
// by setting `bytes_acked_by_cumulative_tsn_ack`.
@@ -313,7 +320,7 @@ class OutstandingData {
// Process the acknowledgement of the chunk referenced by `iter` and updates
// state in `ack_info` and the object's state.
- void AckChunk(AckInfo& ack_info, std::map<UnwrappedTSN, Item>::iterator iter);
+ void AckChunk(AckInfo& ack_info, UnwrappedTSN tsn, Item& item);
// Helper method to process an incoming nack of an item and perform the
// correct operations given the action indicated when nacking an item (e.g.
@@ -323,10 +330,11 @@ class OutstandingData {
// many times so that it should be retransmitted, this will schedule it to be
// "fast retransmitted". This is only done just before going into fast
// recovery.
- bool NackItem(UnwrappedTSN tsn,
- Item& item,
- bool retransmit_now,
- bool do_fast_retransmit);
+ //
+ // Note that since nacking an item may result in it becoming abandoned, which
+ // in turn could alter `outstanding_data_`, any iterators are invalidated
+ // after having called this method.
+ bool NackItem(UnwrappedTSN tsn, bool retransmit_now, bool do_fast_retransmit);
// Given that a message fragment, `item` has been abandoned, abandon all other
// fragments that share the same message - both never-before-sent fragments
@@ -341,19 +349,20 @@ class OutstandingData {
// The size of the data chunk (DATA/I-DATA) header that is used.
const size_t data_chunk_header_size_;
- // Next TSN to used.
- UnwrappedTSN next_tsn_;
// The last cumulative TSN ack number.
UnwrappedTSN last_cumulative_tsn_ack_;
// Callback when to discard items from the send queue.
std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_;
- std::map<UnwrappedTSN, Item> outstanding_data_;
+ // Outstanding items. If non-empty, the first element has
+ // `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict
+ // increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`.
+ std::deque<Item> outstanding_data_;
// The number of bytes that are in-flight (sent but not yet acked or nacked).
- size_t outstanding_bytes_ = 0;
+ size_t unacked_bytes_ = 0;
// The number of DATA chunks that are in-flight (sent but not yet acked or
// nacked).
- size_t outstanding_items_ = 0;
+ size_t unacked_items_ = 0;
// Data chunks that are eligible for fast retransmission.
std::set<UnwrappedTSN> to_be_fast_retransmitted_;
// Data chunks that are to be retransmitted.
diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc
index b8c2e593a1..e4bdb7ce7e 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc
@@ -37,8 +37,10 @@ using ::testing::Property;
using ::testing::Return;
using ::testing::StrictMock;
using ::testing::UnorderedElementsAre;
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
-constexpr TimeMs kNow(42);
+constexpr Timestamp kNow = Timestamp::Millis(42);
constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17);
class OutstandingDataTest : public testing::Test {
@@ -46,7 +48,6 @@ class OutstandingDataTest : public testing::Test {
OutstandingDataTest()
: gen_(MID(42)),
buf_(DataChunk::kHeaderSize,
- unwrapper_.Unwrap(TSN(10)),
unwrapper_.Unwrap(TSN(9)),
on_discard_.AsStdFunction()) {}
@@ -58,8 +59,8 @@ class OutstandingDataTest : public testing::Test {
TEST_F(OutstandingDataTest, HasInitialState) {
EXPECT_TRUE(buf_.empty());
- EXPECT_EQ(buf_.outstanding_bytes(), 0u);
- EXPECT_EQ(buf_.outstanding_items(), 0u);
+ EXPECT_EQ(buf_.unacked_bytes(), 0u);
+ EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(10));
@@ -75,8 +76,8 @@ TEST_F(OutstandingDataTest, InsertChunk) {
EXPECT_EQ(tsn.Wrap(), TSN(10));
- EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
- EXPECT_EQ(buf_.outstanding_items(), 1u);
+ EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
+ EXPECT_EQ(buf_.unacked_items(), 1u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11));
@@ -95,8 +96,8 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) {
EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10));
EXPECT_FALSE(ack.has_packet_loss);
- EXPECT_EQ(buf_.outstanding_bytes(), 0u);
- EXPECT_EQ(buf_.outstanding_items(), 0u);
+ EXPECT_EQ(buf_.unacked_bytes(), 0u);
+ EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10));
EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11));
@@ -109,8 +110,8 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false);
- EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
- EXPECT_EQ(buf_.outstanding_items(), 1u);
+ EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
+ EXPECT_EQ(buf_.unacked_items(), 1u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11));
@@ -131,8 +132,8 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11));
EXPECT_FALSE(ack.has_packet_loss);
- EXPECT_EQ(buf_.outstanding_bytes(), 0u);
- EXPECT_EQ(buf_.outstanding_items(), 0u);
+ EXPECT_EQ(buf_.unacked_bytes(), 0u);
+ EXPECT_EQ(buf_.unacked_items(), 0u);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9));
EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(12));
@@ -277,20 +278,20 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
}
TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) {
- static constexpr TimeMs kExpiresAt = kNow + DurationMs(1);
+ static constexpr Timestamp kExpiresAt = kNow + TimeDelta::Millis(1);
EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow,
MaxRetransmits::NoLimit(), kExpiresAt)
.has_value());
EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""),
- kNow + DurationMs(0), MaxRetransmits::NoLimit(),
- kExpiresAt)
+ kNow + TimeDelta::Millis(0),
+ MaxRetransmits::NoLimit(), kExpiresAt)
.has_value());
EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId))
.WillOnce(Return(false));
EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"),
- kNow + DurationMs(1), MaxRetransmits::NoLimit(),
- kExpiresAt)
+ kNow + TimeDelta::Millis(1),
+ MaxRetransmits::NoLimit(), kExpiresAt)
.has_value());
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
@@ -362,15 +363,14 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) {
TEST_F(OutstandingDataTest, MeasureRTT) {
buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow);
- buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1));
- buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2));
+ buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(1));
+ buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(2));
- static constexpr DurationMs kDuration(123);
- ASSERT_HAS_VALUE_AND_ASSIGN(
- DurationMs duration,
- buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11))));
+ static constexpr TimeDelta kDuration = TimeDelta::Millis(123);
+ TimeDelta duration =
+ buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11)));
- EXPECT_EQ(duration, kDuration - DurationMs(1));
+ EXPECT_EQ(duration, kDuration - TimeDelta::Millis(1));
}
TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
@@ -453,13 +453,13 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) {
buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow,
- MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
+ MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(),
LifecycleId(42));
buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow,
- MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
+ MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(),
LifecycleId(43));
buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow,
- MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(),
+ MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(),
LifecycleId(44));
OutstandingData::AckInfo ack1 =
@@ -479,7 +479,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) {
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
- TimeMs::InfiniteFuture(), LifecycleId(42));
+ Timestamp::PlusInfinity(), LifecycleId(42));
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@@ -515,7 +515,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) {
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
- TimeMs::InfiniteFuture(), LifecycleId(42));
+ Timestamp::PlusInfinity(), LifecycleId(42));
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc
index 2b9843f4a7..8c0d227a36 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc
@@ -25,7 +25,6 @@
#include "api/array_view.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
@@ -40,10 +39,13 @@
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
+#include "rtc_base/strings/str_join.h"
#include "rtc_base/strings/string_builder.h"
namespace dcsctp {
namespace {
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
// Allow sending only slightly less than an MTU, to account for headers.
constexpr float kMinBytesRequiredToSendFactor = 0.9;
@@ -55,7 +57,7 @@ RetransmissionQueue::RetransmissionQueue(
TSN my_initial_tsn,
size_t a_rwnd,
SendQueue& send_queue,
- std::function<void(DurationMs rtt)> on_new_rtt,
+ std::function<void(TimeDelta rtt)> on_new_rtt,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
@@ -84,7 +86,6 @@ RetransmissionQueue::RetransmissionQueue(
send_queue_(send_queue),
outstanding_data_(
data_chunk_header_size_,
- tsn_unwrapper_.Unwrap(my_initial_tsn),
tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)),
[this](StreamID stream_id, OutgoingMessageId message_id) {
return send_queue_.Discard(stream_id, message_id);
@@ -114,12 +115,12 @@ void RetransmissionQueue::MaybeExitFastRecovery(
}
void RetransmissionQueue::HandleIncreasedCumulativeTsnAck(
- size_t outstanding_bytes,
+ size_t unacked_bytes,
size_t total_bytes_acked) {
// Allow some margin for classifying as fully utilized, due to e.g. that too
// small packets (less than kMinimumFragmentedPayload) are not sent +
// overhead.
- bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_;
+ bool is_fully_utilized = unacked_bytes + options_.mtu >= cwnd_;
size_t old_cwnd = cwnd_;
if (phase() == CongestionAlgorithmPhase::kSlowStart) {
if (is_fully_utilized && !is_in_fast_recovery()) {
@@ -204,13 +205,13 @@ void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) {
}
void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) {
- rwnd_ = outstanding_data_.outstanding_bytes() >= a_rwnd
+ rwnd_ = outstanding_data_.unacked_bytes() >= a_rwnd
? 0
- : a_rwnd - outstanding_data_.outstanding_bytes();
+ : a_rwnd - outstanding_data_.unacked_bytes();
}
void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() {
- // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to
+ // Note: Can't use `unacked_bytes()` as that one doesn't count chunks to
// be retransmitted.
if (outstanding_data_.empty()) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
@@ -257,14 +258,14 @@ bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const {
return true;
}
-bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
+bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) {
if (!IsSackValid(sack)) {
return false;
}
UnwrappedTSN old_last_cumulative_tsn_ack =
outstanding_data_.last_cumulative_tsn_ack();
- size_t old_outstanding_bytes = outstanding_data_.outstanding_bytes();
+ size_t old_unacked_bytes = outstanding_data_.unacked_bytes();
size_t old_rwnd = rwnd_;
UnwrappedTSN cumulative_tsn_ack =
tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack());
@@ -301,9 +302,9 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack="
<< *cumulative_tsn_ack.Wrap() << " ("
<< *old_last_cumulative_tsn_ack.Wrap()
- << "), outstanding_bytes="
- << outstanding_data_.outstanding_bytes() << " ("
- << old_outstanding_bytes << "), rwnd=" << rwnd_ << " ("
+ << "), unacked_bytes="
+ << outstanding_data_.unacked_bytes() << " ("
+ << old_unacked_bytes << "), rwnd=" << rwnd_ << " ("
<< old_rwnd << ")";
if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) {
@@ -315,8 +316,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
// Note: It may be started again in a bit further down.
t3_rtx_.Stop();
- HandleIncreasedCumulativeTsnAck(old_outstanding_bytes,
- ack_info.bytes_acked);
+ HandleIncreasedCumulativeTsnAck(old_unacked_bytes, ack_info.bytes_acked);
}
if (ack_info.has_packet_loss) {
@@ -335,7 +335,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
return true;
}
-void RetransmissionQueue::UpdateRTT(TimeMs now,
+void RetransmissionQueue::UpdateRTT(Timestamp now,
UnwrappedTSN cumulative_tsn_ack) {
// RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C,
// Halvorsen P (2006) Considerations of SCTP retransmission delays for thin
@@ -345,17 +345,16 @@ void RetransmissionQueue::UpdateRTT(TimeMs now,
// TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and
// use only those packets for measurement.
- absl::optional<DurationMs> rtt =
- outstanding_data_.MeasureRTT(now, cumulative_tsn_ack);
+ TimeDelta rtt = outstanding_data_.MeasureRTT(now, cumulative_tsn_ack);
- if (rtt.has_value()) {
- on_new_rtt_(*rtt);
+ if (rtt.IsFinite()) {
+ on_new_rtt_(rtt);
}
}
void RetransmissionQueue::HandleT3RtxTimerExpiry() {
size_t old_cwnd = cwnd_;
- size_t old_outstanding_bytes = outstanding_bytes();
+ size_t old_unacked_bytes = unacked_bytes();
// https://tools.ietf.org/html/rfc4960#section-6.3.3
// "For the destination address for which the timer expires, adjust
// its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU."
@@ -392,8 +391,8 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_
- << ", outstanding_bytes " << outstanding_bytes() << " ("
- << old_outstanding_bytes << ")";
+ << ", unacked_bytes " << unacked_bytes() << " ("
+ << old_unacked_bytes << ")";
RTC_DCHECK(IsConsistent());
}
@@ -402,7 +401,7 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) {
RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted());
RTC_DCHECK(IsDivisibleBy4(bytes_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent;
- size_t old_outstanding_bytes = outstanding_bytes();
+ size_t old_unacked_bytes = unacked_bytes();
to_be_sent =
outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet);
@@ -441,21 +440,21 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) {
sb << *c.first;
})
<< " - " << bytes_retransmitted
- << " bytes. outstanding_bytes=" << outstanding_bytes()
- << " (" << old_outstanding_bytes << ")";
+ << " bytes. unacked_bytes=" << unacked_bytes() << " ("
+ << old_unacked_bytes << ")";
RTC_DCHECK(IsConsistent());
return to_be_sent;
}
std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
- TimeMs now,
+ Timestamp now,
size_t bytes_remaining_in_packet) {
// Chunks are always padded to even divisible by four.
RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet));
std::vector<std::pair<TSN, Data>> to_be_sent;
- size_t old_outstanding_bytes = outstanding_bytes();
+ size_t old_unacked_bytes = unacked_bytes();
size_t old_rwnd = rwnd_;
// Calculate the bandwidth budget (how many bytes that is
@@ -494,7 +493,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
chunk_opt->message_id, chunk_opt->data, now,
partial_reliability_ ? chunk_opt->max_retransmissions
: MaxRetransmits::NoLimit(),
- partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(),
+ partial_reliability_ ? chunk_opt->expires_at
+ : Timestamp::PlusInfinity(),
chunk_opt->lifecycle_id);
if (tsn.has_value()) {
@@ -526,8 +526,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
[&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second);
})
- << " bytes. outstanding_bytes=" << outstanding_bytes()
- << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_
+ << " bytes. unacked_bytes=" << unacked_bytes() << " ("
+ << old_unacked_bytes << "), cwnd=" << cwnd_
<< ", rwnd=" << rwnd_ << " (" << old_rwnd << ")";
}
RTC_DCHECK(IsConsistent());
@@ -539,7 +539,7 @@ bool RetransmissionQueue::can_send_data() const {
max_bytes_to_send() >= min_bytes_required_to_send_;
}
-bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) {
+bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) {
if (!partial_reliability_) {
return false;
}
@@ -550,9 +550,9 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) {
}
size_t RetransmissionQueue::max_bytes_to_send() const {
- size_t left = outstanding_bytes() >= cwnd_ ? 0 : cwnd_ - outstanding_bytes();
+ size_t left = unacked_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_bytes();
- if (outstanding_bytes() == 0) {
+ if (unacked_bytes() == 0) {
// https://datatracker.ietf.org/doc/html/rfc4960#section-6.1
// ... However, regardless of the value of rwnd (including if it is 0), the
// data sender can always have one DATA chunk in flight to the receiver if
@@ -619,7 +619,6 @@ void RetransmissionQueue::RestoreFromState(
partial_bytes_acked_ = state.tx.partial_bytes_acked;
outstanding_data_.ResetSequenceNumbers(
- tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn)),
tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn - 1)));
}
} // namespace dcsctp
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h
index b44db2a9a0..a0fbb33c47 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h
@@ -60,7 +60,7 @@ class RetransmissionQueue {
TSN my_initial_tsn,
size_t a_rwnd,
SendQueue& send_queue,
- std::function<void(DurationMs rtt)> on_new_rtt,
+ std::function<void(webrtc::TimeDelta rtt)> on_new_rtt,
std::function<void()> on_clear_retransmission_counter,
Timer& t3_rtx,
const DcSctpOptions& options,
@@ -69,7 +69,7 @@ class RetransmissionQueue {
// Handles a received SACK. Returns true if the `sack` was processed and
// false if it was discarded due to received out-of-order and not relevant.
- bool HandleSack(TimeMs now, const SackChunk& sack);
+ bool HandleSack(webrtc::Timestamp now, const SackChunk& sack);
// Handles an expired retransmission timer.
void HandleT3RtxTimerExpiry();
@@ -90,7 +90,7 @@ class RetransmissionQueue {
// called prior to this method, to abandon expired chunks, as this method will
// not expire any chunks.
std::vector<std::pair<TSN, Data>> GetChunksToSend(
- TimeMs now,
+ webrtc::Timestamp now,
size_t bytes_remaining_in_packet);
// Returns the internal state of all queued chunks. This is only used in
@@ -121,14 +121,10 @@ class RetransmissionQueue {
uint64_t rtx_bytes_count() const { return rtx_bytes_count_; }
// Returns the number of bytes of packets that are in-flight.
- size_t outstanding_bytes() const {
- return outstanding_data_.outstanding_bytes();
- }
+ size_t unacked_bytes() const { return outstanding_data_.unacked_bytes(); }
// Returns the number of DATA chunks that are in-flight.
- size_t outstanding_items() const {
- return outstanding_data_.outstanding_items();
- }
+ size_t unacked_items() const { return outstanding_data_.unacked_items(); }
// Indicates if the congestion control algorithm allows data to be sent.
bool can_send_data() const;
@@ -136,7 +132,7 @@ class RetransmissionQueue {
// Given the current time `now`, it will evaluate if there are chunks that
// have expired and that need to be discarded. It returns true if a
// FORWARD-TSN should be sent.
- bool ShouldSendForwardTsn(TimeMs now);
+ bool ShouldSendForwardTsn(webrtc::Timestamp now);
// Creates a FORWARD-TSN chunk.
ForwardTsnChunk CreateForwardTsn() const {
@@ -185,7 +181,7 @@ class RetransmissionQueue {
// When a SACK chunk is received, this method will be called which _may_ call
// into the `RetransmissionTimeout` to update the RTO.
- void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack);
+ void UpdateRTT(webrtc::Timestamp now, UnwrappedTSN cumulative_tsn_ack);
// If the congestion control is in "fast recovery mode", this may be exited
// now.
@@ -197,7 +193,7 @@ class RetransmissionQueue {
// Update the congestion control algorithm given as the cumulative ack TSN
// value has increased, as reported in an incoming SACK chunk.
- void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes,
+ void HandleIncreasedCumulativeTsnAck(size_t unacked_bytes,
size_t total_bytes_acked);
// Update the congestion control algorithm, given as packet loss has been
// detected, as reported in an incoming SACK chunk.
@@ -230,7 +226,7 @@ class RetransmissionQueue {
// The size of the data chunk (DATA/I-DATA) header that is used.
const size_t data_chunk_header_size_;
// Called when a new RTT measurement has been done
- const std::function<void(DurationMs rtt)> on_new_rtt_;
+ const std::function<void(webrtc::TimeDelta rtt)> on_new_rtt_;
// Called when a SACK has been seen that cleared the retransmission counter.
const std::function<void()> on_clear_retransmission_counter_;
// The retransmission counter.
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc
index d50494f084..eb1e04a5bb 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc
@@ -52,6 +52,8 @@ using ::testing::Pair;
using ::testing::Return;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
constexpr uint32_t kArwnd = 100000;
constexpr uint32_t kMaxMtu = 1191;
@@ -74,12 +76,12 @@ class RetransmissionQueueTest : public testing::Test {
}),
timer_(timer_manager_.CreateTimer(
"test/t3_rtx",
- []() { return absl::nullopt; },
- TimerOptions(options_.rto_initial))) {}
+ []() { return TimeDelta::Zero(); },
+ TimerOptions(options_.rto_initial.ToTimeDelta()))) {}
- std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk(
+ std::function<SendQueue::DataToSend(Timestamp, size_t)> CreateChunk(
OutgoingMessageId message_id) {
- return [this, message_id](TimeMs now, size_t max_size) {
+ return [this, message_id](Timestamp now, size_t max_size) {
return SendQueue::DataToSend(message_id,
gen_.Ordered({1, 2, 3, 4}, "BE"));
};
@@ -127,10 +129,10 @@ class RetransmissionQueueTest : public testing::Test {
MockDcSctpSocketCallbacks callbacks_;
DcSctpOptions options_;
DataGenerator gen_;
- TimeMs now_ = TimeMs(0);
+ Timestamp now_ = Timestamp::Zero();
FakeTimeoutManager timeout_manager_;
TimerManager timer_manager_;
- NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_;
+ NiceMock<MockFunction<void(TimeDelta rtt_ms)>> on_rtt_;
NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
NiceMock<MockSendQueue> producer_;
std::unique_ptr<Timer> timer_;
@@ -146,7 +148,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
@@ -159,7 +161,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
@@ -175,7 +177,7 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
@@ -198,7 +200,7 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
@@ -229,7 +231,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
@@ -241,7 +243,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 18
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(8)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));
// Ack 12, 14-15, 17-18
@@ -262,7 +264,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 19
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(9)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));
// Ack 12, 14-15, 17-19
@@ -274,7 +276,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
// Send 20
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(10)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));
// Ack 12, 14-15, 17-20
@@ -321,16 +323,16 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
- static constexpr TimeMs kStartTime(100000);
+ static constexpr Timestamp kStartTime = Timestamp::Seconds(100);
now_ = kStartTime;
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
// Ack 10, 12, after 100ms.
- now_ += DurationMs(100);
+ now_ += TimeDelta::Millis(100);
queue.HandleSack(
now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));
@@ -342,22 +344,22 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
// Send 13
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(3)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13)));
// Ack 10, 12-13, after 100ms.
- now_ += DurationMs(100);
+ now_ += TimeDelta::Millis(100);
queue.HandleSack(
now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));
// Send 14
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(4)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14)));
// Ack 10, 12-14, after 100 ms.
- now_ += DurationMs(100);
+ now_ += TimeDelta::Millis(100);
queue.HandleSack(
now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));
@@ -383,11 +385,11 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
// Verify that the timer was really restarted when fast-retransmitting. The
// timeout is `options_.rto_initial`, so advance the time just before that.
- now_ += options_.rto_initial - DurationMs(1);
+ now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1);
EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value());
// And ensure it really is running.
- now_ += DurationMs(1);
+ now_ += TimeDelta::Millis(1);
ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout,
timeout_manager_.GetNextExpiredTimeout());
// An expired timeout has to be handled (asserts validate this).
@@ -397,15 +399,15 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(1),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
@@ -420,11 +422,11 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -459,12 +461,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
RetransmissionQueue queue =
CreateQueue(/*supports_partial_reliability=*/false);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -488,12 +490,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -529,12 +531,12 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(3);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -577,16 +579,16 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
static constexpr size_t kCwnd = 1200;
queue.set_cwnd(kCwnd);
EXPECT_EQ(queue.cwnd(), kCwnd);
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1500);
@@ -594,8 +596,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
- EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
- EXPECT_EQ(queue.outstanding_items(), 1u);
+ EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize);
+ EXPECT_EQ(queue.unacked_items(), 1u);
// Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry();
@@ -603,8 +605,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
queue.GetChunksToSend(now_, 1500);
@@ -612,30 +614,30 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
- EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
- EXPECT_EQ(queue.outstanding_items(), 1u);
+ EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize);
+ EXPECT_EQ(queue.unacked_items(), 1u);
}
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -675,23 +677,23 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, "E"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -729,7 +731,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(1);
SendQueue::DataToSend dts(OutgoingMessageId(42),
@@ -737,7 +739,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(2);
SendQueue::DataToSend dts(OutgoingMessageId(43),
@@ -745,7 +747,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(3);
SendQueue::DataToSend dts(OutgoingMessageId(44),
@@ -753,7 +755,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(4);
SendQueue::DataToSend dts(OutgoingMessageId(45),
@@ -761,7 +763,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
@@ -850,21 +852,21 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
TEST_F(RetransmissionQueueTest, MeasureRTT) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(OutgoingMessageId(0),
gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
- now_ = now_ + DurationMs(123);
+ now_ = now_ + TimeDelta::Millis(123);
- EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1);
+ EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1);
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
}
@@ -888,7 +890,7 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
@@ -918,7 +920,7 @@ TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
@@ -965,7 +967,7 @@ TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
@@ -995,14 +997,14 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
// See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the
// magic numbers in this test.
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t size) {
+ .WillOnce([this](Timestamp, size_t size) {
EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(183);
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
- .WillOnce([this](TimeMs, size_t size) {
+ .WillOnce([this](Timestamp, size_t size) {
EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(957);
@@ -1018,23 +1020,23 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -1046,8 +1048,8 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
- EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
- EXPECT_EQ(queue.outstanding_items(), 3u);
+ EXPECT_EQ(queue.unacked_bytes(), (16 + 4) * 3u);
+ EXPECT_EQ(queue.unacked_items(), 3u);
// Mark the message as lost.
EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
@@ -1060,21 +1062,21 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
// Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
- EXPECT_EQ(queue.outstanding_items(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_items(), 0u);
}
TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
@@ -1082,21 +1084,21 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
DataGeneratorOptions options;
options.stream_id = StreamID(17);
options.mid = MID(42);
- TimeMs test_start = now_;
+ Timestamp test_start = now_;
EXPECT_CALL(producer_, Produce)
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({1, 2, 3, 4}, "B", options));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId,
gen_.Ordered({5, 6, 7, 8}, "", options));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 24);
@@ -1104,7 +1106,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId))
.WillOnce(Return(true));
- now_ += DurationMs(100);
+ now_ += TimeDelta::Millis(100);
EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
@@ -1118,38 +1120,38 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) {
RetransmissionQueue queue = CreateQueue();
- TimeMs test_start = now_;
+ Timestamp test_start = now_;
EXPECT_CALL(producer_, Produce)
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(42),
gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(43),
gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
// Stream reset - MID reset to zero again.
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(44),
gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)}));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
- .WillOnce([&](TimeMs, size_t) {
+ .WillOnce([&](Timestamp, size_t) {
SendQueue::DataToSend dts(
OutgoingMessageId(44),
gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)}));
- dts.expires_at = TimeMs(test_start + DurationMs(10));
+ dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
return dts;
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44)))
.WillOnce(Return(true));
@@ -1160,7 +1162,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) {
EXPECT_THAT(queue.GetChunksToSend(now_, 24),
ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0)))));
- now_ += DurationMs(100);
+ now_ += TimeDelta::Millis(100);
EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
EXPECT_THAT(
@@ -1176,7 +1178,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) {
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(0);
return dts;
@@ -1184,7 +1186,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
.WillOnce(CreateChunk(OutgoingMessageId(2)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
@@ -1246,7 +1248,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
// This is a fairly long test.
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
+ .WillOnce([this](Timestamp, size_t) {
SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = MaxRetransmits(2);
return dts;
@@ -1260,7 +1262,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
.WillOnce(CreateChunk(OutgoingMessageId(8)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
@@ -1386,17 +1388,17 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) {
std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1500);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
size_t serialized_size = payload.size() + DataChunk::kHeaderSize;
- EXPECT_EQ(queue.outstanding_bytes(), serialized_size);
+ EXPECT_EQ(queue.unacked_bytes(), serialized_size);
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
@@ -1414,12 +1416,12 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) {
// Fill the congestion window almost - leaving 500 bytes.
size_t chunk_size = intial_cwnd - 500;
EXPECT_CALL(producer_, Produce)
- .WillOnce([chunk_size, this](TimeMs, size_t) {
+ .WillOnce([chunk_size, this](Timestamp, size_t) {
return SendQueue::DataToSend(
OutgoingMessageId(0),
gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_TRUE(queue.can_send_data());
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -1433,7 +1435,7 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) {
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_TRUE(queue.can_send_data());
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
+ EXPECT_EQ(queue.unacked_bytes(), 0u);
EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu);
}
@@ -1447,12 +1449,12 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) {
// Fill the congestion window almost - leaving 500 bytes.
size_t chunk_size = intial_cwnd - 500;
EXPECT_CALL(producer_, Produce)
- .WillOnce([chunk_size, this](TimeMs, size_t) {
+ .WillOnce([chunk_size, this](Timestamp, size_t) {
return SendQueue::DataToSend(
OutgoingMessageId(0),
gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_TRUE(queue.can_send_data());
std::vector<std::pair<TSN, Data>> chunks_to_send =
@@ -1467,7 +1469,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
EXPECT_EQ(
@@ -1490,7 +1492,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
.WillOnce(CreateChunk(OutgoingMessageId(5)))
.WillOnce(CreateChunk(OutgoingMessageId(6)))
.WillOnce(CreateChunk(OutgoingMessageId(7)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8));
EXPECT_EQ(
queue.GetHandoverReadiness(),
@@ -1503,7 +1505,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 18
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(8)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
// Ack 12, 14-15, 17-18
@@ -1515,7 +1517,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 19
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(9)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
// Ack 12, 14-15, 17-19
@@ -1527,7 +1529,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
// Send 20
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(10)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
// Ack 12, 14-15, 17-20
@@ -1563,7 +1565,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) {
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk(OutgoingMessageId(0)))
.WillOnce(CreateChunk(OutgoingMessageId(1)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2));
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
@@ -1574,7 +1576,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) {
.WillOnce(CreateChunk(OutgoingMessageId(2)))
.WillOnce(CreateChunk(OutgoingMessageId(3)))
.WillOnce(CreateChunk(OutgoingMessageId(4)))
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(*handedover_queue),
testing::ElementsAre(TSN(12), TSN(13), TSN(14)));
@@ -1592,27 +1594,27 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) {
std::vector<uint8_t> payload(mtu - 100);
EXPECT_CALL(producer_, Produce)
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "B"));
})
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, ""));
})
- .WillOnce([this, payload](TimeMs, size_t) {
+ .WillOnce([this, payload](Timestamp, size_t) {
return SendQueue::DataToSend(OutgoingMessageId(0),
gen_.Ordered(payload, "E"));
})
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+ .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
// Produce all chunks and put them in the retransmission queue.
std::vector<std::pair<TSN, Data>> chunks_to_send =
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc
index 7d8fb9761c..8af77041a5 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc
@@ -12,28 +12,29 @@
#include <algorithm>
#include <cstdint>
+#include "api/units/time_delta.h"
#include "net/dcsctp/public/dcsctp_options.h"
namespace dcsctp {
RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options)
- : min_rto_(*options.rto_min),
- max_rto_(*options.rto_max),
- max_rtt_(*options.rtt_max),
+ : min_rto_(options.rto_min.ToTimeDelta()),
+ max_rto_(options.rto_max.ToTimeDelta()),
+ max_rtt_(options.rtt_max.ToTimeDelta()),
min_rtt_variance_(*options.min_rtt_variance),
scaled_srtt_(*options.rto_initial << kRttShift),
rto_(*options.rto_initial) {}
-void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) {
- const int32_t rtt = *measured_rtt;
-
+void RetransmissionTimeout::ObserveRTT(webrtc::TimeDelta measured_rtt) {
// Unrealistic values will be skipped. If a wrongly measured (or otherwise
// corrupt) value was processed, it could change the state in a way that would
// take a very long time to recover.
- if (rtt < 0 || rtt > max_rtt_) {
+ if (measured_rtt < webrtc::TimeDelta::Zero() || measured_rtt > max_rtt_) {
return;
}
+ const int64_t rtt = measured_rtt.ms();
+
// From https://tools.ietf.org/html/rfc4960#section-6.3.1, but avoiding
// floating point math by implementing algorithm from "V. Jacobson: Congestion
// avoidance and control", but adapted for SCTP.
@@ -42,7 +43,7 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) {
scaled_rtt_var_ = (rtt / 2) << kRttVarShift;
first_measurement_ = false;
} else {
- int32_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift);
+ int64_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift);
scaled_srtt_ += rtt_diff;
if (rtt_diff < 0) {
rtt_diff = -rtt_diff;
@@ -58,6 +59,6 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) {
rto_ = (scaled_srtt_ >> kRttShift) + scaled_rtt_var_;
// Clamp RTO between min and max.
- rto_ = std::min(std::max(rto_, min_rto_), max_rto_);
+ rto_ = std::min(std::max(rto_, min_rto_.ms()), max_rto_.ms());
}
} // namespace dcsctp
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h
index 01530cb3b5..b4b0fd7fef 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h
@@ -32,27 +32,29 @@ class RetransmissionTimeout {
explicit RetransmissionTimeout(const DcSctpOptions& options);
// To be called when a RTT has been measured, to update the RTO value.
- void ObserveRTT(DurationMs measured_rtt);
+ void ObserveRTT(webrtc::TimeDelta measured_rtt);
// Returns the Retransmission Timeout (RTO) value, in milliseconds.
- DurationMs rto() const { return DurationMs(rto_); }
+ webrtc::TimeDelta rto() const { return webrtc::TimeDelta::Millis(rto_); }
// Returns the smoothed RTT value, in milliseconds.
- DurationMs srtt() const { return DurationMs(scaled_srtt_ >> kRttShift); }
+ webrtc::TimeDelta srtt() const {
+ return webrtc::TimeDelta::Millis(scaled_srtt_ >> kRttShift);
+ }
private:
- const int32_t min_rto_;
- const int32_t max_rto_;
- const int32_t max_rtt_;
- const int32_t min_rtt_variance_;
+ const webrtc::TimeDelta min_rto_;
+ const webrtc::TimeDelta max_rto_;
+ const webrtc::TimeDelta max_rtt_;
+ const int64_t min_rtt_variance_;
// If this is the first measurement
bool first_measurement_ = true;
// Smoothed Round-Trip Time, shifted by kRttShift
- int32_t scaled_srtt_;
+ int64_t scaled_srtt_;
// Round-Trip Time Variation, shifted by kRttVarShift
- int32_t scaled_rtt_var_ = 0;
+ int64_t scaled_rtt_var_ = 0;
// Retransmission Timeout
- int32_t rto_;
+ int64_t rto_;
};
} // namespace dcsctp
diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc
index b901995e97..7754578f32 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc
@@ -15,20 +15,21 @@
namespace dcsctp {
namespace {
+using ::webrtc::TimeDelta;
-constexpr DurationMs kMaxRtt = DurationMs(8'000);
-constexpr DurationMs kInitialRto = DurationMs(200);
-constexpr DurationMs kMaxRto = DurationMs(800);
-constexpr DurationMs kMinRto = DurationMs(120);
-constexpr DurationMs kMinRttVariance = DurationMs(220);
+constexpr TimeDelta kMaxRtt = TimeDelta::Millis(8'000);
+constexpr TimeDelta kInitialRto = TimeDelta::Millis(200);
+constexpr TimeDelta kMaxRto = TimeDelta::Millis(800);
+constexpr TimeDelta kMinRto = TimeDelta::Millis(120);
+constexpr TimeDelta kMinRttVariance = TimeDelta::Millis(220);
DcSctpOptions MakeOptions() {
DcSctpOptions options;
- options.rtt_max = kMaxRtt;
- options.rto_initial = kInitialRto;
- options.rto_max = kMaxRto;
- options.rto_min = kMinRto;
- options.min_rtt_variance = kMinRttVariance;
+ options.rtt_max = DurationMs(kMaxRtt);
+ options.rto_initial = DurationMs(kInitialRto);
+ options.rto_max = DurationMs(kMaxRto);
+ options.rto_min = DurationMs(kMinRto);
+ options.min_rtt_variance = DurationMs(kMinRttVariance);
return options;
}
@@ -45,31 +46,31 @@ TEST(RetransmissionTimeoutTest, HasValidInitialSrtt) {
TEST(RetransmissionTimeoutTest, NegativeValuesDoNotAffectRTO) {
RetransmissionTimeout rto_(MakeOptions());
// Initial negative value
- rto_.ObserveRTT(DurationMs(-10));
+ rto_.ObserveRTT(TimeDelta::Millis(-10));
EXPECT_EQ(rto_.rto(), kInitialRto);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
// Subsequent negative value
- rto_.ObserveRTT(DurationMs(-10));
- EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(TimeDelta::Millis(-10));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
}
TEST(RetransmissionTimeoutTest, TooLargeValuesDoNotAffectRTO) {
RetransmissionTimeout rto_(MakeOptions());
// Initial too large value
- rto_.ObserveRTT(kMaxRtt + DurationMs(100));
+ rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100));
EXPECT_EQ(rto_.rto(), kInitialRto);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
// Subsequent too large value
- rto_.ObserveRTT(kMaxRtt + DurationMs(100));
- EXPECT_EQ(*rto_.rto(), 372);
+ rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
}
TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) {
RetransmissionTimeout rto_(MakeOptions());
for (int i = 0; i < 1000; ++i) {
- rto_.ObserveRTT(DurationMs(1));
+ rto_.ObserveRTT(TimeDelta::Millis(1));
}
EXPECT_GE(rto_.rto(), kMinRto);
}
@@ -77,67 +78,67 @@ TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) {
TEST(RetransmissionTimeoutTest, WillNeverGoAboveMaximumRto) {
RetransmissionTimeout rto_(MakeOptions());
for (int i = 0; i < 1000; ++i) {
- rto_.ObserveRTT(kMaxRtt - DurationMs(1));
+ rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(1));
// Adding jitter, which would make it RTO be well above RTT.
- rto_.ObserveRTT(kMaxRtt - DurationMs(100));
+ rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(100));
}
EXPECT_LE(rto_.rto(), kMaxRto);
}
TEST(RetransmissionTimeoutTest, CalculatesRtoForStableRtt) {
RetransmissionTimeout rto_(MakeOptions());
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 372);
- rto_.ObserveRTT(DurationMs(128));
- EXPECT_EQ(*rto_.rto(), 344);
- rto_.ObserveRTT(DurationMs(123));
- EXPECT_EQ(*rto_.rto(), 344);
- rto_.ObserveRTT(DurationMs(125));
- EXPECT_EQ(*rto_.rto(), 344);
- rto_.ObserveRTT(DurationMs(127));
- EXPECT_EQ(*rto_.rto(), 344);
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
+ rto_.ObserveRTT(TimeDelta::Millis(128));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344));
+ rto_.ObserveRTT(TimeDelta::Millis(123));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344));
+ rto_.ObserveRTT(TimeDelta::Millis(125));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344));
+ rto_.ObserveRTT(TimeDelta::Millis(127));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344));
}
TEST(RetransmissionTimeoutTest, CalculatesRtoForUnstableRtt) {
RetransmissionTimeout rto_(MakeOptions());
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 372);
- rto_.ObserveRTT(DurationMs(402));
- EXPECT_EQ(*rto_.rto(), 622);
- rto_.ObserveRTT(DurationMs(728));
- EXPECT_EQ(*rto_.rto(), 800);
- rto_.ObserveRTT(DurationMs(89));
- EXPECT_EQ(*rto_.rto(), 800);
- rto_.ObserveRTT(DurationMs(126));
- EXPECT_EQ(*rto_.rto(), 800);
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
+ rto_.ObserveRTT(TimeDelta::Millis(402));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(622));
+ rto_.ObserveRTT(TimeDelta::Millis(728));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800));
+ rto_.ObserveRTT(TimeDelta::Millis(89));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800));
+ rto_.ObserveRTT(TimeDelta::Millis(126));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800));
}
TEST(RetransmissionTimeoutTest, WillStabilizeAfterAWhile) {
RetransmissionTimeout rto_(MakeOptions());
- rto_.ObserveRTT(DurationMs(124));
- rto_.ObserveRTT(DurationMs(402));
- rto_.ObserveRTT(DurationMs(728));
- rto_.ObserveRTT(DurationMs(89));
- rto_.ObserveRTT(DurationMs(126));
- EXPECT_EQ(*rto_.rto(), 800);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 800);
- rto_.ObserveRTT(DurationMs(122));
- EXPECT_EQ(*rto_.rto(), 710);
- rto_.ObserveRTT(DurationMs(123));
- EXPECT_EQ(*rto_.rto(), 631);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 562);
- rto_.ObserveRTT(DurationMs(122));
- EXPECT_EQ(*rto_.rto(), 505);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 454);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 410);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 372);
- rto_.ObserveRTT(DurationMs(124));
- EXPECT_EQ(*rto_.rto(), 367);
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ rto_.ObserveRTT(TimeDelta::Millis(402));
+ rto_.ObserveRTT(TimeDelta::Millis(728));
+ rto_.ObserveRTT(TimeDelta::Millis(89));
+ rto_.ObserveRTT(TimeDelta::Millis(126));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800));
+ rto_.ObserveRTT(TimeDelta::Millis(122));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(710));
+ rto_.ObserveRTT(TimeDelta::Millis(123));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(631));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(562));
+ rto_.ObserveRTT(TimeDelta::Millis(122));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(505));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(454));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(410));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(367));
}
TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) {
@@ -149,31 +150,33 @@ TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) {
RetransmissionTimeout rto_(MakeOptions());
for (int i = 0; i < 1000; ++i) {
- rto_.ObserveRTT(DurationMs(124));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
}
- EXPECT_EQ(*rto_.rto(), 344);
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344));
}
TEST(RetransmissionTimeoutTest, CanSpecifySmallerMinimumRttVariance) {
DcSctpOptions options = MakeOptions();
- options.min_rtt_variance = kMinRttVariance - DurationMs(100);
+ options.min_rtt_variance =
+ DurationMs(kMinRttVariance - TimeDelta::Millis(100));
RetransmissionTimeout rto_(options);
for (int i = 0; i < 1000; ++i) {
- rto_.ObserveRTT(DurationMs(124));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
}
- EXPECT_EQ(*rto_.rto(), 244);
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(244));
}
TEST(RetransmissionTimeoutTest, CanSpecifyLargerMinimumRttVariance) {
DcSctpOptions options = MakeOptions();
- options.min_rtt_variance = kMinRttVariance + DurationMs(100);
+ options.min_rtt_variance =
+ DurationMs(kMinRttVariance + TimeDelta::Millis(100));
RetransmissionTimeout rto_(options);
for (int i = 0; i < 1000; ++i) {
- rto_.ObserveRTT(DurationMs(124));
+ rto_.ObserveRTT(TimeDelta::Millis(124));
}
- EXPECT_EQ(*rto_.rto(), 444);
+ EXPECT_EQ(rto_.rto(), TimeDelta::Millis(444));
}
} // namespace
diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc
index facb432c59..7cbead296c 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc
@@ -21,15 +21,17 @@
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/logging.h"
+#include "rtc_base/strings/str_join.h"
namespace dcsctp {
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
RRSendQueue::RRSendQueue(absl::string_view log_prefix,
DcSctpSocketCallbacks* callbacks,
@@ -137,7 +139,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
}
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
- TimeMs now,
+ Timestamp now,
size_t max_size) {
RTC_DCHECK(pause_state_ != PauseState::kPaused &&
pause_state_ != PauseState::kResetting);
@@ -349,7 +351,7 @@ bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
return items_.front().mid.has_value();
}
-void RRSendQueue::Add(TimeMs now,
+void RRSendQueue::Add(Timestamp now,
DcSctpMessage message,
const SendOptions& send_options) {
RTC_DCHECK(!message.payload().empty());
@@ -366,8 +368,9 @@ void RRSendQueue::Add(TimeMs now,
? MaxRetransmits(send_options.max_retransmissions.value())
: MaxRetransmits::NoLimit(),
.expires_at = send_options.lifetime.has_value()
- ? now + *send_options.lifetime + DurationMs(1)
- : TimeMs::InfiniteFuture(),
+ ? now + send_options.lifetime->ToTimeDelta() +
+ TimeDelta::Millis(1)
+ : Timestamp::PlusInfinity(),
.lifecycle_id = send_options.lifecycle_id,
};
GetOrCreateStreamInfo(message.stream_id())
@@ -383,7 +386,7 @@ bool RRSendQueue::IsEmpty() const {
return total_buffered_amount() == 0;
}
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(Timestamp now,
size_t max_size) {
return scheduler_.Produce(now, max_size);
}
diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
index bef5fe437d..b6c359dc1e 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
@@ -71,12 +71,13 @@ class RRSendQueue : public SendQueue {
// time should be in `now`. Note that it's the responsibility of the caller to
// ensure that the buffer is not full (by calling `IsFull`) before adding
// messages to it.
- void Add(TimeMs now,
+ void Add(webrtc::Timestamp now,
DcSctpMessage message,
const SendOptions& send_options = {});
// Implementation of `SendQueue`.
- absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
+ absl::optional<DataToSend> Produce(webrtc::Timestamp now,
+ size_t max_size) override;
bool Discard(StreamID stream_id, OutgoingMessageId message_id) override;
void PrepareResetStream(StreamID streams) override;
bool HasStreamsReadyToBeReset() const override;
@@ -104,7 +105,7 @@ class RRSendQueue : public SendQueue {
struct MessageAttributes {
IsUnordered unordered;
MaxRetransmits max_retransmissions;
- TimeMs expires_at;
+ webrtc::Timestamp expires_at;
LifecycleId lifecycle_id;
};
@@ -154,7 +155,7 @@ class RRSendQueue : public SendQueue {
void Add(DcSctpMessage message, MessageAttributes attributes);
// Implementing `StreamScheduler::StreamProducer`.
- absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+ absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) override;
size_t bytes_to_send_in_next_message() const override;
@@ -265,7 +266,7 @@ class RRSendQueue : public SendQueue {
OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
absl::optional<DataToSend> Produce(
std::map<StreamID, OutgoingStream>::iterator it,
- TimeMs now,
+ webrtc::Timestamp now,
size_t max_size);
const absl::string_view log_prefix_;
diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
index 9d6da7bdff..632cd8fc19 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc
@@ -29,8 +29,10 @@ namespace dcsctp {
namespace {
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
+using ::webrtc::TimeDelta;
+using ::webrtc::Timestamp;
-constexpr TimeMs kNow = TimeMs(0);
+constexpr Timestamp kNow = Timestamp::Zero();
constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53);
constexpr size_t kMaxQueueSize = 1000;
@@ -181,9 +183,9 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
std::vector<uint8_t> payload(20);
// Default is no expiry
- TimeMs now = kNow;
+ Timestamp now = kNow;
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
- now += DurationMs(1000000);
+ now += TimeDelta::Seconds(1000);
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
SendOptions expires_2_seconds;
@@ -191,17 +193,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
// Add and consume within lifetime
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
- now += DurationMs(2000);
+ now += TimeDelta::Millis(2000);
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
// Add and consume just outside lifetime
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
- now += DurationMs(2001);
+ now += TimeDelta::Millis(2001);
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
// A long time after expiry
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
- now += DurationMs(1000000);
+ now += TimeDelta::Seconds(1000);
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
// Expire one message, but produce the second that is not expired.
@@ -211,7 +213,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
expires_4_seconds.lifetime = DurationMs(4000);
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
- now += DurationMs(2001);
+ now += TimeDelta::Millis(2001);
ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize));
ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize));
@@ -846,8 +848,9 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
/*maybe_delivered=*/false));
EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
- EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize)
- .has_value());
+ EXPECT_FALSE(
+ buf_.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize)
+ .has_value());
}
TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
diff --git a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h
index 48eaefaf6a..d0d834c901 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h
@@ -17,6 +17,7 @@
#include "absl/types/optional.h"
#include "api/array_view.h"
+#include "api/units/timestamp.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/types.h"
@@ -37,7 +38,7 @@ class SendQueue {
// Partial reliability - RFC3758
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit();
- TimeMs expires_at = TimeMs::InfiniteFuture();
+ webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity();
// Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for
// all other fragments.
@@ -55,7 +56,8 @@ class SendQueue {
//
// `max_size` refers to how many payload bytes that may be produced, not
// including any headers.
- virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0;
+ virtual absl::optional<DataToSend> Produce(webrtc::Timestamp now,
+ size_t max_size) = 0;
// Discards a partially sent message identified by the parameters
// `stream_id` and `message_id`. The `message_id` comes from the returned
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc
index c1d220aaa2..66c4457481 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc
@@ -14,7 +14,6 @@
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
@@ -22,6 +21,7 @@
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
+#include "rtc_base/strings/str_join.h"
namespace dcsctp {
@@ -31,7 +31,7 @@ void StreamScheduler::Stream::SetPriority(StreamPriority priority) {
}
absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
- TimeMs now,
+ webrtc::Timestamp now,
size_t max_size) {
// For non-interleaved streams, avoid rescheduling while still sending a
// message as it needs to be sent in full. For interleaved messaging,
@@ -127,7 +127,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime(
}
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
- TimeMs now,
+ webrtc::Timestamp now,
size_t max_size) {
absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h
index ce836a5826..9d76fc6f56 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h
+++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h
@@ -87,7 +87,7 @@ class StreamScheduler {
// The parameter `max_size` specifies the maximum amount of actual payload
// that may be returned. If these constraints prevents the stream from
// sending some data, `absl::nullopt` should be returned.
- virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+ virtual absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
size_t max_size) = 0;
// Returns the number of payload bytes that is scheduled to be sent in the
@@ -132,7 +132,8 @@ class StreamScheduler {
// Produces a message from this stream. This will only be called on streams
// that have data.
- absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
+ absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
+ size_t max_size);
void MakeActive(size_t bytes_to_send_next);
void ForceMarkInactive();
@@ -180,7 +181,8 @@ class StreamScheduler {
// `now` and will be used to skip chunks with expired limited lifetime. The
// parameter `max_size` specifies the maximum amount of actual payload that
// may be returned. If no data can be produced, `absl::nullopt` is returned.
- absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
+ absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now,
+ size_t max_size);
std::set<StreamID> ActiveStreamsForTesting() const;
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc
index 4f5fb0fb84..42d0b3cd35 100644
--- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc
@@ -19,9 +19,11 @@ namespace dcsctp {
namespace {
using ::testing::Return;
using ::testing::StrictMock;
+using ::webrtc::Timestamp;
constexpr size_t kMtu = 1000;
constexpr size_t kPayloadSize = 4;
+constexpr Timestamp kNow = Timestamp::Zero();
MATCHER_P(HasDataWithMid, mid, "") {
if (!arg.has_value()) {
@@ -38,12 +40,12 @@ MATCHER_P(HasDataWithMid, mid, "") {
return true;
}
-std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
+std::function<absl::optional<SendQueue::DataToSend>(Timestamp, size_t)>
CreateChunk(OutgoingMessageId message_id,
StreamID sid,
MID mid,
size_t payload_size = kPayloadSize) {
- return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) {
+ return [sid, mid, payload_size, message_id](Timestamp now, size_t max_size) {
return SendQueue::DataToSend(
message_id,
Data(sid, SSN(0), mid, FSN(0), PPID(42),
@@ -56,8 +58,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
size_t packets_to_generate) {
std::map<StreamID, size_t> packet_counts;
for (size_t i = 0; i < packets_to_generate; ++i) {
- absl::optional<SendQueue::DataToSend> data =
- scheduler.Produce(TimeMs(0), kMtu);
+ absl::optional<SendQueue::DataToSend> data = scheduler.Produce(kNow, kMtu);
if (data.has_value()) {
++packet_counts[data->data.stream_id];
}
@@ -69,7 +70,7 @@ class MockStreamProducer : public StreamScheduler::StreamProducer {
public:
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
- (TimeMs, size_t),
+ (Timestamp, size_t),
(override));
MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
};
@@ -100,7 +101,7 @@ class TestStream {
TEST(StreamSchedulerTest, HasNoActiveStreams) {
StreamScheduler scheduler("", kMtu);
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Stream properties can be set and retrieved
@@ -132,8 +133,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
stream->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Switches between two streams after every packet.
@@ -168,13 +169,13 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Switches between two streams after every packet, but keeps producing from the
@@ -232,15 +233,15 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Deactivates a stream before it has finished producing all packets.
@@ -259,12 +260,12 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// ... but the stream is made inactive before it can be produced.
stream1->MakeInactive();
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Resumes a paused stream - makes a stream active after inactivating it.
@@ -287,14 +288,14 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
stream1->MakeInactive();
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
stream1->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Iterates between streams, where one is suddenly paused and later resumed.
@@ -330,15 +331,15 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
stream1->MakeInactive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
stream1->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Verifies that packet counts are evenly distributed in round robin scheduling.
@@ -427,18 +428,18 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
stream2->MaybeMakeActive();
// t = 30
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t = 60
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t = 70
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t = 90
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
// t = 140
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t = 210
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Will do weighted fair queuing with three streams having different priority.
@@ -492,24 +493,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
stream3->MaybeMakeActive();
// t ~= 20
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
// t ~= 40
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
// t ~= 50
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t ~= 60
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
// t ~= 80
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t ~= 100
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t ~= 150
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
// t ~= 160
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t ~= 240
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Will do weighted fair queuing with three streams having different priority
@@ -586,24 +587,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
stream3->MaybeMakeActive();
// t ~= 400
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300)));
// t ~= 1400
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301)));
// t ~= 2500
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200)));
// t ~= 2800
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302)));
// t ~= 4000
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100)));
// t ~= 5600
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101)));
// t ~= 6000
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201)));
// t ~= 7000
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202)));
// t ~= 11200
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
// A simple test with two streams of different priority, but sending packets
@@ -723,11 +724,11 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
stream2->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
// Sending large messages with large MTU will not fragment messages and will
@@ -756,9 +757,9 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
auto stream2 =
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
stream2->MaybeMakeActive();
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
- EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
- EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1)));
+ EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0)));
+ EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt);
}
} // namespace