diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:27 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:27 +0000 |
commit | 40a355a42d4a9444dc753c04c6608dade2f06a23 (patch) | |
tree | 871fc667d2de662f171103ce5ec067014ef85e61 /third_party/libwebrtc/net/dcsctp | |
parent | Adding upstream version 124.0.1. (diff) | |
download | firefox-40a355a42d4a9444dc753c04c6608dade2f06a23.tar.xz firefox-40a355a42d4a9444dc753c04c6608dade2f06a23.zip |
Adding upstream version 125.0.1.upstream/125.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp')
59 files changed, 1056 insertions, 1098 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/common/BUILD.gn b/third_party/libwebrtc/net/dcsctp/common/BUILD.gn index 78fa0d307e..d496c64a56 100644 --- a/third_party/libwebrtc/net/dcsctp/common/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/common/BUILD.gn @@ -29,12 +29,6 @@ rtc_source_set("sequence_numbers") { sources = [ "sequence_numbers.h" ] } -rtc_source_set("str_join") { - deps = [ "../../../rtc_base:stringutils" ] - sources = [ "str_join.h" ] - absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] -} - if (rtc_include_tests) { rtc_library("dcsctp_common_unittests") { testonly = true @@ -43,7 +37,6 @@ if (rtc_include_tests) { deps = [ ":math", ":sequence_numbers", - ":str_join", "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:gunit_helpers", @@ -52,7 +45,6 @@ if (rtc_include_tests) { sources = [ "math_test.cc", "sequence_numbers_test.cc", - "str_join_test.cc", ] } } diff --git a/third_party/libwebrtc/net/dcsctp/common/str_join.h b/third_party/libwebrtc/net/dcsctp/common/str_join.h deleted file mode 100644 index 04517827b7..0000000000 --- a/third_party/libwebrtc/net/dcsctp/common/str_join.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#ifndef NET_DCSCTP_COMMON_STR_JOIN_H_ -#define NET_DCSCTP_COMMON_STR_JOIN_H_ - -#include <string> - -#include "absl/strings/string_view.h" -#include "rtc_base/strings/string_builder.h" - -namespace dcsctp { - -template <typename Range> -std::string StrJoin(const Range& seq, absl::string_view delimiter) { - rtc::StringBuilder sb; - int idx = 0; - - for (const typename Range::value_type& elem : seq) { - if (idx > 0) { - sb << delimiter; - } - sb << elem; - - ++idx; - } - return sb.Release(); -} - -template <typename Range, typename Functor> -std::string StrJoin(const Range& seq, - absl::string_view delimiter, - const Functor& fn) { - rtc::StringBuilder sb; - int idx = 0; - - for (const typename Range::value_type& elem : seq) { - if (idx > 0) { - sb << delimiter; - } - fn(sb, elem); - - ++idx; - } - return sb.Release(); -} - -} // namespace dcsctp - -#endif // NET_DCSCTP_COMMON_STR_JOIN_H_ diff --git a/third_party/libwebrtc/net/dcsctp/common/str_join_test.cc b/third_party/libwebrtc/net/dcsctp/common/str_join_test.cc deleted file mode 100644 index dbfd92c1cf..0000000000 --- a/third_party/libwebrtc/net/dcsctp/common/str_join_test.cc +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#include "net/dcsctp/common/str_join.h" - -#include <string> -#include <utility> -#include <vector> - -#include "test/gmock.h" - -namespace dcsctp { -namespace { - -TEST(StrJoinTest, CanJoinStringsFromVector) { - std::vector<std::string> strings = {"Hello", "World"}; - std::string s = StrJoin(strings, " "); - EXPECT_EQ(s, "Hello World"); -} - -TEST(StrJoinTest, CanJoinNumbersFromArray) { - std::array<int, 3> numbers = {1, 2, 3}; - std::string s = StrJoin(numbers, ","); - EXPECT_EQ(s, "1,2,3"); -} - -TEST(StrJoinTest, CanFormatElementsWhileJoining) { - std::vector<std::pair<std::string, std::string>> pairs = { - {"hello", "world"}, {"foo", "bar"}, {"fum", "gazonk"}}; - std::string s = StrJoin(pairs, ",", - [&](rtc::StringBuilder& sb, - const std::pair<std::string, std::string>& p) { - sb << p.first << "=" << p.second; - }); - EXPECT_EQ(s, "hello=world,foo=bar,fum=gazonk"); -} - -} // namespace -} // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/fuzzers/dcsctp_fuzzers.h b/third_party/libwebrtc/net/dcsctp/fuzzers/dcsctp_fuzzers.h index 90cfa35099..49aa7f0430 100644 --- a/third_party/libwebrtc/net/dcsctp/fuzzers/dcsctp_fuzzers.h +++ b/third_party/libwebrtc/net/dcsctp/fuzzers/dcsctp_fuzzers.h @@ -64,7 +64,7 @@ class FuzzerCallbacks : public DcSctpSocketCallbacks { // The fuzzer timeouts don't implement |precision|. return std::make_unique<FuzzerTimeout>(active_timeouts_); } - TimeMs TimeMillis() override { return TimeMs(42); } + webrtc::Timestamp Now() override { return webrtc::Timestamp::Millis(42); } uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return kRandomValue; } diff --git a/third_party/libwebrtc/net/dcsctp/packet/BUILD.gn b/third_party/libwebrtc/net/dcsctp/packet/BUILD.gn index 7abccc004b..a0c9d8d4df 100644 --- a/third_party/libwebrtc/net/dcsctp/packet/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/packet/BUILD.gn @@ -72,7 +72,6 @@ rtc_library("parameter") { "../../../rtc_base:stringutils", "../common:internal_types", "../common:math", - "../common:str_join", "../public:types", ] sources = [ @@ -120,7 +119,6 @@ rtc_library("error_cause") { "../../../rtc_base:stringutils", "../common:internal_types", "../common:math", - "../common:str_join", "../packet:bounded_io", "../public:types", ] @@ -172,7 +170,6 @@ rtc_library("chunk") { "../../../rtc_base:logging", "../../../rtc_base:stringutils", "../common:math", - "../common:str_join", "../packet:bounded_io", ] sources = [ diff --git a/third_party/libwebrtc/net/dcsctp/packet/chunk/sack_chunk.cc b/third_party/libwebrtc/net/dcsctp/packet/chunk/sack_chunk.cc index d80e430082..179f7ea379 100644 --- a/third_party/libwebrtc/net/dcsctp/packet/chunk/sack_chunk.cc +++ b/third_party/libwebrtc/net/dcsctp/packet/chunk/sack_chunk.cc @@ -18,11 +18,11 @@ #include "absl/types/optional.h" #include "api/array_view.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/bounded_byte_reader.h" #include "net/dcsctp/packet/bounded_byte_writer.h" #include "net/dcsctp/packet/tlv_trait.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" #include "rtc_base/strings/string_builder.h" namespace dcsctp { diff --git a/third_party/libwebrtc/net/dcsctp/packet/error_cause/missing_mandatory_parameter_cause.cc b/third_party/libwebrtc/net/dcsctp/packet/error_cause/missing_mandatory_parameter_cause.cc index b89f86e43e..679439d4c2 100644 --- a/third_party/libwebrtc/net/dcsctp/packet/error_cause/missing_mandatory_parameter_cause.cc +++ b/third_party/libwebrtc/net/dcsctp/packet/error_cause/missing_mandatory_parameter_cause.cc @@ -18,11 +18,11 @@ #include "absl/types/optional.h" #include "api/array_view.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/bounded_byte_reader.h" #include "net/dcsctp/packet/bounded_byte_writer.h" #include "net/dcsctp/packet/tlv_trait.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" #include "rtc_base/strings/string_builder.h" namespace dcsctp { @@ -83,7 +83,7 @@ void MissingMandatoryParameterCause::SerializeTo( std::string MissingMandatoryParameterCause::ToString() const { rtc::StringBuilder sb; sb << "Missing Mandatory Parameter, missing_parameter_types=" - << StrJoin(missing_parameter_types_, ","); + << webrtc::StrJoin(missing_parameter_types_, ","); return sb.Release(); } diff --git a/third_party/libwebrtc/net/dcsctp/packet/parameter/supported_extensions_parameter.cc b/third_party/libwebrtc/net/dcsctp/packet/parameter/supported_extensions_parameter.cc index 6a8fb214de..87a5bd9b52 100644 --- a/third_party/libwebrtc/net/dcsctp/packet/parameter/supported_extensions_parameter.cc +++ b/third_party/libwebrtc/net/dcsctp/packet/parameter/supported_extensions_parameter.cc @@ -16,10 +16,10 @@ #include "absl/types/optional.h" #include "api/array_view.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/bounded_byte_reader.h" #include "net/dcsctp/packet/bounded_byte_writer.h" #include "net/dcsctp/packet/tlv_trait.h" +#include "rtc_base/strings/str_join.h" #include "rtc_base/strings/string_builder.h" namespace dcsctp { @@ -59,7 +59,7 @@ void SupportedExtensionsParameter::SerializeTo( std::string SupportedExtensionsParameter::ToString() const { rtc::StringBuilder sb; - sb << "Supported Extensions (" << StrJoin(chunk_types_, ", ") << ")"; + sb << "Supported Extensions (" << webrtc::StrJoin(chunk_types_, ", ") << ")"; return sb.Release(); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/public/BUILD.gn b/third_party/libwebrtc/net/dcsctp/public/BUILD.gn index 6cb289bf5b..8af0fd88c4 100644 --- a/third_party/libwebrtc/net/dcsctp/public/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/public/BUILD.gn @@ -11,6 +11,7 @@ import("../../../webrtc.gni") rtc_source_set("types") { deps = [ "../../../api:array_view", + "../../../api/units:time_delta", "../../../rtc_base:strong_alias", ] sources = [ @@ -26,6 +27,7 @@ rtc_source_set("socket") { ":types", "../../../api:array_view", "../../../api/task_queue:task_queue", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:strong_alias", ] diff --git a/third_party/libwebrtc/net/dcsctp/public/dcsctp_socket.h b/third_party/libwebrtc/net/dcsctp/public/dcsctp_socket.h index 3cfb8052f8..d0a81eaeb2 100644 --- a/third_party/libwebrtc/net/dcsctp/public/dcsctp_socket.h +++ b/third_party/libwebrtc/net/dcsctp/public/dcsctp_socket.h @@ -18,6 +18,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" #include "net/dcsctp/public/dcsctp_handover_state.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_options.h" @@ -323,9 +324,21 @@ class DcSctpSocketCallbacks { // Returns the current time in milliseconds (from any epoch). // + // TODO(bugs.webrtc.org/15593): This method is deprecated, see `Now`. + // // Note that it's NOT ALLOWED to call into this library from within this // callback. - virtual TimeMs TimeMillis() = 0; + virtual TimeMs TimeMillis() { return TimeMs(0); } + + // Returns the current time (from any epoch). + // + // This callback will eventually replace `TimeMillis()`. + // + // Note that it's NOT ALLOWED to call into this library from within this + // callback. + virtual webrtc::Timestamp Now() { + return webrtc::Timestamp::Millis(*TimeMillis()); + } // Called when the library needs a random number uniformly distributed between // `low` (inclusive) and `high` (exclusive). The random numbers used by the diff --git a/third_party/libwebrtc/net/dcsctp/public/types.h b/third_party/libwebrtc/net/dcsctp/public/types.h index 7d69875d1a..02e2ce1e5e 100644 --- a/third_party/libwebrtc/net/dcsctp/public/types.h +++ b/third_party/libwebrtc/net/dcsctp/public/types.h @@ -14,6 +14,7 @@ #include <cstdint> #include <limits> +#include "api/units/time_delta.h" #include "rtc_base/strong_alias.h" namespace dcsctp { @@ -41,6 +42,10 @@ class DurationMs : public webrtc::StrongAlias<class DurationMsTag, int32_t> { constexpr explicit DurationMs(const UnderlyingType& v) : webrtc::StrongAlias<class DurationMsTag, int32_t>(v) {} + constexpr explicit DurationMs(webrtc::TimeDelta v) + : webrtc::StrongAlias<class DurationMsTag, int32_t>( + v.IsInfinite() ? InfiniteDuration() : DurationMs(v.ms())) {} + static constexpr DurationMs InfiniteDuration() { return DurationMs(std::numeric_limits<int32_t>::max()); } @@ -58,6 +63,11 @@ class DurationMs : public webrtc::StrongAlias<class DurationMsTag, int32_t> { value_ *= factor; return *this; } + constexpr webrtc::TimeDelta ToTimeDelta() const { + return *this == DurationMs::InfiniteDuration() + ? webrtc::TimeDelta::PlusInfinity() + : webrtc::TimeDelta::Millis(value_); + } }; constexpr inline DurationMs operator+(DurationMs lhs, DurationMs rhs) { diff --git a/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn b/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn index f5f5b7ed81..15b9f60f3d 100644 --- a/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/rx/BUILD.gn @@ -95,10 +95,10 @@ rtc_library("reassembly_queue") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_set", "../common:internal_types", "../common:sequence_numbers", - "../common:str_join", "../packet:chunk", "../packet:data", "../packet:parameter", diff --git a/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc b/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc index 07192fda54..0e9e4fcb60 100644 --- a/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc +++ b/third_party/libwebrtc/net/dcsctp/rx/data_tracker_test.cc @@ -29,6 +29,8 @@ using ::testing::ElementsAre; using ::testing::IsEmpty; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr size_t kArwnd = 10000; constexpr TSN kInitialTSN(11); @@ -42,8 +44,8 @@ class DataTrackerTest : public testing::Test { }), timer_(timer_manager_.CreateTimer( "test/delayed_ack", - []() { return absl::nullopt; }, - TimerOptions(DurationMs(0)))), + []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Zero()))), tracker_( std::make_unique<DataTracker>("log: ", timer_.get(), kInitialTSN)) { } @@ -71,7 +73,7 @@ class DataTrackerTest : public testing::Test { tracker_->RestoreFromState(state); } - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; std::unique_ptr<Timer> timer_; @@ -784,5 +786,16 @@ TEST_F(DataTrackerTest, DoesNotAcceptGapsWithDuplicateData) { EXPECT_FALSE(tracker_->Observe(TSN(12))); } +TEST_F(DataTrackerTest, NotReadyForHandoverWhenHavingTsnGaps) { + tracker_->Observe(TSN(10)); + tracker_->Observe(TSN(12)); + EXPECT_EQ(tracker_->GetHandoverReadiness(), + HandoverReadinessStatus().Add( + HandoverUnreadinessReason::kDataTrackerTsnBlocksPending)); + + tracker_->Observe(TSN(11)); + EXPECT_EQ(tracker_->GetHandoverReadiness(), HandoverReadinessStatus()); +} + } // namespace } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc index 573443635c..4c223d0532 100644 --- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc @@ -23,7 +23,6 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/common/sequence_numbers.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" @@ -34,6 +33,7 @@ #include "net/dcsctp/rx/reassembly_streams.h" #include "net/dcsctp/rx/traditional_reassembly_streams.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { namespace { @@ -57,8 +57,6 @@ ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, : log_prefix_(log_prefix), max_size_bytes_(max_size_bytes), watermark_bytes_(max_size_bytes * kHighWatermarkLimit), - last_assembled_tsn_watermark_( - tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))), last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)), streams_(CreateStreams( log_prefix_, @@ -180,33 +178,9 @@ void ReassemblyQueue::AddReassembledMessage( << ", ppid=" << *message.ppid() << ", payload=" << message.payload().size() << " bytes"; - for (const UnwrappedTSN tsn : tsns) { - if (tsn == last_assembled_tsn_watermark_.next_value()) { - // Update watermark, or insert into delivered_tsns_ - last_assembled_tsn_watermark_.Increment(); - } else { - delivered_tsns_.insert(tsn); - } - } - - // With new TSNs in delivered_tsns, gaps might be filled. - MaybeMoveLastAssembledWatermarkFurther(); - reassembled_messages_.emplace_back(std::move(message)); } -void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { - // `delivered_tsns_` contain TSNS when there is a gap between ranges of - // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to - // that list, because if so, it can be moved. - while (!delivered_tsns_.empty() && - *delivered_tsns_.begin() == - last_assembled_tsn_watermark_.next_value()) { - last_assembled_tsn_watermark_.Increment(); - delivered_tsns_.erase(delivered_tsns_.begin()); - } -} - void ReassemblyQueue::HandleForwardTsn( TSN new_cumulative_tsn, rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) { @@ -228,33 +202,19 @@ void ReassemblyQueue::HandleForwardTsn( RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() << " - performing."; - last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); - delivered_tsns_.erase(delivered_tsns_.begin(), - delivered_tsns_.upper_bound(tsn)); - MaybeMoveLastAssembledWatermarkFurther(); queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams); RTC_DCHECK(IsConsistent()); } bool ReassemblyQueue::IsConsistent() const { - // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be - // adjacent. - if (!delivered_tsns_.empty() && - last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) { - return false; - } - // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively - // enforced in this class. This comparison will still trigger if queued_bytes_ - // became "negative". - return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_); + // enforced in this class. But in case it wraps around (becomes negative, but + // as it's unsigned, that would wrap to very big), this would trigger. + return (queued_bytes_ <= 2 * max_size_bytes_); } HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const { HandoverReadinessStatus status = streams_->GetHandoverReadiness(); - if (!delivered_tsns_.empty()) { - status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap); - } if (deferred_reset_streams_.has_value()) { status.Add(HandoverUnreadinessReason::kStreamResetDeferred); } @@ -262,7 +222,6 @@ HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const { } void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) { - state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value(); state.rx.last_completed_deferred_reset_req_sn = last_completed_reset_req_seq_nbr_.value(); streams_->AddHandoverState(state); @@ -272,8 +231,6 @@ void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { // Validate that the component is in pristine state. RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0)); - last_assembled_tsn_watermark_ = - tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn)); last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn); streams_->RestoreFromState(state); diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h index 761ec3556c..82a66d089c 100644 --- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h +++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.h @@ -142,19 +142,12 @@ class ReassemblyQueue { bool IsConsistent() const; void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns, DcSctpMessage message); - void MaybeMoveLastAssembledWatermarkFurther(); const absl::string_view log_prefix_; const size_t max_size_bytes_; const size_t watermark_bytes_; UnwrappedTSN::Unwrapper tsn_unwrapper_; - // Whenever a message has been assembled, either increase - // `last_assembled_tsn_watermark_` or - if there are gaps - add the message's - // TSNs into delivered_tsns_ so that messages are not re-delivered on - // duplicate chunks. - UnwrappedTSN last_assembled_tsn_watermark_; - std::set<UnwrappedTSN> delivered_tsns_; // Messages that have been reassembled, and will be returned by // `FlushMessages`. std::vector<DcSctpMessage> reassembled_messages_; diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc index fd8c423a5f..81bb7af963 100644 --- a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue_test.cc @@ -252,33 +252,6 @@ TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) { ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload))); } -TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) { - ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); - reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B")); - EXPECT_FALSE(reasm.HasMessages()); - - reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE")); - EXPECT_TRUE(reasm.HasMessages()); - EXPECT_EQ( - reasm.GetHandoverReadiness(), - HandoverReadinessStatus() - .Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap) - .Add( - HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); - - EXPECT_THAT(reasm.FlushMessages(), - ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload))); - EXPECT_EQ( - reasm.GetHandoverReadiness(), - HandoverReadinessStatus() - .Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap) - .Add( - HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks)); - - reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>()); - EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus()); -} - TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) { ReassemblyQueue reasm("log: ", TSN(10), kBufferSize); reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); diff --git a/third_party/libwebrtc/net/dcsctp/socket/BUILD.gn b/third_party/libwebrtc/net/dcsctp/socket/BUILD.gn index 681ddd47e9..04f61e5b72 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/socket/BUILD.gn @@ -11,6 +11,7 @@ import("../../../webrtc.gni") rtc_source_set("context") { sources = [ "context.h" ] deps = [ + "../../../api/units:time_delta", "../common:internal_types", "../packet:sctp_packet", "../public:socket", @@ -23,6 +24,7 @@ rtc_library("heartbeat_handler") { deps = [ ":context", "../../../api:array_view", + "../../../api/units:time_delta", "../../../rtc_base:checks", "../../../rtc_base:logging", "../packet:bounded_io", @@ -48,11 +50,12 @@ rtc_library("stream_reset_handler") { deps = [ ":context", "../../../api:array_view", + "../../../api/units:time_delta", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_set", "../common:internal_types", - "../common:str_join", "../packet:chunk", "../packet:parameter", "../packet:sctp_packet", @@ -97,6 +100,7 @@ rtc_library("transmission_control_block") { ":stream_reset_handler", "../../../api:array_view", "../../../api/task_queue:task_queue", + "../../../api/units:time_delta", "../../../rtc_base:checks", "../../../rtc_base:logging", "../../../rtc_base:stringutils", diff --git a/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.cc b/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.cc index 123526e782..0a24020167 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.cc @@ -28,7 +28,7 @@ class MessageDeliverer { } private: - struct State : public rtc::RefCountInterface { + struct State : public webrtc::RefCountInterface { explicit State(DcSctpMessage&& m) : has_delivered(false), message(std::move(m)) {} bool has_delivered; @@ -70,6 +70,8 @@ std::unique_ptr<Timeout> CallbackDeferrer::CreateTimeout( } TimeMs CallbackDeferrer::TimeMillis() { + // This should not be called by the library - it's migrated to `Now()`. + RTC_DCHECK(false); // Will not be deferred - call directly. return underlying_.TimeMillis(); } diff --git a/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.h b/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.h index 1c35dda6cf..6659e87155 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.h +++ b/third_party/libwebrtc/net/dcsctp/socket/callback_deferrer.h @@ -65,6 +65,7 @@ class CallbackDeferrer : public DcSctpSocketCallbacks { std::unique_ptr<Timeout> CreateTimeout( webrtc::TaskQueueBase::DelayPrecision precision) override; TimeMs TimeMillis() override; + webrtc::Timestamp Now() override { return underlying_.Now(); } uint32_t GetRandomInt(uint32_t low, uint32_t high) override; void OnMessageReceived(DcSctpMessage message) override; void OnError(ErrorKind error, absl::string_view message) override; diff --git a/third_party/libwebrtc/net/dcsctp/socket/context.h b/third_party/libwebrtc/net/dcsctp/socket/context.h index eca5b9e4fb..8e970e8c8e 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/context.h +++ b/third_party/libwebrtc/net/dcsctp/socket/context.h @@ -13,6 +13,7 @@ #include <cstdint> #include "absl/strings/string_view.h" +#include "api/units/time_delta.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/public/dcsctp_socket.h" @@ -39,11 +40,11 @@ class Context { // Returns the socket callbacks. virtual DcSctpSocketCallbacks& callbacks() const = 0; - // Observes a measured RTT value, in milliseconds. - virtual void ObserveRTT(DurationMs rtt_ms) = 0; + // Observes a measured RTT value. + virtual void ObserveRTT(webrtc::TimeDelta rtt_ms) = 0; // Returns the current Retransmission Timeout (rto) value, in milliseconds. - virtual DurationMs current_rto() const = 0; + virtual webrtc::TimeDelta current_rto() const = 0; // Increments the transmission error counter, given a human readable reason. virtual bool IncrementTxErrorCounter(absl::string_view reason) = 0; diff --git a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc index 32bcdaaacf..f0f9590943 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc @@ -82,6 +82,8 @@ namespace dcsctp { namespace { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // https://tools.ietf.org/html/rfc4960#section-5.1 constexpr uint32_t kMinVerificationTag = 1; @@ -187,19 +189,19 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, t1_init_(timer_manager_.CreateTimer( "t1-init", absl::bind_front(&DcSctpSocket::OnInitTimerExpiry, this), - TimerOptions(options.t1_init_timeout, + TimerOptions(options.t1_init_timeout.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, options.max_init_retransmits))), t1_cookie_(timer_manager_.CreateTimer( "t1-cookie", absl::bind_front(&DcSctpSocket::OnCookieTimerExpiry, this), - TimerOptions(options.t1_cookie_timeout, + TimerOptions(options.t1_cookie_timeout.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, options.max_init_retransmits))), t2_shutdown_(timer_manager_.CreateTimer( "t2-shutdown", absl::bind_front(&DcSctpSocket::OnShutdownTimerExpiry, this), - TimerOptions(options.t2_shutdown_timeout, + TimerOptions(options.t2_shutdown_timeout.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, options.max_retransmissions))), packet_sender_(callbacks_, @@ -518,7 +520,7 @@ SendStatus DcSctpSocket::Send(DcSctpMessage message, return SendStatus::kErrorResourceExhaustion; } - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); ++metrics_.tx_messages_count; send_queue_.Add(now, std::move(message), send_options); if (tcb_ != nullptr) { @@ -600,11 +602,11 @@ absl::optional<Metrics> DcSctpSocket::GetMetrics() const { Metrics metrics = metrics_; metrics.cwnd_bytes = tcb_->cwnd(); - metrics.srtt_ms = tcb_->current_srtt().value(); + metrics.srtt_ms = tcb_->current_srtt().ms(); size_t packet_payload_size = options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize; metrics.unack_data_count = - tcb_->retransmission_queue().outstanding_items() + + tcb_->retransmission_queue().unacked_items() + (send_queue_.total_buffered_amount() + packet_payload_size - 1) / packet_payload_size; metrics.peer_rwnd_bytes = tcb_->retransmission_queue().rwnd(); @@ -768,7 +770,7 @@ void DcSctpSocket::ReceivePacket(rtc::ArrayView<const uint8_t> data) { ++metrics_.rx_packets_count; if (packet_observer_ != nullptr) { - packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data); + packet_observer_->OnReceivedPacket(TimeMs(callbacks_.Now().ms()), data); } absl::optional<SctpPacket> packet = SctpPacket::Parse(data, options_); @@ -921,7 +923,7 @@ bool DcSctpSocket::HandleUnrecognizedChunk( return continue_processing; } -absl::optional<DurationMs> DcSctpSocket::OnInitTimerExpiry() { +TimeDelta DcSctpSocket::OnInitTimerExpiry() { RTC_DLOG(LS_VERBOSE) << log_prefix() << "Timer " << t1_init_->name() << " has expired: " << t1_init_->expiration_count() << "/" << t1_init_->options().max_restarts.value_or(-1); @@ -933,10 +935,10 @@ absl::optional<DurationMs> DcSctpSocket::OnInitTimerExpiry() { InternalClose(ErrorKind::kTooManyRetries, "No INIT_ACK received"); } RTC_DCHECK(IsConsistent()); - return absl::nullopt; + return TimeDelta::Zero(); } -absl::optional<DurationMs> DcSctpSocket::OnCookieTimerExpiry() { +TimeDelta DcSctpSocket::OnCookieTimerExpiry() { // https://tools.ietf.org/html/rfc4960#section-4 // "If the T1-cookie timer expires, the endpoint MUST retransmit COOKIE // ECHO and restart the T1-cookie timer without changing state. This MUST @@ -951,16 +953,16 @@ absl::optional<DurationMs> DcSctpSocket::OnCookieTimerExpiry() { RTC_DCHECK(state_ == State::kCookieEchoed); if (t1_cookie_->is_running()) { - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); } else { InternalClose(ErrorKind::kTooManyRetries, "No COOKIE_ACK received"); } RTC_DCHECK(IsConsistent()); - return absl::nullopt; + return TimeDelta::Zero(); } -absl::optional<DurationMs> DcSctpSocket::OnShutdownTimerExpiry() { +TimeDelta DcSctpSocket::OnShutdownTimerExpiry() { RTC_DLOG(LS_VERBOSE) << log_prefix() << "Timer " << t2_shutdown_->name() << " has expired: " << t2_shutdown_->expiration_count() << "/" @@ -980,7 +982,7 @@ absl::optional<DurationMs> DcSctpSocket::OnShutdownTimerExpiry() { InternalClose(ErrorKind::kTooManyRetries, "No SHUTDOWN_ACK received"); RTC_DCHECK(IsConsistent()); - return absl::nullopt; + return TimeDelta::Zero(); } // https://tools.ietf.org/html/rfc4960#section-9.2 @@ -996,7 +998,7 @@ void DcSctpSocket::OnSentPacket(rtc::ArrayView<const uint8_t> packet, // The packet observer is invoked even if the packet was failed to be sent, to // indicate an attempt was made. if (packet_observer_ != nullptr) { - packet_observer_->OnSentPacket(callbacks_.TimeMillis(), packet); + packet_observer_->OnSentPacket(TimeMs(callbacks_.Now().ms()), packet); } if (status == SendPacketStatus::kSuccess) { @@ -1282,7 +1284,7 @@ void DcSctpSocket::HandleInitAck( // The connection isn't fully established just yet. tcb_->SetCookieEchoChunk(CookieEchoChunk(cookie->data())); - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); t1_cookie_->Start(); } @@ -1351,7 +1353,7 @@ void DcSctpSocket::HandleCookieEcho( // "A COOKIE ACK chunk may be bundled with any pending DATA chunks (and/or // SACK chunks), but the COOKIE ACK chunk MUST be the first chunk in the // packet." - tcb_->SendBufferedPackets(b, callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(b, callbacks_.Now()); } bool DcSctpSocket::HandleCookieEchoWithTCB(const CommonHeader& header, @@ -1449,7 +1451,7 @@ void DcSctpSocket::HandleCookieAck( t1_cookie_->Stop(); tcb_->ClearCookieEchoChunk(); SetState(State::kEstablished, "COOKIE_ACK received"); - tcb_->SendBufferedPackets(callbacks_.TimeMillis()); + tcb_->SendBufferedPackets(callbacks_.Now()); callbacks_.OnConnected(); } @@ -1465,7 +1467,7 @@ void DcSctpSocket::HandleSack(const CommonHeader& header, absl::optional<SackChunk> chunk = SackChunk::Parse(descriptor.data); if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); SackChunk sack = ChunkValidators::Clean(*std::move(chunk)); if (tcb_->retransmission_queue().HandleSack(now, sack)) { @@ -1554,7 +1556,7 @@ void DcSctpSocket::HandleError(const CommonHeader& header, void DcSctpSocket::HandleReconfig( const CommonHeader& header, const SctpPacket::ChunkDescriptor& descriptor) { - TimeMs now = callbacks_.TimeMillis(); + Timestamp now = callbacks_.Now(); absl::optional<ReConfigChunk> chunk = ReConfigChunk::Parse(descriptor.data); if (ValidateParseSuccess(chunk) && ValidateHasTCB()) { tcb_->stream_reset_handler().HandleReConfig(*std::move(chunk)); @@ -1718,7 +1720,7 @@ void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) { } void DcSctpSocket::MaybeSendShutdownOrAck() { - if (tcb_->retransmission_queue().outstanding_bytes() != 0) { + if (tcb_->retransmission_queue().unacked_bytes() != 0) { return; } diff --git a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.h b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.h index f91eb3ead4..deb6ee23e7 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.h +++ b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.h @@ -155,9 +155,9 @@ class DcSctpSocket : public DcSctpSocketInterface { // Closes the association, because of too many retransmission errors. void CloseConnectionBecauseOfTooManyTransmissionErrors(); // Timer expiration handlers - absl::optional<DurationMs> OnInitTimerExpiry(); - absl::optional<DurationMs> OnCookieTimerExpiry(); - absl::optional<DurationMs> OnShutdownTimerExpiry(); + webrtc::TimeDelta OnInitTimerExpiry(); + webrtc::TimeDelta OnCookieTimerExpiry(); + webrtc::TimeDelta OnShutdownTimerExpiry(); void OnSentPacket(rtc::ArrayView<const uint8_t> packet, SendPacketStatus status); // Sends SHUTDOWN or SHUTDOWN-ACK if the socket is shutting down and if all diff --git a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_network_test.cc b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_network_test.cc index f097bfa095..f73ecce445 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_network_test.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_network_test.cc @@ -55,6 +55,8 @@ using ::testing::AllOf; using ::testing::Ge; using ::testing::Le; using ::testing::SizeIs; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr StreamID kStreamId(1); constexpr PPID kPpid(53); @@ -142,13 +144,13 @@ class SctpActor : public DcSctpSocketCallbacks { emulated_socket_(emulated_socket), timeout_factory_( *thread_, - [this]() { return TimeMillis(); }, + [this]() { return TimeMs(Now().ms()); }, [this](dcsctp::TimeoutID timeout_id) { sctp_socket_.HandleTimeout(timeout_id); }), random_(GetUniqueSeed()), sctp_socket_(name, *this, nullptr, sctp_options), - last_bandwidth_printout_(TimeMs(TimeMillis())) { + last_bandwidth_printout_(Now()) { emulated_socket.SetReceiver([this](rtc::CopyOnWriteBuffer buf) { // The receiver will be executed on the NetworkEmulation task queue, but // the dcSCTP socket is owned by `thread_` and is not thread-safe. @@ -157,11 +159,11 @@ class SctpActor : public DcSctpSocketCallbacks { } void PrintBandwidth() { - TimeMs now = TimeMillis(); - DurationMs duration = now - last_bandwidth_printout_; + Timestamp now = Now(); + TimeDelta duration = now - last_bandwidth_printout_; double bitrate_mbps = - static_cast<double>(received_bytes_ * 8) / *duration / 1000; + static_cast<double>(received_bytes_ * 8) / duration.ms() / 1000; RTC_LOG(LS_INFO) << log_prefix() << rtc::StringFormat("Received %0.2f Mbps", bitrate_mbps); @@ -185,7 +187,7 @@ class SctpActor : public DcSctpSocketCallbacks { return timeout_factory_.CreateTimeout(precision); } - TimeMs TimeMillis() override { return TimeMs(rtc::TimeMillis()); } + Timestamp Now() override { return Timestamp::Millis(rtc::TimeMillis()); } uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return random_.Rand(low, high); @@ -314,7 +316,7 @@ class SctpActor : public DcSctpSocketCallbacks { DcSctpSocket sctp_socket_; size_t received_bytes_ = 0; absl::optional<DcSctpMessage> last_received_message_; - TimeMs last_bandwidth_printout_; + Timestamp last_bandwidth_printout_; // Per-second received bitrates, in Mbps std::vector<double> received_bitrate_mbps_; webrtc::ScopedTaskSafety safety_; diff --git a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_test.cc b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_test.cc index 13202846ac..dc76b80a37 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket_test.cc @@ -73,6 +73,8 @@ using ::testing::Not; using ::testing::Property; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr SendOptions kSendOptions; constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20; @@ -269,7 +271,7 @@ void RunTimers(SocketUnderTest& s) { } } -void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, DurationMs duration) { +void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, TimeDelta duration) { a.cb.AdvanceTime(duration); z.cb.AdvanceTime(duration); @@ -282,14 +284,14 @@ void AdvanceTime(SocketUnderTest& a, SocketUnderTest& z, DurationMs duration) { void ExchangeMessagesAndAdvanceTime( SocketUnderTest& a, SocketUnderTest& z, - DurationMs max_timeout = DurationMs(10000)) { - TimeMs time_started = a.cb.TimeMillis(); - while (a.cb.TimeMillis() - time_started < max_timeout) { + TimeDelta max_timeout = TimeDelta::Seconds(10)) { + Timestamp time_started = a.cb.Now(); + while (a.cb.Now() - time_started < max_timeout) { ExchangeMessages(a, z); - DurationMs time_to_next_timeout = + TimeDelta time_to_next_timeout = std::min(a.cb.GetTimeToNextTimeout(), z.cb.GetTimeToNextTimeout()); - if (time_to_next_timeout == DurationMs::InfiniteDuration()) { + if (time_to_next_timeout.IsPlusInfinity()) { // No more pending timer. return; } @@ -535,7 +537,7 @@ TEST(DcSctpSocketTest, EstablishConnectionLostCookieAck) { EXPECT_EQ(z.socket.state(), SocketState::kConnected); // This will make A re-send the COOKIE_ECHO - AdvanceTime(a, z, DurationMs(a.options.t1_cookie_timeout)); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta()); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -555,7 +557,7 @@ TEST(DcSctpSocketTest, ResendInitAndEstablishConnection) { EXPECT_THAT(a.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsChunkType(InitChunk::kType)))); - AdvanceTime(a, z, a.options.t1_init_timeout); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta()); // Z reads INIT, produces INIT_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -581,7 +583,7 @@ TEST(DcSctpSocketTest, ResendingInitTooManyTimesAborts) { HasChunks(ElementsAre(IsChunkType(InitChunk::kType)))); for (int i = 0; i < *a.options.max_init_retransmits; ++i) { - AdvanceTime(a, z, a.options.t1_init_timeout * (1 << i)); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta() * (1 << i)); // INIT is resent EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -590,8 +592,9 @@ TEST(DcSctpSocketTest, ResendingInitTooManyTimesAborts) { // Another timeout, after the max init retransmits. EXPECT_CALL(a.cb, OnAborted).Times(1); - AdvanceTime( - a, z, a.options.t1_init_timeout * (1 << *a.options.max_init_retransmits)); + AdvanceTime(a, z, + a.options.t1_init_timeout.ToTimeDelta() * + (1 << *a.options.max_init_retransmits)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); } @@ -611,7 +614,7 @@ TEST(DcSctpSocketTest, ResendCookieEchoAndEstablishConnection) { EXPECT_THAT(a.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsChunkType(CookieEchoChunk::kType)))); - AdvanceTime(a, z, a.options.t1_init_timeout); + AdvanceTime(a, z, a.options.t1_init_timeout.ToTimeDelta()); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -638,7 +641,7 @@ TEST(DcSctpSocketTest, ResendingCookieEchoTooManyTimesAborts) { HasChunks(ElementsAre(IsChunkType(CookieEchoChunk::kType)))); for (int i = 0; i < *a.options.max_init_retransmits; ++i) { - AdvanceTime(a, z, a.options.t1_cookie_timeout * (1 << i)); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta() * (1 << i)); // COOKIE_ECHO is resent EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -647,9 +650,9 @@ TEST(DcSctpSocketTest, ResendingCookieEchoTooManyTimesAborts) { // Another timeout, after the max init retransmits. EXPECT_CALL(a.cb, OnAborted).Times(1); - AdvanceTime( - a, z, - a.options.t1_cookie_timeout * (1 << *a.options.max_init_retransmits)); + AdvanceTime(a, z, + a.options.t1_cookie_timeout.ToTimeDelta() * + (1 << *a.options.max_init_retransmits)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); } @@ -680,11 +683,13 @@ TEST(DcSctpSocketTest, DoesntSendMorePacketsUntilCookieAckHasBeenReceived) { // will be T1-COOKIE that drives retransmissions, so when the T3-RTX expires, // nothing should be retransmitted. ASSERT_TRUE(a.options.rto_initial < a.options.t1_cookie_timeout); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); // When T1-COOKIE expires, both the COOKIE-ECHO and DATA should be present. - AdvanceTime(a, z, a.options.t1_cookie_timeout - a.options.rto_initial); + AdvanceTime(a, z, + a.options.t1_cookie_timeout.ToTimeDelta() - + a.options.rto_initial.ToTimeDelta()); // And this COOKIE-ECHO and DATA is also lost - never received by Z. EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -694,7 +699,7 @@ TEST(DcSctpSocketTest, DoesntSendMorePacketsUntilCookieAckHasBeenReceived) { EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); // COOKIE_ECHO has exponential backoff. - AdvanceTime(a, z, a.options.t1_cookie_timeout * 2); + AdvanceTime(a, z, a.options.t1_cookie_timeout.ToTimeDelta() * 2); // Z reads COOKIE_ECHO, produces COOKIE_ACK z.socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -747,7 +752,7 @@ TEST(DcSctpSocketTest, ShutdownTimerExpiresTooManyTimeClosesConnection) { EXPECT_EQ(a.socket.state(), SocketState::kShuttingDown); for (int i = 0; i < *a.options.max_retransmissions; ++i) { - AdvanceTime(a, z, DurationMs(a.options.rto_initial * (1 << i))); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta() * (1 << i)); // Dropping every shutdown chunk. EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -757,7 +762,8 @@ TEST(DcSctpSocketTest, ShutdownTimerExpiresTooManyTimeClosesConnection) { // The last expiry, makes it abort the connection. EXPECT_CALL(a.cb, OnAborted).Times(1); AdvanceTime(a, z, - a.options.rto_initial * (1 << *a.options.max_retransmissions)); + a.options.rto_initial.ToTimeDelta() * + (1 << *a.options.max_retransmissions)); EXPECT_EQ(a.socket.state(), SocketState::kClosed); EXPECT_THAT(a.cb.ConsumeSentPacket(), @@ -815,7 +821,7 @@ TEST_P(DcSctpSocketParametrizedTest, TimeoutResendsPacket) { a.cb.ConsumeSentPacket(); RTC_LOG(LS_INFO) << "Advancing time"; - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); z->socket.ReceivePacket(a.cb.ConsumeSentPacket()); @@ -886,7 +892,7 @@ TEST_P(DcSctpSocketParametrizedTest, ExpectHeartbeatToBeSent) { EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); - AdvanceTime(a, *z, a.options.heartbeat_interval); + AdvanceTime(a, *z, a.options.heartbeat_interval.ToTimeDelta()); std::vector<uint8_t> packet = a.cb.ConsumeSentPacket(); // The info is a single 64-bit number. @@ -920,7 +926,7 @@ TEST_P(DcSctpSocketParametrizedTest, for (int i = 0; i < *a.options.max_retransmissions; ++i) { RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; - AdvanceTime(a, *z, time_to_next_hearbeat); + AdvanceTime(a, *z, time_to_next_hearbeat.ToTimeDelta()); // Dropping every heartbeat. ASSERT_HAS_VALUE_AND_ASSIGN( @@ -929,20 +935,20 @@ TEST_P(DcSctpSocketParametrizedTest, EXPECT_EQ(hb_packet.descriptors()[0].type, HeartbeatRequestChunk::kType); RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Millis(1000)); time_to_next_hearbeat = a.options.heartbeat_interval - DurationMs(1000); } RTC_LOG(LS_INFO) << "Letting HEARTBEAT interval timer expire - sending..."; - AdvanceTime(a, *z, time_to_next_hearbeat); + AdvanceTime(a, *z, time_to_next_hearbeat.ToTimeDelta()); // Last heartbeat EXPECT_THAT(a.cb.ConsumeSentPacket(), Not(IsEmpty())); EXPECT_CALL(a.cb, OnAborted).Times(1); // Should suffice as exceeding RTO - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Millis(1000)); z = MaybeHandoverSocket(std::move(z)); } @@ -959,7 +965,7 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { // Force-close socket Z so that it doesn't interfere from now on. z->socket.Close(); - DurationMs time_to_next_hearbeat = a.options.heartbeat_interval; + TimeDelta time_to_next_hearbeat = a.options.heartbeat_interval.ToTimeDelta(); for (int i = 0; i < *a.options.max_retransmissions; ++i) { AdvanceTime(a, *z, time_to_next_hearbeat); @@ -968,9 +974,10 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { a.cb.ConsumeSentPacket(); RTC_LOG(LS_INFO) << "Letting the heartbeat expire."; - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Seconds(1)); - time_to_next_hearbeat = a.options.heartbeat_interval - DurationMs(1000); + time_to_next_hearbeat = + a.options.heartbeat_interval.ToTimeDelta() - TimeDelta::Seconds(1); } RTC_LOG(LS_INFO) << "Getting the last heartbeat - and acking it"; @@ -990,7 +997,7 @@ TEST_P(DcSctpSocketParametrizedTest, RecoversAfterASuccessfulAck) { // Should suffice as exceeding RTO - which will not fire. EXPECT_CALL(a.cb, OnAborted).Times(0); - AdvanceTime(a, *z, DurationMs(1000)); + AdvanceTime(a, *z, TimeDelta::Seconds(1)); EXPECT_THAT(a.cb.ConsumeSentPacket(), IsEmpty()); @@ -1245,7 +1252,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendMessageWithLimitedRtx) { a.socket.ReceivePacket(z->cb.ConsumeSentPacket()); // Handle delayed SACK for third DATA - AdvanceTime(a, *z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, *z, a.options.delayed_ack_max_timeout.ToTimeDelta()); // Handle SACK for second DATA a.socket.ReceivePacket(z->cb.ConsumeSentPacket()); @@ -1254,7 +1261,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendMessageWithLimitedRtx) { // in-flight and the reported gap could be due to out-of-order delivery. So // the RetransmissionQueue will not mark it as "to be retransmitted" until // after the t3-rtx timer has expired. - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); // The chunk will be marked as retransmitted, and then as abandoned, which // will trigger a FORWARD-TSN to be sent. @@ -1352,7 +1359,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendManyFragmentedMessagesWithLimitedRtx) { ExchangeMessages(a, *z); // Let the RTX timer expire, and exchange FORWARD-TSN/SACKs - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, *z); @@ -1484,7 +1491,7 @@ TEST(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { .Build()); // The receiver might have moved into delayed ack mode. - AdvanceTime(a, z, z.options.rto_initial); + AdvanceTime(a, z, z.options.rto_initial.ToTimeDelta()); EXPECT_THAT(z.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsSack( @@ -1528,7 +1535,7 @@ TEST(DcSctpSocketTest, PassingHighWatermarkWillOnlyAcceptCumAckTsn) { .Build()); // The receiver might have moved into delayed ack mode. - AdvanceTime(a, z, z.options.rto_initial); + AdvanceTime(a, z, z.options.rto_initial.ToTimeDelta()); EXPECT_THAT(z.cb.ConsumeSentPacket(), HasChunks(ElementsAre(IsSack( @@ -1562,13 +1569,13 @@ TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) { z = MaybeHandoverSocket(std::move(z)); // Mock that the time always goes forward. - TimeMs now(0); - EXPECT_CALL(a.cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + Timestamp now = Timestamp::Zero(); + EXPECT_CALL(a.cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); - EXPECT_CALL(z->cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + EXPECT_CALL(z->cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); @@ -1592,7 +1599,7 @@ TEST_P(DcSctpSocketParametrizedTest, SendsMessagesWithLowLifetime) { EXPECT_FALSE(z->cb.ConsumeReceivedMessage().has_value()); // Validate that the sockets really make the time move forward. - EXPECT_GE(*now, kIterations * 2); + EXPECT_GE(now.ms(), kIterations * 2); MaybeHandoverSocketAndSendMessage(a, std::move(z)); } @@ -1614,13 +1621,13 @@ TEST_P(DcSctpSocketParametrizedTest, lifetime_1.lifetime = DurationMs(1); // Mock that the time always goes forward. - TimeMs now(0); - EXPECT_CALL(a.cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + Timestamp now = Timestamp::Zero(); + EXPECT_CALL(a.cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); - EXPECT_CALL(z->cb, TimeMillis).WillRepeatedly([&]() { - now += DurationMs(3); + EXPECT_CALL(z->cb, Now).WillRepeatedly([&]() { + now += TimeDelta::Millis(3); return now; }); @@ -1944,7 +1951,7 @@ TEST(DcSctpSocketTest, RxAndTxPacketMetricsIncrease) { EXPECT_EQ(z.socket.GetMetrics()->rx_messages_count, 2u); // Delayed sack - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); a.socket.ReceivePacket(z.cb.ConsumeSentPacket()); // SACK EXPECT_EQ(a.socket.GetMetrics()->unack_data_count, 0u); @@ -1981,7 +1988,7 @@ TEST(DcSctpSocketTest, RetransmissionMetricsAreSetForNormalRetransmit) { a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), payload), kSendOptions); a.cb.ConsumeSentPacket(); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_EQ(a.socket.GetMetrics()->rtx_packets_count, 1u); @@ -2185,7 +2192,7 @@ TEST_P(DcSctpSocketParametrizedTest, CanLoseFirstOrderedMessage) { // First DATA is lost, and retransmission timer will delete it. a.cb.ConsumeSentPacket(); - AdvanceTime(a, *z, a.options.rto_initial); + AdvanceTime(a, *z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, *z); // Send a second message (SID=0, SSN=1). @@ -2574,7 +2581,7 @@ TEST(DcSctpSocketTest, LifecycleEventsAreGeneratedForAckedMessages) { EXPECT_CALL(a.cb, OnLifecycleEnd(LifecycleId(42))); ExchangeMessages(a, z); // In case of delayed ack. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_THAT(GetReceivedMessagePpids(z), ElementsAre(101, 102, 103)); @@ -2617,15 +2624,15 @@ TEST(DcSctpSocketTest, LifecycleEventsForFailMaxRetransmissions) { ExchangeMessages(a, z); // Handle delayed SACK. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); // The chunk is now NACKed. Let the RTO expire, to discard the message. - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); ExchangeMessages(a, z); // Handle delayed SACK. - AdvanceTime(a, z, a.options.delayed_ack_max_timeout); + AdvanceTime(a, z, a.options.delayed_ack_max_timeout.ToTimeDelta()); ExchangeMessages(a, z); EXPECT_THAT(GetReceivedMessagePpids(z), ElementsAre(51, 53)); @@ -2672,7 +2679,7 @@ TEST(DcSctpSocketTest, LifecycleEventsForExpiredMessageWithLifetimeLimit) { .lifecycle_id = LifecycleId(1), }); - AdvanceTime(a, z, DurationMs(200)); + AdvanceTime(a, z, TimeDelta::Millis(200)); EXPECT_CALL(a.cb, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); @@ -2769,7 +2776,7 @@ TEST(DcSctpSocketTest, ResetStreamsDeferred) { // Z sent "in progress", which will make A buffer packets until it's sure // that the reconfiguration has been applied. A will retry - wait for that. - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); auto reconfig2 = a.cb.ConsumeSentPacket(); EXPECT_THAT(reconfig2, HasChunks(ElementsAre(IsReConfig(HasParameters( @@ -3018,7 +3025,7 @@ TEST(DcSctpSocketTest, HandlesForwardTsnOutOfOrderWithStreamResetting) { HasChunks(ElementsAre( IsDataChunk(AllOf(Property(&DataChunk::ssn, SSN(0)), Property(&DataChunk::ppid, PPID(51))))))); - AdvanceTime(a, z, a.options.rto_initial); + AdvanceTime(a, z, a.options.rto_initial.ToTimeDelta()); auto fwd_tsn_packet = a.cb.ConsumeSentPacket(); EXPECT_THAT(fwd_tsn_packet, diff --git a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.cc b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.cc index 902dff962f..31211e0d13 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.cc @@ -21,6 +21,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/time_delta.h" #include "net/dcsctp/packet/bounded_byte_reader.h" #include "net/dcsctp/packet/bounded_byte_writer.h" #include "net/dcsctp/packet/chunk/heartbeat_ack_chunk.h" @@ -35,6 +36,8 @@ #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // This is stored (in serialized form) as HeartbeatInfoParameter sent in // HeartbeatRequestChunk and received back in HeartbeatAckChunk. It should be @@ -49,11 +52,11 @@ class HeartbeatInfo { static constexpr size_t kBufferSize = sizeof(uint64_t); static_assert(kBufferSize == 8, "Unexpected buffer size"); - explicit HeartbeatInfo(TimeMs created_at) : created_at_(created_at) {} + explicit HeartbeatInfo(Timestamp created_at) : created_at_(created_at) {} std::vector<uint8_t> Serialize() { - uint32_t high_bits = static_cast<uint32_t>(*created_at_ >> 32); - uint32_t low_bits = static_cast<uint32_t>(*created_at_); + uint32_t high_bits = static_cast<uint32_t>(created_at_.ms() >> 32); + uint32_t low_bits = static_cast<uint32_t>(created_at_.ms()); std::vector<uint8_t> data(kBufferSize); BoundedByteWriter<kBufferSize> writer(data); @@ -75,13 +78,13 @@ class HeartbeatInfo { uint32_t low_bits = reader.Load32<4>(); uint64_t created_at = static_cast<uint64_t>(high_bits) << 32 | low_bits; - return HeartbeatInfo(TimeMs(created_at)); + return HeartbeatInfo(Timestamp::Millis(created_at)); } - TimeMs created_at() const { return created_at_; } + Timestamp created_at() const { return created_at_; } private: - const TimeMs created_at_; + const Timestamp created_at_; }; HeartbeatHandler::HeartbeatHandler(absl::string_view log_prefix, @@ -91,17 +94,18 @@ HeartbeatHandler::HeartbeatHandler(absl::string_view log_prefix, : log_prefix_(log_prefix), ctx_(context), timer_manager_(timer_manager), - interval_duration_(options.heartbeat_interval), + interval_duration_(options.heartbeat_interval.ToTimeDelta()), interval_duration_should_include_rtt_( options.heartbeat_interval_include_rtt), interval_timer_(timer_manager_->CreateTimer( "heartbeat-interval", absl::bind_front(&HeartbeatHandler::OnIntervalTimerExpiry, this), - TimerOptions(interval_duration_, TimerBackoffAlgorithm::kFixed))), + TimerOptions(interval_duration_, + TimerBackoffAlgorithm::kFixed))), timeout_timer_(timer_manager_->CreateTimer( "heartbeat-timeout", absl::bind_front(&HeartbeatHandler::OnTimeoutTimerExpiry, this), - TimerOptions(options.rto_initial, + TimerOptions(options.rto_initial.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, /*max_restarts=*/0))) { // The interval timer must always be running as long as the association is up. @@ -109,7 +113,7 @@ HeartbeatHandler::HeartbeatHandler(absl::string_view log_prefix, } void HeartbeatHandler::RestartTimer() { - if (interval_duration_ == DurationMs(0)) { + if (interval_duration_.IsZero()) { // Heartbeating has been disabled. return; } @@ -117,7 +121,8 @@ void HeartbeatHandler::RestartTimer() { if (interval_duration_should_include_rtt_) { // The RTT should be used, but it's not easy accessible. The RTO will // suffice. - interval_timer_->set_duration(interval_duration_ + ctx_->current_rto()); + interval_timer_->set_duration( + interval_duration_ + ctx_->current_rto()); } else { interval_timer_->set_duration(interval_duration_); } @@ -153,8 +158,8 @@ void HeartbeatHandler::HandleHeartbeatAck(HeartbeatAckChunk chunk) { return; } - TimeMs now = ctx_->callbacks().TimeMillis(); - if (info->created_at() > TimeMs(0) && info->created_at() <= now) { + Timestamp now = ctx_->callbacks().Now(); + if (info->created_at() > Timestamp::Zero() && info->created_at() <= now) { ctx_->ObserveRTT(now - info->created_at()); } @@ -164,13 +169,13 @@ void HeartbeatHandler::HandleHeartbeatAck(HeartbeatAckChunk chunk) { ctx_->ClearTxErrorCounter(); } -absl::optional<DurationMs> HeartbeatHandler::OnIntervalTimerExpiry() { +TimeDelta HeartbeatHandler::OnIntervalTimerExpiry() { if (ctx_->is_connection_established()) { - HeartbeatInfo info(ctx_->callbacks().TimeMillis()); + HeartbeatInfo info(ctx_->callbacks().Now()); timeout_timer_->set_duration(ctx_->current_rto()); timeout_timer_->Start(); RTC_DLOG(LS_INFO) << log_prefix_ << "Sending HEARTBEAT with timeout " - << *timeout_timer_->duration(); + << webrtc::ToString(timeout_timer_->duration()); Parameters parameters = Parameters::Builder() .Add(HeartbeatInfoParameter(info.Serialize())) @@ -183,14 +188,14 @@ absl::optional<DurationMs> HeartbeatHandler::OnIntervalTimerExpiry() { << log_prefix_ << "Will not send HEARTBEAT when connection not established"; } - return absl::nullopt; + return TimeDelta::Zero(); } -absl::optional<DurationMs> HeartbeatHandler::OnTimeoutTimerExpiry() { +TimeDelta HeartbeatHandler::OnTimeoutTimerExpiry() { // Note that the timeout timer is not restarted. It will be started again when // the interval timer expires. RTC_DCHECK(!timeout_timer_->is_running()); ctx_->IncrementTxErrorCounter("HEARTBEAT timeout"); - return absl::nullopt; + return TimeDelta::Zero(); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.h b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.h index 318b02955b..ac58b97a64 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.h +++ b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler.h @@ -50,14 +50,14 @@ class HeartbeatHandler { void HandleHeartbeatAck(HeartbeatAckChunk chunk); private: - absl::optional<DurationMs> OnIntervalTimerExpiry(); - absl::optional<DurationMs> OnTimeoutTimerExpiry(); + webrtc::TimeDelta OnIntervalTimerExpiry(); + webrtc::TimeDelta OnTimeoutTimerExpiry(); const absl::string_view log_prefix_; Context* ctx_; TimerManager* timer_manager_; // The time for a connection to be idle before a heartbeat is sent. - const DurationMs interval_duration_; + const webrtc::TimeDelta interval_duration_; // Adding RTT to the duration will add some jitter, which is good in // production, but less good in unit tests, which is why it can be disabled. const bool interval_duration_should_include_rtt_; diff --git a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler_test.cc b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler_test.cc index d573192440..4475527322 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler_test.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/heartbeat_handler_test.cc @@ -30,6 +30,7 @@ using ::testing::IsEmpty; using ::testing::NiceMock; using ::testing::Return; using ::testing::SizeIs; +using ::webrtc::TimeDelta; constexpr DurationMs kHeartbeatInterval = DurationMs(30'000); @@ -51,7 +52,7 @@ class HeartbeatHandlerTestBase : public testing::Test { }), handler_("log: ", options_, &context_, &timer_manager_) {} - void AdvanceTime(DurationMs duration) { + void AdvanceTime(webrtc::TimeDelta duration) { callbacks_.AdvanceTime(duration); for (;;) { absl::optional<TimeoutID> timeout_id = callbacks_.GetNextExpiredTimeout(); @@ -80,7 +81,7 @@ class DisabledHeartbeatHandlerTest : public HeartbeatHandlerTestBase { }; TEST_F(HeartbeatHandlerTest, HasRunningHeartbeatIntervalTimer) { - AdvanceTime(options_.heartbeat_interval); + AdvanceTime(options_.heartbeat_interval.ToTimeDelta()); // Validate that a heartbeat request was sent. std::vector<uint8_t> payload = callbacks_.ConsumeSentPacket(); @@ -119,7 +120,7 @@ TEST_F(HeartbeatHandlerTest, RepliesToHeartbeatRequests) { } TEST_F(HeartbeatHandlerTest, SendsHeartbeatRequestsOnIdleChannel) { - AdvanceTime(options_.heartbeat_interval); + AdvanceTime(options_.heartbeat_interval.ToTimeDelta()); // Grab the request, and make a response. std::vector<uint8_t> payload = callbacks_.ConsumeSentPacket(); @@ -134,7 +135,7 @@ TEST_F(HeartbeatHandlerTest, SendsHeartbeatRequestsOnIdleChannel) { HeartbeatAckChunk ack(std::move(req).extract_parameters()); // Respond a while later. This RTT will be measured by the handler - constexpr DurationMs rtt(313); + constexpr TimeDelta rtt = TimeDelta::Millis(313); EXPECT_CALL(context_, ObserveRTT(rtt)).Times(1); @@ -143,7 +144,7 @@ TEST_F(HeartbeatHandlerTest, SendsHeartbeatRequestsOnIdleChannel) { } TEST_F(HeartbeatHandlerTest, DoesntObserveInvalidHeartbeats) { - AdvanceTime(options_.heartbeat_interval); + AdvanceTime(options_.heartbeat_interval.ToTimeDelta()); // Grab the request, and make a response. std::vector<uint8_t> payload = callbacks_.ConsumeSentPacket(); @@ -161,15 +162,15 @@ TEST_F(HeartbeatHandlerTest, DoesntObserveInvalidHeartbeats) { // Go backwards in time - which make the HEARTBEAT-ACK have an invalid // timestamp in it, as it will be in the future. - callbacks_.AdvanceTime(DurationMs(-100)); + callbacks_.AdvanceTime(TimeDelta::Millis(-100)); handler_.HandleHeartbeatAck(std::move(ack)); } TEST_F(HeartbeatHandlerTest, IncreasesErrorIfNotAckedInTime) { - DurationMs rto(105); + TimeDelta rto = TimeDelta::Millis(105); EXPECT_CALL(context_, current_rto).WillOnce(Return(rto)); - AdvanceTime(options_.heartbeat_interval); + AdvanceTime(options_.heartbeat_interval.ToTimeDelta()); // Validate that a request was sent. EXPECT_THAT(callbacks_.ConsumeSentPacket(), Not(IsEmpty())); @@ -179,7 +180,7 @@ TEST_F(HeartbeatHandlerTest, IncreasesErrorIfNotAckedInTime) { } TEST_F(DisabledHeartbeatHandlerTest, IsReallyDisabled) { - AdvanceTime(options_.heartbeat_interval); + AdvanceTime(options_.heartbeat_interval.ToTimeDelta()); // Validate that a request was NOT sent. EXPECT_THAT(callbacks_.ConsumeSentPacket(), IsEmpty()); diff --git a/third_party/libwebrtc/net/dcsctp/socket/mock_context.h b/third_party/libwebrtc/net/dcsctp/socket/mock_context.h index 88e71d1b35..bbd9cd17d9 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/mock_context.h +++ b/third_party/libwebrtc/net/dcsctp/socket/mock_context.h @@ -40,7 +40,8 @@ class MockContext : public Context { ON_CALL(*this, peer_initial_tsn) .WillByDefault(testing::Return(PeerInitialTsn())); ON_CALL(*this, callbacks).WillByDefault(testing::ReturnRef(callbacks_)); - ON_CALL(*this, current_rto).WillByDefault(testing::Return(DurationMs(123))); + ON_CALL(*this, current_rto) + .WillByDefault(testing::Return(webrtc::TimeDelta::Millis(123))); ON_CALL(*this, Send).WillByDefault([this](SctpPacket::Builder& builder) { callbacks_.SendPacketWithStatus(builder.Build()); }); @@ -51,8 +52,8 @@ class MockContext : public Context { MOCK_METHOD(TSN, peer_initial_tsn, (), (const, override)); MOCK_METHOD(DcSctpSocketCallbacks&, callbacks, (), (const, override)); - MOCK_METHOD(void, ObserveRTT, (DurationMs rtt_ms), (override)); - MOCK_METHOD(DurationMs, current_rto, (), (const, override)); + MOCK_METHOD(void, ObserveRTT, (webrtc::TimeDelta rtt), (override)); + MOCK_METHOD(webrtc::TimeDelta, current_rto, (), (const, override)); MOCK_METHOD(bool, IncrementTxErrorCounter, (absl::string_view reason), diff --git a/third_party/libwebrtc/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h b/third_party/libwebrtc/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h index 150c1b9fa5..972e547b12 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h +++ b/third_party/libwebrtc/net/dcsctp/socket/mock_dcsctp_socket_callbacks.h @@ -80,7 +80,7 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { << log_prefix_ << "Socket abort: " << ToString(error) << "; " << message; }); - ON_CALL(*this, TimeMillis).WillByDefault([this]() { return now_; }); + ON_CALL(*this, Now).WillByDefault([this]() { return now_; }); } MOCK_METHOD(SendPacketStatus, @@ -94,7 +94,7 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { return timeout_manager_.CreateTimeout(); } - MOCK_METHOD(TimeMs, TimeMillis, (), (override)); + MOCK_METHOD(webrtc::Timestamp, Now, (), (override)); uint32_t GetRandomInt(uint32_t low, uint32_t high) override { return random_.Rand(low, high); } @@ -159,20 +159,20 @@ class MockDcSctpSocketCallbacks : public DcSctpSocketCallbacks { return ret; } - void AdvanceTime(DurationMs duration_ms) { now_ = now_ + duration_ms; } - void SetTime(TimeMs now) { now_ = now; } + void AdvanceTime(webrtc::TimeDelta duration) { now_ = now_ + duration; } + void SetTime(webrtc::Timestamp now) { now_ = now; } absl::optional<TimeoutID> GetNextExpiredTimeout() { return timeout_manager_.GetNextExpiredTimeout(); } - DurationMs GetTimeToNextTimeout() const { + webrtc::TimeDelta GetTimeToNextTimeout() const { return timeout_manager_.GetTimeToNextTimeout(); } private: const std::string log_prefix_; - TimeMs now_ = TimeMs(0); + webrtc::Timestamp now_ = webrtc::Timestamp::Zero(); webrtc::Random random_; FakeTimeoutManager timeout_manager_; std::deque<std::vector<uint8_t>> sent_packets_; diff --git a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.cc b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.cc index 2094309afe..fafb9933e5 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.cc @@ -16,8 +16,8 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/time_delta.h" #include "net/dcsctp/common/internal_types.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/reconfig_chunk.h" #include "net/dcsctp/packet/parameter/add_incoming_streams_request_parameter.h" #include "net/dcsctp/packet/parameter/add_outgoing_streams_request_parameter.h" @@ -35,9 +35,11 @@ #include "net/dcsctp/timer/timer.h" #include "net/dcsctp/tx/retransmission_queue.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { namespace { +using ::webrtc::TimeDelta; using ResponseResult = ReconfigurationResponseParameter::Result; bool DescriptorsAre(const std::vector<ParameterDescriptor>& c, @@ -347,13 +349,13 @@ void StreamResetHandler::ResetStreams( } } -absl::optional<DurationMs> StreamResetHandler::OnReconfigTimerExpiry() { +TimeDelta StreamResetHandler::OnReconfigTimerExpiry() { if (current_request_->has_been_sent()) { // There is an outstanding request, which timed out while waiting for a // response. if (!ctx_->IncrementTxErrorCounter("RECONFIG timeout")) { // Timed out. The connection will close after processing the timers. - return absl::nullopt; + return TimeDelta::Zero(); } } else { // There is no outstanding request, but there is a prepared one. This means diff --git a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.h b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.h index c335130175..77e8f3bd97 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.h +++ b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler.h @@ -20,6 +20,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/time_delta.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/chunk/reconfig_chunk.h" #include "net/dcsctp/packet/parameter/incoming_ssn_reset_request_parameter.h" @@ -80,7 +81,7 @@ class StreamResetHandler { reconfig_timer_(timer_manager->CreateTimer( "re-config", absl::bind_front(&StreamResetHandler::OnReconfigTimerExpiry, this), - TimerOptions(DurationMs(0)))), + TimerOptions(webrtc::TimeDelta::Zero()))), next_outgoing_req_seq_nbr_( handover_state ? ReconfigRequestSN(handover_state->tx.next_reset_req_sn) @@ -211,7 +212,7 @@ class StreamResetHandler { void HandleResponse(const ParameterDescriptor& descriptor); // Expiration handler for the Reconfig timer. - absl::optional<DurationMs> OnReconfigTimerExpiry(); + webrtc::TimeDelta OnReconfigTimerExpiry(); const absl::string_view log_prefix_; Context* ctx_; diff --git a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler_test.cc b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler_test.cc index 091d717f8a..e675c9bcef 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler_test.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/stream_reset_handler_test.cc @@ -48,6 +48,7 @@ using ::testing::Property; using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; using ResponseResult = ReconfigurationResponseParameter::Result; using SkippedStream = AnyForwardTsnChunk::SkippedStream; @@ -57,7 +58,7 @@ constexpr TSN kPeerInitialTsn = MockContext::PeerInitialTsn(); constexpr ReconfigRequestSN kPeerInitialReqSn = ReconfigRequestSN(*kPeerInitialTsn); constexpr uint32_t kArwnd = 131072; -constexpr DurationMs kRto = DurationMs(250); +constexpr TimeDelta kRto = TimeDelta::Millis(250); constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4}; @@ -97,12 +98,12 @@ class StreamResetHandlerTest : public testing::Test { }), delayed_ack_timer_(timer_manager_.CreateTimer( "test/delayed_ack", - []() { return absl::nullopt; }, - TimerOptions(DurationMs(0)))), + []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Zero()))), t3_rtx_timer_(timer_manager_.CreateTimer( "test/t3_rtx", - []() { return absl::nullopt; }, - TimerOptions(DurationMs(0)))), + []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Zero()))), data_tracker_(std::make_unique<DataTracker>("log: ", delayed_ack_timer_.get(), kPeerInitialTsn)), @@ -115,7 +116,7 @@ class StreamResetHandlerTest : public testing::Test { kMyInitialTsn, kArwnd, producer_, - [](DurationMs rtt_ms) {}, + [](TimeDelta rtt) {}, []() {}, *t3_rtx_timer_, DcSctpOptions())), @@ -129,8 +130,8 @@ class StreamResetHandlerTest : public testing::Test { EXPECT_CALL(ctx_, current_rto).WillRepeatedly(Return(kRto)); } - void AdvanceTime(DurationMs duration) { - callbacks_.AdvanceTime(kRto); + void AdvanceTime(TimeDelta duration) { + callbacks_.AdvanceTime(duration); for (;;) { absl::optional<TimeoutID> timeout_id = callbacks_.GetNextExpiredTimeout(); if (!timeout_id.has_value()) { @@ -204,8 +205,8 @@ class StreamResetHandlerTest : public testing::Test { std::make_unique<ReassemblyQueue>("log: ", kPeerInitialTsn, kArwnd); reasm_->RestoreFromState(state); retransmission_queue_ = std::make_unique<RetransmissionQueue>( - "", &callbacks_, kMyInitialTsn, kArwnd, producer_, - [](DurationMs rtt_ms) {}, []() {}, *t3_rtx_timer_, DcSctpOptions(), + "", &callbacks_, kMyInitialTsn, kArwnd, producer_, [](TimeDelta rtt) {}, + []() {}, *t3_rtx_timer_, DcSctpOptions(), /*supports_partial_reliability=*/true, /*use_message_interleaving=*/false); retransmission_queue_->RestoreFromState(state); diff --git a/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.cc b/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.cc index 0621b48e80..c6c8861e1f 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.cc @@ -17,6 +17,7 @@ #include <vector> #include "absl/types/optional.h" +#include "api/units/time_delta.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/idata_chunk.h" @@ -37,6 +38,8 @@ #include "rtc_base/strings/string_builder.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; TransmissionControlBlock::TransmissionControlBlock( TimerManager& timer_manager, @@ -61,20 +64,20 @@ TransmissionControlBlock::TransmissionControlBlock( t3_rtx_(timer_manager_.CreateTimer( "t3-rtx", absl::bind_front(&TransmissionControlBlock::OnRtxTimerExpiry, this), - TimerOptions(options.rto_initial, + TimerOptions(options.rto_initial.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, /*max_restarts=*/absl::nullopt, options.max_timer_backoff_duration.has_value() - ? *options.max_timer_backoff_duration - : DurationMs::InfiniteDuration()))), + ? options.max_timer_backoff_duration->ToTimeDelta() + : TimeDelta::PlusInfinity()))), delayed_ack_timer_(timer_manager_.CreateTimer( "delayed-ack", absl::bind_front(&TransmissionControlBlock::OnDelayedAckTimerExpiry, this), - TimerOptions(options.delayed_ack_max_timeout, + TimerOptions(options.delayed_ack_max_timeout.ToTimeDelta(), TimerBackoffAlgorithm::kExponential, /*max_restarts=*/0, - /*max_backoff_duration=*/DurationMs::InfiniteDuration(), + /*max_backoff_duration=*/TimeDelta::PlusInfinity(), webrtc::TaskQueueBase::DelayPrecision::kHigh))), my_verification_tag_(my_verification_tag), my_initial_tsn_(my_initial_tsn), @@ -112,21 +115,22 @@ TransmissionControlBlock::TransmissionControlBlock( send_queue.EnableMessageInterleaving(capabilities.message_interleaving); } -void TransmissionControlBlock::ObserveRTT(DurationMs rtt) { - DurationMs prev_rto = rto_.rto(); +void TransmissionControlBlock::ObserveRTT(TimeDelta rtt) { + TimeDelta prev_rto = rto_.rto(); rto_.ObserveRTT(rtt); - RTC_DLOG(LS_VERBOSE) << log_prefix_ << "new rtt=" << *rtt - << ", srtt=" << *rto_.srtt() << ", rto=" << *rto_.rto() - << " (" << *prev_rto << ")"; + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "new rtt=" << webrtc::ToString(rtt) + << ", srtt=" << webrtc::ToString(rto_.srtt()) + << ", rto=" << webrtc::ToString(rto_.rto()) << " (" + << webrtc::ToString(prev_rto) << ")"; t3_rtx_->set_duration(rto_.rto()); - DurationMs delayed_ack_tmo = - std::min(rto_.rto() * 0.5, options_.delayed_ack_max_timeout); + TimeDelta delayed_ack_tmo = std::min( + rto_.rto() * 0.5, options_.delayed_ack_max_timeout.ToTimeDelta()); delayed_ack_timer_->set_duration(delayed_ack_tmo); } -absl::optional<DurationMs> TransmissionControlBlock::OnRtxTimerExpiry() { - TimeMs now = callbacks_.TimeMillis(); +TimeDelta TransmissionControlBlock::OnRtxTimerExpiry() { + Timestamp now = callbacks_.Now(); RTC_DLOG(LS_INFO) << log_prefix_ << "Timer " << t3_rtx_->name() << " has expired"; if (cookie_echo_chunk_.has_value()) { @@ -139,13 +143,13 @@ absl::optional<DurationMs> TransmissionControlBlock::OnRtxTimerExpiry() { SendBufferedPackets(now); } } - return absl::nullopt; + return TimeDelta::Zero(); } -absl::optional<DurationMs> TransmissionControlBlock::OnDelayedAckTimerExpiry() { +TimeDelta TransmissionControlBlock::OnDelayedAckTimerExpiry() { data_tracker_.HandleDelayedAckTimerExpiry(); MaybeSendSack(); - return absl::nullopt; + return TimeDelta::Zero(); } void TransmissionControlBlock::MaybeSendSack() { @@ -158,7 +162,7 @@ void TransmissionControlBlock::MaybeSendSack() { } void TransmissionControlBlock::MaybeSendForwardTsn(SctpPacket::Builder& builder, - TimeMs now) { + Timestamp now) { if (now >= limit_forward_tsn_until_ && retransmission_queue_.ShouldSendForwardTsn(now)) { if (capabilities_.message_interleaving) { @@ -173,7 +177,8 @@ void TransmissionControlBlock::MaybeSendForwardTsn(SctpPacket::Builder& builder, // sending a duplicate FORWARD TSN." // "Any delay applied to the sending of FORWARD TSN chunk SHOULD NOT exceed // 200ms and MUST NOT exceed 500ms". - limit_forward_tsn_until_ = now + std::min(DurationMs(200), rto_.srtt()); + limit_forward_tsn_until_ = + now + std::min(TimeDelta::Millis(200), rto_.srtt()); } } @@ -205,7 +210,7 @@ void TransmissionControlBlock::MaybeSendFastRetransmit() { } void TransmissionControlBlock::SendBufferedPackets(SctpPacket::Builder& builder, - TimeMs now) { + Timestamp now) { for (int packet_idx = 0; packet_idx < options_.max_burst && retransmission_queue_.can_send_data(); ++packet_idx) { diff --git a/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.h b/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.h index 46a39d5a7b..f8b2445525 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.h +++ b/third_party/libwebrtc/net/dcsctp/socket/transmission_control_block.h @@ -67,8 +67,8 @@ class TransmissionControlBlock : public Context { TSN my_initial_tsn() const override { return my_initial_tsn_; } TSN peer_initial_tsn() const override { return peer_initial_tsn_; } DcSctpSocketCallbacks& callbacks() const override { return callbacks_; } - void ObserveRTT(DurationMs rtt) override; - DurationMs current_rto() const override { return rto_.rto(); } + void ObserveRTT(webrtc::TimeDelta rtt) override; + webrtc::TimeDelta current_rto() const override { return rto_.rto(); } bool IncrementTxErrorCounter(absl::string_view reason) override { return tx_error_counter_.Increment(reason); } @@ -91,7 +91,7 @@ class TransmissionControlBlock : public Context { StreamResetHandler& stream_reset_handler() { return stream_reset_handler_; } HeartbeatHandler& heartbeat_handler() { return heartbeat_handler_; } size_t cwnd() const { return retransmission_queue_.cwnd(); } - DurationMs current_srtt() const { return rto_.srtt(); } + webrtc::TimeDelta current_srtt() const { return rto_.srtt(); } // Returns this socket's verification tag, set in all packet headers. VerificationTag my_verification_tag() const { return my_verification_tag_; } @@ -108,7 +108,7 @@ class TransmissionControlBlock : public Context { void MaybeSendSack(); // Sends a FORWARD-TSN, if it is needed and allowed (rate-limited). - void MaybeSendForwardTsn(SctpPacket::Builder& builder, TimeMs now); + void MaybeSendForwardTsn(SctpPacket::Builder& builder, webrtc::Timestamp now); // Will be set while the socket is in kCookieEcho state. In this state, there // can only be a single packet outstanding, and it must contain the COOKIE @@ -129,12 +129,12 @@ class TransmissionControlBlock : public Context { // Fills `builder` (which may already be filled with control chunks) with // other control and data chunks, and sends packets as much as can be // allowed by the congestion control algorithm. - void SendBufferedPackets(SctpPacket::Builder& builder, TimeMs now); + void SendBufferedPackets(SctpPacket::Builder& builder, webrtc::Timestamp now); // As above, but without passing in a builder. If `cookie_echo_chunk_` is // present, then only one packet will be sent, with this chunk as the first // chunk. - void SendBufferedPackets(TimeMs now) { + void SendBufferedPackets(webrtc::Timestamp now) { SctpPacket::Builder builder(peer_verification_tag_, options_); SendBufferedPackets(builder, now); } @@ -149,9 +149,9 @@ class TransmissionControlBlock : public Context { private: // Will be called when the retransmission timer (t3-rtx) expires. - absl::optional<DurationMs> OnRtxTimerExpiry(); + webrtc::TimeDelta OnRtxTimerExpiry(); // Will be called when the delayed ack timer expires. - absl::optional<DurationMs> OnDelayedAckTimerExpiry(); + webrtc::TimeDelta OnDelayedAckTimerExpiry(); const absl::string_view log_prefix_; const DcSctpOptions options_; @@ -172,7 +172,7 @@ class TransmissionControlBlock : public Context { const std::function<bool()> is_connection_established_; PacketSender& packet_sender_; // Rate limiting of FORWARD-TSN. Next can be sent at or after this timestamp. - TimeMs limit_forward_tsn_until_ = TimeMs(0); + webrtc::Timestamp limit_forward_tsn_until_ = webrtc::Timestamp::Zero(); RetransmissionTimeout rto_; RetransmissionErrorCounter tx_error_counter_; diff --git a/third_party/libwebrtc/net/dcsctp/timer/BUILD.gn b/third_party/libwebrtc/net/dcsctp/timer/BUILD.gn index d3be1ec872..9dbe11b3ba 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/timer/BUILD.gn @@ -12,6 +12,8 @@ rtc_library("timer") { deps = [ "../../../api:array_view", "../../../api/task_queue:task_queue", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_map", @@ -37,6 +39,7 @@ rtc_library("task_queue_timeout") { "../../../api/task_queue:pending_task_safety_flag", "../../../api/task_queue:task_queue", "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:logging", "../public:socket", @@ -59,6 +62,7 @@ if (rtc_include_tests) { "../../../api:array_view", "../../../api/task_queue:task_queue", "../../../api/task_queue/test:mock_task_queue_base", + "../../../api/units:time_delta", "../../../rtc_base:checks", "../../../rtc_base:gunit_helpers", "../../../test:test_support", diff --git a/third_party/libwebrtc/net/dcsctp/timer/fake_timeout.h b/third_party/libwebrtc/net/dcsctp/timer/fake_timeout.h index 4621b2ce83..cac49287d4 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/fake_timeout.h +++ b/third_party/libwebrtc/net/dcsctp/timer/fake_timeout.h @@ -19,6 +19,7 @@ #include "absl/types/optional.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" #include "net/dcsctp/public/timeout.h" #include "net/dcsctp/public/types.h" #include "rtc_base/checks.h" @@ -29,46 +30,46 @@ namespace dcsctp { // A timeout used in tests. class FakeTimeout : public Timeout { public: - FakeTimeout(std::function<TimeMs()> get_time, + FakeTimeout(std::function<webrtc::Timestamp()> get_time, std::function<void(FakeTimeout*)> on_delete) : get_time_(std::move(get_time)), on_delete_(std::move(on_delete)) {} ~FakeTimeout() override { on_delete_(this); } void Start(DurationMs duration_ms, TimeoutID timeout_id) override { - RTC_DCHECK(expiry_ == TimeMs::InfiniteFuture()); + RTC_DCHECK(expiry_.IsPlusInfinity()); timeout_id_ = timeout_id; - expiry_ = get_time_() + duration_ms; + expiry_ = get_time_() + duration_ms.ToTimeDelta(); } void Stop() override { - RTC_DCHECK(expiry_ != TimeMs::InfiniteFuture()); - expiry_ = TimeMs::InfiniteFuture(); + RTC_DCHECK(!expiry_.IsPlusInfinity()); + expiry_ = webrtc::Timestamp::PlusInfinity(); } - bool EvaluateHasExpired(TimeMs now) { + bool EvaluateHasExpired(webrtc::Timestamp now) { if (now >= expiry_) { - expiry_ = TimeMs::InfiniteFuture(); + expiry_ = webrtc::Timestamp::PlusInfinity(); return true; } return false; } TimeoutID timeout_id() const { return timeout_id_; } - TimeMs expiry() const { return expiry_; } + webrtc::Timestamp expiry() const { return expiry_; } private: - const std::function<TimeMs()> get_time_; + const std::function<webrtc::Timestamp()> get_time_; const std::function<void(FakeTimeout*)> on_delete_; TimeoutID timeout_id_ = TimeoutID(0); - TimeMs expiry_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp expiry_ = webrtc::Timestamp::PlusInfinity(); }; class FakeTimeoutManager { public: // The `get_time` function must return the current time, relative to any // epoch. - explicit FakeTimeoutManager(std::function<TimeMs()> get_time) + explicit FakeTimeoutManager(std::function<webrtc::Timestamp()> get_time) : get_time_(std::move(get_time)) {} std::unique_ptr<FakeTimeout> CreateTimeout() { @@ -89,7 +90,7 @@ class FakeTimeoutManager { // Timer::is_running_ to false before you operate on the Timer or Timeout // again. absl::optional<TimeoutID> GetNextExpiredTimeout() { - TimeMs now = get_time_(); + webrtc::Timestamp now = get_time_(); std::vector<TimeoutID> expired_timers; for (auto& timer : timers_) { if (timer->EvaluateHasExpired(now)) { @@ -99,21 +100,21 @@ class FakeTimeoutManager { return absl::nullopt; } - DurationMs GetTimeToNextTimeout() const { - TimeMs next_expiry = TimeMs::InfiniteFuture(); + webrtc::TimeDelta GetTimeToNextTimeout() const { + webrtc::Timestamp next_expiry = webrtc::Timestamp::PlusInfinity(); for (const FakeTimeout* timer : timers_) { if (timer->expiry() < next_expiry) { next_expiry = timer->expiry(); } } - TimeMs now = get_time_(); - return next_expiry != TimeMs::InfiniteFuture() && next_expiry >= now + webrtc::Timestamp now = get_time_(); + return !next_expiry.IsPlusInfinity() && next_expiry >= now ? next_expiry - now - : DurationMs::InfiniteDuration(); + : webrtc::TimeDelta::PlusInfinity(); } private: - const std::function<TimeMs()> get_time_; + const std::function<webrtc::Timestamp()> get_time_; webrtc::flat_set<FakeTimeout*> timers_; }; diff --git a/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.cc b/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.cc index 6c43640d39..7612f98f3a 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.cc +++ b/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.cc @@ -14,6 +14,8 @@ #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; TaskQueueTimeoutFactory::TaskQueueTimeout::TaskQueueTimeout( TaskQueueTimeoutFactory& parent, @@ -30,8 +32,8 @@ TaskQueueTimeoutFactory::TaskQueueTimeout::~TaskQueueTimeout() { void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, TimeoutID timeout_id) { RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - RTC_DCHECK(timeout_expiration_ == TimeMs::InfiniteFuture()); - timeout_expiration_ = parent_.get_time_() + duration_ms; + RTC_DCHECK(timeout_expiration_.IsPlusInfinity()); + timeout_expiration_ = parent_.Now() + duration_ms.ToTimeDelta(); timeout_id_ = timeout_id; if (timeout_expiration_ >= posted_task_expiration_) { @@ -43,7 +45,7 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, return; } - if (posted_task_expiration_ != TimeMs::InfiniteFuture()) { + if (!posted_task_expiration_.IsPlusInfinity()) { RTC_DLOG(LS_VERBOSE) << "New timeout duration is less than scheduled - " "ghosting old delayed task."; // There is already a scheduled delayed task, but its expiration time is @@ -63,10 +65,10 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, [timeout_id, this]() { RTC_DLOG(LS_VERBOSE) << "Timout expired: " << timeout_id.value(); RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - RTC_DCHECK(posted_task_expiration_ != TimeMs::InfiniteFuture()); - posted_task_expiration_ = TimeMs::InfiniteFuture(); + RTC_DCHECK(!posted_task_expiration_.IsPlusInfinity()); + posted_task_expiration_ = Timestamp::PlusInfinity(); - if (timeout_expiration_ == TimeMs::InfiniteFuture()) { + if (timeout_expiration_.IsPlusInfinity()) { // The timeout was stopped before it expired. Very common. } else { // Note that the timeout might have been restarted, which updated @@ -74,10 +76,10 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, // if it's not quite time to trigger the timeout yet, schedule a // new delayed task with what's remaining and retry at that point // in time. - DurationMs remaining = timeout_expiration_ - parent_.get_time_(); - timeout_expiration_ = TimeMs::InfiniteFuture(); - if (*remaining > 0) { - Start(remaining, timeout_id_); + TimeDelta remaining = timeout_expiration_ - parent_.Now(); + timeout_expiration_ = Timestamp::PlusInfinity(); + if (remaining > TimeDelta::Zero()) { + Start(DurationMs(remaining.ms()), timeout_id_); } else { // It has actually triggered. RTC_DLOG(LS_VERBOSE) @@ -93,7 +95,7 @@ void TaskQueueTimeoutFactory::TaskQueueTimeout::Stop() { // As the TaskQueue doesn't support deleting a posted task, just mark the // timeout as not running. RTC_DCHECK_RUN_ON(&parent_.thread_checker_); - timeout_expiration_ = TimeMs::InfiniteFuture(); + timeout_expiration_ = Timestamp::PlusInfinity(); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.h b/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.h index faae14464f..4b40309f83 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.h +++ b/third_party/libwebrtc/net/dcsctp/timer/task_queue_timeout.h @@ -15,6 +15,7 @@ #include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/timestamp.h" #include "net/dcsctp/public/timeout.h" namespace dcsctp { @@ -74,14 +75,17 @@ class TaskQueueTimeoutFactory { rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> pending_task_safety_flag_; // The time when the posted delayed task is set to expire. Will be set to // the infinite future if there is no such task running. - TimeMs posted_task_expiration_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp posted_task_expiration_ = + webrtc::Timestamp::PlusInfinity(); // The time when the timeout expires. It will be set to the infinite future // if the timeout is not running/not started. - TimeMs timeout_expiration_ = TimeMs::InfiniteFuture(); + webrtc::Timestamp timeout_expiration_ = webrtc::Timestamp::PlusInfinity(); // The current timeout ID that will be reported when expired. TimeoutID timeout_id_ = TimeoutID(0); }; + webrtc::Timestamp Now() { return webrtc::Timestamp::Millis(*get_time_()); } + RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_; webrtc::TaskQueueBase& task_queue_; const std::function<TimeMs()> get_time_; diff --git a/third_party/libwebrtc/net/dcsctp/timer/timer.cc b/third_party/libwebrtc/net/dcsctp/timer/timer.cc index 208f26fdf9..07c9f3d786 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/timer.cc +++ b/third_party/libwebrtc/net/dcsctp/timer/timer.cc @@ -22,21 +22,23 @@ namespace dcsctp { namespace { +using ::webrtc::TimeDelta; + TimeoutID MakeTimeoutId(TimerID timer_id, TimerGeneration generation) { return TimeoutID(static_cast<uint64_t>(*timer_id) << 32 | *generation); } -DurationMs GetBackoffDuration(const TimerOptions& options, - DurationMs base_duration, - int expiration_count) { +TimeDelta GetBackoffDuration(const TimerOptions& options, + TimeDelta base_duration, + int expiration_count) { switch (options.backoff_algorithm) { case TimerBackoffAlgorithm::kFixed: return base_duration; case TimerBackoffAlgorithm::kExponential: { - DurationMs duration = base_duration; + TimeDelta duration = base_duration; while (expiration_count > 0 && duration < Timer::kMaxTimerDuration) { - duration *= 2; + duration = duration * 2; --expiration_count; if (duration > options.max_backoff_duration) { @@ -44,13 +46,13 @@ DurationMs GetBackoffDuration(const TimerOptions& options, } } - return DurationMs(std::min(duration, Timer::kMaxTimerDuration)); + return TimeDelta(std::min(duration, Timer::kMaxTimerDuration)); } } } } // namespace -constexpr DurationMs Timer::kMaxTimerDuration; +constexpr TimeDelta Timer::kMaxTimerDuration; Timer::Timer(TimerID id, absl::string_view name, @@ -76,12 +78,12 @@ void Timer::Start() { if (!is_running()) { is_running_ = true; generation_ = TimerGeneration(*generation_ + 1); - timeout_->Start(duration_, MakeTimeoutId(id_, generation_)); + timeout_->Start(DurationMs(duration_), MakeTimeoutId(id_, generation_)); } else { // Timer was running - stop and restart it, to make it expire in `duration_` // from now. generation_ = TimerGeneration(*generation_ + 1); - timeout_->Restart(duration_, MakeTimeoutId(id_, generation_)); + timeout_->Restart(DurationMs(duration_), MakeTimeoutId(id_, generation_)); } } @@ -103,23 +105,24 @@ void Timer::Trigger(TimerGeneration generation) { // timer. Note that it might be very quickly restarted again, if the // `on_expired_` callback returns a new duration. is_running_ = true; - DurationMs duration = + TimeDelta duration = GetBackoffDuration(options_, duration_, expiration_count_); generation_ = TimerGeneration(*generation_ + 1); - timeout_->Start(duration, MakeTimeoutId(id_, generation_)); + timeout_->Start(DurationMs(duration), MakeTimeoutId(id_, generation_)); } - absl::optional<DurationMs> new_duration = on_expired_(); - if (new_duration.has_value() && new_duration != duration_) { - duration_ = new_duration.value(); + TimeDelta new_duration = on_expired_(); + RTC_DCHECK(new_duration != TimeDelta::PlusInfinity()); + if (new_duration > TimeDelta::Zero() && new_duration != duration_) { + duration_ = new_duration; if (is_running_) { // Restart it with new duration. timeout_->Stop(); - DurationMs duration = + TimeDelta duration = GetBackoffDuration(options_, duration_, expiration_count_); generation_ = TimerGeneration(*generation_ + 1); - timeout_->Start(duration, MakeTimeoutId(id_, generation_)); + timeout_->Start(DurationMs(duration), MakeTimeoutId(id_, generation_)); } } } diff --git a/third_party/libwebrtc/net/dcsctp/timer/timer.h b/third_party/libwebrtc/net/dcsctp/timer/timer.h index 95aae570c8..30b07f9bfa 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/timer.h +++ b/third_party/libwebrtc/net/dcsctp/timer/timer.h @@ -22,6 +22,7 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "net/dcsctp/public/timeout.h" #include "rtc_base/strong_alias.h" @@ -40,30 +41,31 @@ enum class TimerBackoffAlgorithm { }; struct TimerOptions { - explicit TimerOptions(DurationMs duration) + explicit TimerOptions(webrtc::TimeDelta duration) : TimerOptions(duration, TimerBackoffAlgorithm::kExponential) {} - TimerOptions(DurationMs duration, TimerBackoffAlgorithm backoff_algorithm) + TimerOptions(webrtc::TimeDelta duration, + TimerBackoffAlgorithm backoff_algorithm) : TimerOptions(duration, backoff_algorithm, absl::nullopt) {} - TimerOptions(DurationMs duration, + TimerOptions(webrtc::TimeDelta duration, TimerBackoffAlgorithm backoff_algorithm, absl::optional<int> max_restarts) : TimerOptions(duration, backoff_algorithm, max_restarts, - DurationMs::InfiniteDuration()) {} - TimerOptions(DurationMs duration, + webrtc::TimeDelta::PlusInfinity()) {} + TimerOptions(webrtc::TimeDelta duration, TimerBackoffAlgorithm backoff_algorithm, absl::optional<int> max_restarts, - DurationMs max_backoff_duration) + webrtc::TimeDelta max_backoff_duration) : TimerOptions(duration, backoff_algorithm, max_restarts, max_backoff_duration, webrtc::TaskQueueBase::DelayPrecision::kLow) {} - TimerOptions(DurationMs duration, + TimerOptions(webrtc::TimeDelta duration, TimerBackoffAlgorithm backoff_algorithm, absl::optional<int> max_restarts, - DurationMs max_backoff_duration, + webrtc::TimeDelta max_backoff_duration, webrtc::TaskQueueBase::DelayPrecision precision) : duration(duration), backoff_algorithm(backoff_algorithm), @@ -72,7 +74,7 @@ struct TimerOptions { precision(precision) {} // The initial timer duration. Can be overridden with `set_duration`. - const DurationMs duration; + const webrtc::TimeDelta duration; // If the duration should be increased (using exponential backoff) when it is // restarted. If not set, the same duration will be used. const TimerBackoffAlgorithm backoff_algorithm; @@ -80,7 +82,7 @@ struct TimerOptions { // or absl::nullopt if there is no limit. const absl::optional<int> max_restarts; // The maximum timeout value for exponential backoff. - const DurationMs max_backoff_duration; + const webrtc::TimeDelta max_backoff_duration; // The precision of the webrtc::TaskQueueBase used for scheduling. const webrtc::TaskQueueBase::DelayPrecision precision; }; @@ -100,12 +102,14 @@ struct TimerOptions { class Timer { public: // The maximum timer duration - one day. - static constexpr DurationMs kMaxTimerDuration = DurationMs(24 * 3600 * 1000); + static constexpr webrtc::TimeDelta kMaxTimerDuration = + webrtc::TimeDelta::Seconds(24 * 3600); - // When expired, the timer handler can optionally return a new duration which - // will be set as `duration` and used as base duration when the timer is - // restarted and as input to the backoff algorithm. - using OnExpired = std::function<absl::optional<DurationMs>()>; + // When expired, the timer handler can optionally return a new non-zero + // duration which will be set as `duration` and used as base duration when the + // timer is restarted and as input to the backoff algorithm. If zero is + // returned, the current duration is used. + using OnExpired = std::function<webrtc::TimeDelta()>; // TimerManager will have pointers to these instances, so they must not move. Timer(const Timer&) = delete; @@ -123,13 +127,13 @@ class Timer { // Sets the base duration. The actual timer duration may be larger depending // on the backoff algorithm. - void set_duration(DurationMs duration) { + void set_duration(webrtc::TimeDelta duration) { duration_ = std::min(duration, kMaxTimerDuration); } // Retrieves the base duration. The actual timer duration may be larger // depending on the backoff algorithm. - DurationMs duration() const { return duration_; } + webrtc::TimeDelta duration() const { return duration_; } // Returns the number of times the timer has expired. int expiration_count() const { return expiration_count_; } @@ -167,7 +171,7 @@ class Timer { const UnregisterHandler unregister_handler_; const std::unique_ptr<Timeout> timeout_; - DurationMs duration_; + webrtc::TimeDelta duration_; // Increased on each start, and is matched on Trigger, to avoid races. And by // race, meaning that a timeout - which may be evaluated/expired on a diff --git a/third_party/libwebrtc/net/dcsctp/timer/timer_test.cc b/third_party/libwebrtc/net/dcsctp/timer/timer_test.cc index 93876160bb..9a7c029ec5 100644 --- a/third_party/libwebrtc/net/dcsctp/timer/timer_test.cc +++ b/third_party/libwebrtc/net/dcsctp/timer/timer_test.cc @@ -13,6 +13,7 @@ #include "absl/types/optional.h" #include "api/task_queue/task_queue_base.h" +#include "api/units/time_delta.h" #include "net/dcsctp/public/timeout.h" #include "net/dcsctp/timer/fake_timeout.h" #include "rtc_base/gunit.h" @@ -21,6 +22,8 @@ namespace dcsctp { namespace { using ::testing::Return; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; class TimerTest : public testing::Test { protected: @@ -29,10 +32,10 @@ class TimerTest : public testing::Test { manager_([this](webrtc::TaskQueueBase::DelayPrecision precision) { return timeout_manager_.CreateTimeout(precision); }) { - ON_CALL(on_expired_, Call).WillByDefault(Return(absl::nullopt)); + ON_CALL(on_expired_, Call).WillByDefault(Return(TimeDelta::Zero())); } - void AdvanceTimeAndRunTimers(DurationMs duration) { + void AdvanceTimeAndRunTimers(TimeDelta duration) { now_ = now_ + duration; for (;;) { @@ -45,16 +48,16 @@ class TimerTest : public testing::Test { } } - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager manager_; - testing::MockFunction<absl::optional<DurationMs>()> on_expired_; + testing::MockFunction<TimeDelta()> on_expired_; }; TEST_F(TimerTest, TimerIsInitiallyStopped) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed)); EXPECT_FALSE(t1->is_running()); } @@ -62,50 +65,50 @@ TEST_F(TimerTest, TimerIsInitiallyStopped) { TEST_F(TimerTest, TimerExpiresAtGivenTime) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); EXPECT_TRUE(t1->is_running()); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); } TEST_F(TimerTest, TimerReschedulesAfterExpiredWithFixedBackoff) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); EXPECT_EQ(t1->expiration_count(), 0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Fire first time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_EQ(t1->expiration_count(), 1); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Second time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_EQ(t1->expiration_count(), 2); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Third time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_EQ(t1->expiration_count(), 3); } @@ -113,151 +116,151 @@ TEST_F(TimerTest, TimerReschedulesAfterExpiredWithFixedBackoff) { TEST_F(TimerTest, TimerWithNoRestarts) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed, + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed, /*max_restart=*/0)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Fire first time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_FALSE(t1->is_running()); // Second time - shouldn't fire EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(5000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(5)); EXPECT_FALSE(t1->is_running()); } TEST_F(TimerTest, TimerWithOneRestart) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed, + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed, /*max_restart=*/1)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Fire first time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Second time - max restart limit reached. EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_FALSE(t1->is_running()); // Third time - should not fire. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(5000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(5)); EXPECT_FALSE(t1->is_running()); } TEST_F(TimerTest, TimerWithTwoRestart) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed, + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed, /*max_restart=*/2)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Fire first time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Second time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_TRUE(t1->is_running()); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Third time EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_FALSE(t1->is_running()); } TEST_F(TimerTest, TimerWithExponentialBackoff) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kExponential)); t1->Start(); // Fire first time at 5 seconds EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(5000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(5)); // Second time at 5*2^1 = 10 seconds later. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(9000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(9)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); // Third time at 5*2^2 = 20 seconds later. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(19000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(19)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); // Fourth time at 5*2^3 = 40 seconds later. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(39000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(39)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); } TEST_F(TimerTest, StartTimerWillStopAndStart) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kExponential)); t1->Start(); - AdvanceTimeAndRunTimers(DurationMs(3000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(3)); t1->Start(); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(2000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(2)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(3000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(3)); } TEST_F(TimerTest, ExpirationCounterWillResetIfStopped) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kExponential)); t1->Start(); // Fire first time at 5 seconds EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(5000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(5)); EXPECT_EQ(t1->expiration_count(), 1); // Second time at 5*2^1 = 10 seconds later. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(9000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(9)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_EQ(t1->expiration_count(), 2); t1->Start(); @@ -265,79 +268,79 @@ TEST_F(TimerTest, ExpirationCounterWillResetIfStopped) { // Third time at 5*2^0 = 5 seconds later. EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_EQ(t1->expiration_count(), 1); } TEST_F(TimerTest, StopTimerWillMakeItNotExpire) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kExponential)); t1->Start(); EXPECT_TRUE(t1->is_running()); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); t1->Stop(); EXPECT_FALSE(t1->is_running()); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); } TEST_F(TimerTest, ReturningNewDurationWhenExpired) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(5000), TimerBackoffAlgorithm::kFixed)); + TimerOptions(TimeDelta::Seconds(5), TimerBackoffAlgorithm::kFixed)); EXPECT_CALL(on_expired_, Call).Times(0); t1->Start(); - EXPECT_EQ(t1->duration(), DurationMs(5000)); + EXPECT_EQ(t1->duration(), TimeDelta::Seconds(5)); - AdvanceTimeAndRunTimers(DurationMs(4000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(4)); // Fire first time - EXPECT_CALL(on_expired_, Call).WillOnce(Return(DurationMs(2000))); - AdvanceTimeAndRunTimers(DurationMs(1000)); - EXPECT_EQ(t1->duration(), DurationMs(2000)); + EXPECT_CALL(on_expired_, Call).WillOnce(Return(TimeDelta::Seconds(2))); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); + EXPECT_EQ(t1->duration(), TimeDelta::Seconds(2)); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); // Second time - EXPECT_CALL(on_expired_, Call).WillOnce(Return(DurationMs(10000))); - AdvanceTimeAndRunTimers(DurationMs(1000)); - EXPECT_EQ(t1->duration(), DurationMs(10000)); + EXPECT_CALL(on_expired_, Call).WillOnce(Return(TimeDelta::Seconds(10))); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); + EXPECT_EQ(t1->duration(), TimeDelta::Seconds(10)); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(9000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(9)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); } TEST_F(TimerTest, TimersHaveMaximumDuration) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(1000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(1), TimerBackoffAlgorithm::kExponential)); - t1->set_duration(DurationMs(2 * *Timer::kMaxTimerDuration)); + t1->set_duration(2 * Timer::kMaxTimerDuration); EXPECT_EQ(t1->duration(), Timer::kMaxTimerDuration); } TEST_F(TimerTest, TimersHaveMaximumBackoffDuration) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(1000), TimerBackoffAlgorithm::kExponential)); + TimerOptions(TimeDelta::Seconds(1), TimerBackoffAlgorithm::kExponential)); t1->Start(); - int max_exponent = static_cast<int>(log2(*Timer::kMaxTimerDuration / 1000)); + int max_exponent = static_cast<int>(log2(Timer::kMaxTimerDuration.seconds())); for (int i = 0; i < max_exponent; ++i) { EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000 * (1 << i))); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1 * (1 << i))); } // Reached the maximum duration. @@ -357,77 +360,77 @@ TEST_F(TimerTest, TimersHaveMaximumBackoffDuration) { TEST_F(TimerTest, TimerCanBeStartedFromWithinExpirationHandler) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(1000), TimerBackoffAlgorithm::kFixed)); + TimerOptions(TimeDelta::Seconds(1), TimerBackoffAlgorithm::kFixed)); t1->Start(); // Start a timer, but don't return any new duration in callback. EXPECT_CALL(on_expired_, Call).WillOnce([&]() { EXPECT_TRUE(t1->is_running()); - t1->set_duration(DurationMs(5000)); + t1->set_duration(TimeDelta::Seconds(5)); t1->Start(); - return absl::nullopt; + return TimeDelta::Zero(); }); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(4999)); // Start a timer, and return any new duration in callback. EXPECT_CALL(on_expired_, Call).WillOnce([&]() { EXPECT_TRUE(t1->is_running()); - t1->set_duration(DurationMs(5000)); + t1->set_duration(TimeDelta::Seconds(5)); t1->Start(); - return absl::make_optional(DurationMs(8000)); + return TimeDelta::Seconds(8); }); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(7999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(7999)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); } TEST_F(TimerTest, DurationStaysWithinMaxTimerBackOffDuration) { std::unique_ptr<Timer> t1 = manager_.CreateTimer( "t1", on_expired_.AsStdFunction(), - TimerOptions(DurationMs(1000), TimerBackoffAlgorithm::kExponential, - /*max_restarts=*/absl::nullopt, DurationMs(5000))); + TimerOptions(TimeDelta::Seconds(1), TimerBackoffAlgorithm::kExponential, + /*max_restarts=*/absl::nullopt, TimeDelta::Seconds(5))); t1->Start(); // Initial timeout, 1000 ms EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1000)); + AdvanceTimeAndRunTimers(TimeDelta::Seconds(1)); // Exponential backoff -> 2000 ms EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(1999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1999)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); // Exponential backoff -> 4000 ms EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(3999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(3999)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); // Limited backoff -> 5000ms EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(4999)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); // ... where it plateaus EXPECT_CALL(on_expired_, Call).Times(0); - AdvanceTimeAndRunTimers(DurationMs(4999)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(4999)); EXPECT_CALL(on_expired_, Call).Times(1); - AdvanceTimeAndRunTimers(DurationMs(1)); + AdvanceTimeAndRunTimers(TimeDelta::Millis(1)); } TEST(TimerManagerTest, TimerManagerPassesPrecisionToCreateTimeoutMethod) { - FakeTimeoutManager timeout_manager([&]() { return TimeMs(0); }); + FakeTimeoutManager timeout_manager([&]() { return Timestamp::Zero(); }); absl::optional<webrtc::TaskQueueBase::DelayPrecision> create_timer_precison; TimerManager manager([&](webrtc::TaskQueueBase::DelayPrecision precision) { create_timer_precison = precision; @@ -435,22 +438,22 @@ TEST(TimerManagerTest, TimerManagerPassesPrecisionToCreateTimeoutMethod) { }); // Default TimerOptions. manager.CreateTimer( - "test_timer", []() { return absl::optional<DurationMs>(); }, - TimerOptions(DurationMs(123))); + "test_timer", []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Millis(123))); EXPECT_EQ(create_timer_precison, webrtc::TaskQueueBase::DelayPrecision::kLow); // High precision TimerOptions. manager.CreateTimer( - "test_timer", []() { return absl::optional<DurationMs>(); }, - TimerOptions(DurationMs(123), TimerBackoffAlgorithm::kExponential, - absl::nullopt, DurationMs::InfiniteDuration(), + "test_timer", []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Millis(123), TimerBackoffAlgorithm::kExponential, + absl::nullopt, TimeDelta::PlusInfinity(), webrtc::TaskQueueBase::DelayPrecision::kHigh)); EXPECT_EQ(create_timer_precison, webrtc::TaskQueueBase::DelayPrecision::kHigh); // Low precision TimerOptions. manager.CreateTimer( - "test_timer", []() { return absl::optional<DurationMs>(); }, - TimerOptions(DurationMs(123), TimerBackoffAlgorithm::kExponential, - absl::nullopt, DurationMs::InfiniteDuration(), + "test_timer", []() { return TimeDelta::Zero(); }, + TimerOptions(TimeDelta::Millis(123), TimerBackoffAlgorithm::kExponential, + absl::nullopt, TimeDelta::PlusInfinity(), webrtc::TaskQueueBase::DelayPrecision::kLow)); EXPECT_EQ(create_timer_precison, webrtc::TaskQueueBase::DelayPrecision::kLow); } diff --git a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn index 5547ffa870..d1fd8ab3d5 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn +++ b/third_party/libwebrtc/net/dcsctp/tx/BUILD.gn @@ -11,6 +11,7 @@ import("../../../webrtc.gni") rtc_source_set("send_queue") { deps = [ "../../../api:array_view", + "../../../api/units:timestamp", "../common:internal_types", "../packet:chunk", "../packet:data", @@ -28,9 +29,9 @@ rtc_library("rr_send_queue") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_map", "../common:internal_types", - "../common:str_join", "../packet:data", "../public:socket", "../public:types", @@ -53,9 +54,9 @@ rtc_library("stream_scheduler") { "../../../api:array_view", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_set", - "../common:str_join", "../packet:chunk", "../packet:data", "../packet:sctp_packet", @@ -89,6 +90,7 @@ rtc_library("retransmission_error_counter") { rtc_library("retransmission_timeout") { deps = [ + "../../../api/units:time_delta", "../../../rtc_base:checks", "../public:types", ] @@ -103,13 +105,15 @@ rtc_library("outstanding_data") { ":retransmission_timeout", ":send_queue", "../../../api:array_view", + "../../../api/units:time_delta", + "../../../api/units:timestamp", "../../../rtc_base:checks", "../../../rtc_base:logging", + "../../../rtc_base:stringutils", "../../../rtc_base/containers:flat_set", "../common:internal_types", "../common:math", "../common:sequence_numbers", - "../common:str_join", "../packet:chunk", "../packet:data", "../public:socket", @@ -138,7 +142,6 @@ rtc_library("retransmission_queue") { "../../../rtc_base:stringutils", "../common:math", "../common:sequence_numbers", - "../common:str_join", "../packet:chunk", "../packet:data", "../public:socket", @@ -162,6 +165,7 @@ if (rtc_include_tests) { deps = [ ":send_queue", "../../../api:array_view", + "../../../api/units:timestamp", "../../../test:test_support", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] diff --git a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h index 04921866ae..3511403eab 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h @@ -15,6 +15,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/tx/send_queue.h" #include "test/gmock.h" @@ -23,14 +24,15 @@ namespace dcsctp { class MockSendQueue : public SendQueue { public: MockSendQueue() { - ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) { - return absl::nullopt; - }); + ON_CALL(*this, Produce) + .WillByDefault([](webrtc::Timestamp now, size_t max_size) { + return absl::nullopt; + }); } MOCK_METHOD(absl::optional<SendQueue::DataToSend>, Produce, - (TimeMs now, size_t max_size), + (webrtc::Timestamp now, size_t max_size), (override)); MOCK_METHOD(bool, Discard, diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc index c2706bd0d2..ca639abc54 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.cc @@ -14,12 +14,16 @@ #include <utility> #include <vector> +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/public/types.h" +#include "rtc_base/checks.h" #include "rtc_base/logging.h" namespace dcsctp { +using ::webrtc::Timestamp; // The number of times a packet must be NACKed before it's retransmitted. // See https://tools.ietf.org/html/rfc4960#section-7.2.4 @@ -63,18 +67,18 @@ void OutstandingData::Item::MarkAsRetransmitted() { } void OutstandingData::Item::Abandon() { - RTC_DCHECK(expires_at_ != TimeMs::InfiniteFuture() || + RTC_DCHECK(!expires_at_.IsPlusInfinity() || max_retransmissions_ != MaxRetransmits::NoLimit()); lifecycle_ = Lifecycle::kAbandoned; } -bool OutstandingData::Item::has_expired(TimeMs now) const { +bool OutstandingData::Item::has_expired(Timestamp now) const { return expires_at_ <= now; } bool OutstandingData::IsConsistent() const { - size_t actual_outstanding_bytes = 0; - size_t actual_outstanding_items = 0; + size_t actual_unacked_bytes = 0; + size_t actual_unacked_items = 0; std::set<UnwrappedTSN> combined_to_be_retransmitted; combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(), @@ -83,10 +87,12 @@ bool OutstandingData::IsConsistent() const { to_be_fast_retransmitted_.end()); std::set<UnwrappedTSN> actual_combined_to_be_retransmitted; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (item.is_outstanding()) { - actual_outstanding_bytes += GetSerializedChunkSize(item.data()); - ++actual_outstanding_items; + actual_unacked_bytes += GetSerializedChunkSize(item.data()); + ++actual_unacked_items; } if (item.should_be_retransmitted()) { @@ -94,33 +100,28 @@ bool OutstandingData::IsConsistent() const { } } - if (outstanding_data_.empty() && - next_tsn_ != last_cumulative_tsn_ack_.next_value()) { - return false; - } - - return actual_outstanding_bytes == outstanding_bytes_ && - actual_outstanding_items == outstanding_items_ && + return actual_unacked_bytes == unacked_bytes_ && + actual_unacked_items == unacked_items_ && actual_combined_to_be_retransmitted == combined_to_be_retransmitted; } void OutstandingData::AckChunk(AckInfo& ack_info, - std::map<UnwrappedTSN, Item>::iterator iter) { - if (!iter->second.is_acked()) { - size_t serialized_size = GetSerializedChunkSize(iter->second.data()); + UnwrappedTSN tsn, + Item& item) { + if (!item.is_acked()) { + size_t serialized_size = GetSerializedChunkSize(item.data()); ack_info.bytes_acked += serialized_size; - if (iter->second.is_outstanding()) { - outstanding_bytes_ -= serialized_size; - --outstanding_items_; + if (item.is_outstanding()) { + unacked_bytes_ -= serialized_size; + --unacked_items_; } - if (iter->second.should_be_retransmitted()) { - RTC_DCHECK(to_be_fast_retransmitted_.find(iter->first) == + if (item.should_be_retransmitted()) { + RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) == to_be_fast_retransmitted_.end()); - to_be_retransmitted_.erase(iter->first); + to_be_retransmitted_.erase(tsn); } - iter->second.Ack(); - ack_info.highest_tsn_acked = - std::max(ack_info.highest_tsn_acked, iter->first); + item.Ack(); + ack_info.highest_tsn_acked = std::max(ack_info.highest_tsn_acked, tsn); } } @@ -143,24 +144,43 @@ OutstandingData::AckInfo OutstandingData::HandleSack( return ack_info; } +OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + +const OutstandingData::Item& OutstandingData::GetItem(UnwrappedTSN tsn) const { + RTC_DCHECK(tsn > last_cumulative_tsn_ack_); + RTC_DCHECK(tsn < next_tsn()); + int index = UnwrappedTSN::Difference(tsn, last_cumulative_tsn_ack_) - 1; + RTC_DCHECK(index >= 0); + RTC_DCHECK(index < static_cast<int>(outstanding_data_.size())); + return outstanding_data_[index]; +} + void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info) { - auto first_unacked = outstanding_data_.upper_bound(cumulative_tsn_ack); - - for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) { - AckChunk(ack_info, iter); - if (iter->second.lifecycle_id().IsSet()) { - RTC_DCHECK(iter->second.data().is_end); - if (iter->second.is_abandoned()) { - ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id()); + while (!outstanding_data_.empty() && + last_cumulative_tsn_ack_ < cumulative_tsn_ack) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_.next_value(); + Item& item = outstanding_data_.front(); + AckChunk(ack_info, tsn, item); + if (item.lifecycle_id().IsSet()) { + RTC_DCHECK(item.data().is_end); + if (item.is_abandoned()) { + ack_info.abandoned_lifecycle_ids.push_back(item.lifecycle_id()); } else { - ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id()); + ack_info.acked_lifecycle_ids.push_back(item.lifecycle_id()); } } + outstanding_data_.pop_front(); + last_cumulative_tsn_ack_.Increment(); } - outstanding_data_.erase(outstanding_data_.begin(), first_unacked); - last_cumulative_tsn_ack_ = cumulative_tsn_ack; stream_reset_breakpoint_tsns_.erase(stream_reset_breakpoint_tsns_.begin(), stream_reset_breakpoint_tsns_.upper_bound( cumulative_tsn_ack.next_value())); @@ -176,12 +196,13 @@ void OutstandingData::AckGapBlocks( // handled differently. for (auto& block : gap_ack_blocks) { - auto start = outstanding_data_.lower_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start)); - auto end = outstanding_data_.upper_bound( - UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end)); - for (auto iter = start; iter != end; ++iter) { - AckChunk(ack_info, iter); + UnwrappedTSN start = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); + UnwrappedTSN end = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); + for (UnwrappedTSN tsn = start; tsn <= end; tsn = tsn.next_value()) { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + Item& item = GetItem(tsn); + AckChunk(ack_info, tsn, item); + } } } } @@ -216,13 +237,12 @@ void OutstandingData::NackBetweenAckBlocks( for (auto& block : gap_ack_blocks) { UnwrappedTSN cur_block_first_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.start); - for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked); - iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) { - if (iter->first <= max_tsn_to_nack) { - ack_info.has_packet_loss |= - NackItem(iter->first, iter->second, /*retransmit_now=*/false, - /*do_fast_retransmit=*/!is_in_fast_recovery); - } + for (UnwrappedTSN tsn = prev_block_last_acked.next_value(); + tsn < cur_block_first_acked && tsn <= max_tsn_to_nack; + tsn = tsn.next_value()) { + ack_info.has_packet_loss |= + NackItem(tsn, /*retransmit_now=*/false, + /*do_fast_retransmit=*/!is_in_fast_recovery); } prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end); } @@ -234,12 +254,12 @@ void OutstandingData::NackBetweenAckBlocks( } bool OutstandingData::NackItem(UnwrappedTSN tsn, - Item& item, bool retransmit_now, bool do_fast_retransmit) { + Item& item = GetItem(tsn); if (item.is_outstanding()) { - outstanding_bytes_ -= GetSerializedChunkSize(item.data()); - --outstanding_items_; + unacked_bytes_ -= GetSerializedChunkSize(item.data()); + --unacked_items_; } switch (item.Nack(retransmit_now)) { @@ -272,28 +292,25 @@ void OutstandingData::AbandonAllFor(const Item& item) { // skipped over). So create a new fragment, representing the end, that the // received will never see as it is abandoned immediately and used as cum // TSN in the sent FORWARD-TSN. - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); Data message_end(item.data().stream_id, item.data().ssn, item.data().mid, item.data().fsn, item.data().ppid, std::vector<uint8_t>(), Data::IsBeginning(false), Data::IsEnd(true), item.data().is_unordered); - Item& added_item = - outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple( - item.message_id(), std::move(message_end), TimeMs(0), - MaxRetransmits(0), TimeMs::InfiniteFuture(), - LifecycleId::NotSet())) - .first->second; - // The added chunk shouldn't be included in `outstanding_bytes`, so set it + UnwrappedTSN tsn = next_tsn(); + Item& added_item = outstanding_data_.emplace_back( + item.message_id(), std::move(message_end), Timestamp::Zero(), + MaxRetransmits(0), Timestamp::PlusInfinity(), LifecycleId::NotSet()); + + // The added chunk shouldn't be included in `unacked_bytes`, so set it // as acked. added_item.Ack(); RTC_DLOG(LS_VERBOSE) << "Adding unsent end placeholder for message at tsn=" << *tsn.Wrap(); } - for (auto& [tsn, other] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (Item& other : outstanding_data_) { + tsn.Increment(); if (!other.is_abandoned() && other.data().stream_id == item.data().stream_id && other.message_id() == item.message_id()) { @@ -315,9 +332,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( for (auto it = chunks.begin(); it != chunks.end();) { UnwrappedTSN tsn = *it; - auto elem = outstanding_data_.find(tsn); - RTC_DCHECK(elem != outstanding_data_.end()); - Item& item = elem->second; + Item& item = GetItem(tsn); RTC_DCHECK(item.should_be_retransmitted()); RTC_DCHECK(!item.is_outstanding()); RTC_DCHECK(!item.is_abandoned()); @@ -328,8 +343,8 @@ std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit( item.MarkAsRetransmitted(); result.emplace_back(tsn.Wrap(), item.data().Clone()); max_size -= serialized_size; - outstanding_bytes_ += serialized_size; - ++outstanding_items_; + unacked_bytes_ += serialized_size; + ++unacked_items_; it = chunks.erase(it); } else { ++it; @@ -370,8 +385,10 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted( return ExtractChunksThatCanFit(to_be_retransmitted_, max_size); } -void OutstandingData::ExpireOutstandingChunks(TimeMs now) { - for (const auto& [tsn, item] : outstanding_data_) { +void OutstandingData::ExpireOutstandingChunks(Timestamp now) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); // Chunks that are nacked can be expired. Care should be taken not to expire // unacked (in-flight) chunks as they might have been received, but the SACK // is either delayed or in-flight and may be received later. @@ -391,38 +408,33 @@ void OutstandingData::ExpireOutstandingChunks(TimeMs now) { } UnwrappedTSN OutstandingData::highest_outstanding_tsn() const { - return outstanding_data_.empty() ? last_cumulative_tsn_ack_ - : outstanding_data_.rbegin()->first; + return UnwrappedTSN::AddTo(last_cumulative_tsn_ack_, + outstanding_data_.size()); } absl::optional<UnwrappedTSN> OutstandingData::Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + Timestamp expires_at, LifecycleId lifecycle_id) { - UnwrappedTSN tsn = next_tsn_; - next_tsn_.Increment(); - // All chunks are always padded to be even divisible by 4. size_t chunk_size = GetSerializedChunkSize(data); - outstanding_bytes_ += chunk_size; - ++outstanding_items_; - auto it = outstanding_data_ - .emplace(std::piecewise_construct, std::forward_as_tuple(tsn), - std::forward_as_tuple(message_id, data.Clone(), - time_sent, max_retransmissions, - expires_at, lifecycle_id)) - .first; - - if (it->second.has_expired(time_sent)) { + unacked_bytes_ += chunk_size; + ++unacked_items_; + UnwrappedTSN tsn = next_tsn(); + Item& item = outstanding_data_.emplace_back(message_id, data.Clone(), + time_sent, max_retransmissions, + expires_at, lifecycle_id); + + if (item.has_expired(time_sent)) { // No need to send it - it was expired when it was in the send // queue. - RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " - << *it->first.Wrap() << " and message " - << *it->second.data().mid << " as expired"; - AbandonAllFor(it->second); + RTC_DLOG(LS_VERBOSE) << "Marking freshly produced chunk " << *tsn.Wrap() + << " and message " << *item.data().mid + << " as expired"; + AbandonAllFor(item); RTC_DCHECK(IsConsistent()); return absl::nullopt; } @@ -432,34 +444,47 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert( } void OutstandingData::NackAll() { - for (auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + // A two-pass algorithm is needed, as NackItem will invalidate iterators. + std::vector<UnwrappedTSN> tsns_to_nack; + for (Item& item : outstanding_data_) { + tsn.Increment(); if (!item.is_acked()) { - NackItem(tsn, item, /*retransmit_now=*/true, - /*do_fast_retransmit=*/false); + tsns_to_nack.push_back(tsn); } } + + for (UnwrappedTSN tsn : tsns_to_nack) { + NackItem(tsn, /*retransmit_now=*/true, + /*do_fast_retransmit=*/false); + } + RTC_DCHECK(IsConsistent()); } -absl::optional<DurationMs> OutstandingData::MeasureRTT(TimeMs now, - UnwrappedTSN tsn) const { - auto it = outstanding_data_.find(tsn); - if (it != outstanding_data_.end() && !it->second.has_been_retransmitted()) { - // https://tools.ietf.org/html/rfc4960#section-6.3.1 - // "Karn's algorithm: RTT measurements MUST NOT be made using - // packets that were retransmitted (and thus for which it is ambiguous - // whether the reply was for the first instance of the chunk or for a - // later instance)" - return now - it->second.time_sent(); +webrtc::TimeDelta OutstandingData::MeasureRTT(Timestamp now, + UnwrappedTSN tsn) const { + if (tsn > last_cumulative_tsn_ack_ && tsn < next_tsn()) { + const Item& item = GetItem(tsn); + if (!item.has_been_retransmitted()) { + // https://tools.ietf.org/html/rfc4960#section-6.3.1 + // "Karn's algorithm: RTT measurements MUST NOT be made using + // packets that were retransmitted (and thus for which it is ambiguous + // whether the reply was for the first instance of the chunk or for a + // later instance)" + return now - item.time_sent(); + } } - return absl::nullopt; + return webrtc::TimeDelta::PlusInfinity(); } std::vector<std::pair<TSN, OutstandingData::State>> OutstandingData::GetChunkStatesForTesting() const { std::vector<std::pair<TSN, State>> states; states.emplace_back(last_cumulative_tsn_ack_.Wrap(), State::kAcked); - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); State state; if (item.is_abandoned()) { state = State::kAbandoned; @@ -480,9 +505,7 @@ OutstandingData::GetChunkStatesForTesting() const { bool OutstandingData::ShouldSendForwardTsn() const { if (!outstanding_data_.empty()) { - auto it = outstanding_data_.begin(); - return it->first == last_cumulative_tsn_ack_.next_value() && - it->second.is_abandoned(); + return outstanding_data_.front().is_abandoned(); } return false; } @@ -491,7 +514,9 @@ ForwardTsnChunk OutstandingData::CreateForwardTsn() const { std::map<StreamID, SSN> skipped_per_ordered_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -515,7 +540,9 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::map<std::pair<IsUnordered, StreamID>, MID> skipped_per_stream; UnwrappedTSN new_cumulative_ack = last_cumulative_tsn_ack_; - for (const auto& [tsn, item] : outstanding_data_) { + UnwrappedTSN tsn = last_cumulative_tsn_ack_; + for (const Item& item : outstanding_data_) { + tsn.Increment(); if (stream_reset_breakpoint_tsns_.contains(tsn) || (tsn != new_cumulative_ack.next_value()) || !item.is_abandoned()) { break; @@ -539,16 +566,12 @@ IForwardTsnChunk OutstandingData::CreateIForwardTsn() const { std::move(skipped_streams)); } -void OutstandingData::ResetSequenceNumbers(UnwrappedTSN next_tsn, - UnwrappedTSN last_cumulative_tsn) { +void OutstandingData::ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn) { RTC_DCHECK(outstanding_data_.empty()); - RTC_DCHECK(next_tsn_ == last_cumulative_tsn_ack_.next_value()); - RTC_DCHECK(next_tsn == last_cumulative_tsn.next_value()); - next_tsn_ = next_tsn; last_cumulative_tsn_ack_ = last_cumulative_tsn; } void OutstandingData::BeginResetStreams() { - stream_reset_breakpoint_tsns_.insert(next_tsn_); + stream_reset_breakpoint_tsns_.insert(next_tsn()); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h index f8e939661d..2a214975e6 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data.h @@ -10,12 +10,14 @@ #ifndef NET_DCSCTP_TX_OUTSTANDING_DATA_H_ #define NET_DCSCTP_TX_OUTSTANDING_DATA_H_ +#include <deque> #include <map> #include <set> #include <utility> #include <vector> #include "absl/types/optional.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" @@ -29,6 +31,9 @@ namespace dcsctp { // This class keeps track of outstanding data chunks (sent, not yet acked) and // handles acking, nacking, rescheduling and abandoning. +// +// Items are added to this queue as they are sent and will be removed when the +// peer acks them using the cumulative TSN ack. class OutstandingData { public: // State for DATA chunks (message fragments) in the queue - used in tests. @@ -74,11 +79,9 @@ class OutstandingData { OutstandingData( size_t data_chunk_header_size, - UnwrappedTSN next_tsn, UnwrappedTSN last_cumulative_tsn_ack, std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue) : data_chunk_header_size_(data_chunk_header_size), - next_tsn_(next_tsn), last_cumulative_tsn_ack_(last_cumulative_tsn_ack), discard_from_send_queue_(std::move(discard_from_send_queue)) {} @@ -98,14 +101,14 @@ class OutstandingData { // it? std::vector<std::pair<TSN, Data>> GetChunksToBeRetransmitted(size_t max_size); - size_t outstanding_bytes() const { return outstanding_bytes_; } + size_t unacked_bytes() const { return unacked_bytes_; } - // Returns the number of DATA chunks that are in-flight. - size_t outstanding_items() const { return outstanding_items_; } + // Returns the number of DATA chunks that are in-flight (not acked or nacked). + size_t unacked_items() const { return unacked_items_; } // Given the current time `now_ms`, expire and abandon outstanding (sent at // least once) chunks that have a limited lifetime. - void ExpireOutstandingChunks(TimeMs now); + void ExpireOutstandingChunks(webrtc::Timestamp now); bool empty() const { return outstanding_data_.empty(); } @@ -121,7 +124,9 @@ class OutstandingData { return last_cumulative_tsn_ack_; } - UnwrappedTSN next_tsn() const { return next_tsn_; } + UnwrappedTSN next_tsn() const { + return highest_outstanding_tsn().next_value(); + } UnwrappedTSN highest_outstanding_tsn() const; @@ -131,9 +136,9 @@ class OutstandingData { absl::optional<UnwrappedTSN> Insert( OutgoingMessageId message_id, const Data& data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(), - TimeMs expires_at = TimeMs::InfiniteFuture(), + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(), LifecycleId lifecycle_id = LifecycleId::NotSet()); // Nacks all outstanding data. @@ -147,8 +152,8 @@ class OutstandingData { // Given the current time and a TSN, it returns the measured RTT between when // the chunk was sent and now. It takes into acccount Karn's algorithm, so if - // the chunk has ever been retransmitted, it will return absl::nullopt. - absl::optional<DurationMs> MeasureRTT(TimeMs now, UnwrappedTSN tsn) const; + // the chunk has ever been retransmitted, it will return `PlusInfinity()`. + webrtc::TimeDelta MeasureRTT(webrtc::Timestamp now, UnwrappedTSN tsn) const; // Returns the internal state of all queued chunks. This is only used in // unit-tests. @@ -159,8 +164,7 @@ class OutstandingData { bool ShouldSendForwardTsn() const; // Sets the next TSN to be used. This is used in handover. - void ResetSequenceNumbers(UnwrappedTSN next_tsn, - UnwrappedTSN last_cumulative_tsn); + void ResetSequenceNumbers(UnwrappedTSN last_cumulative_tsn); // Called when an outgoing stream reset is sent, marking the last assigned TSN // as a breakpoint that a FORWARD-TSN shouldn't cross. @@ -179,9 +183,9 @@ class OutstandingData { Item(OutgoingMessageId message_id, Data data, - TimeMs time_sent, + webrtc::Timestamp time_sent, MaxRetransmits max_retransmissions, - TimeMs expires_at, + webrtc::Timestamp expires_at, LifecycleId lifecycle_id) : message_id_(message_id), time_sent_(time_sent), @@ -195,7 +199,7 @@ class OutstandingData { OutgoingMessageId message_id() const { return message_id_; } - TimeMs time_sent() const { return time_sent_; } + webrtc::Timestamp time_sent() const { return time_sent_; } const Data& data() const { return data_; } @@ -229,7 +233,7 @@ class OutstandingData { // Given the current time, and the current state of this DATA chunk, it will // indicate if it has expired (SCTP Partial Reliability Extension). - bool has_expired(TimeMs now) const; + bool has_expired(webrtc::Timestamp now) const; LifecycleId lifecycle_id() const { return lifecycle_id_; } @@ -258,7 +262,7 @@ class OutstandingData { const OutgoingMessageId message_id_; // When the packet was sent, and placed in this queue. - const TimeMs time_sent_; + const webrtc::Timestamp time_sent_; // If the message was sent with a maximum number of retransmissions, this is // set to that number. The value zero (0) means that it will never be // retransmitted. @@ -278,7 +282,7 @@ class OutstandingData { // At this exact millisecond, the item is considered expired. If the message // is not to be expired, this is set to the infinite future. - const TimeMs expires_at_; + const webrtc::Timestamp expires_at_; // An optional lifecycle id, which may only be set for the last fragment. const LifecycleId lifecycle_id_; @@ -290,6 +294,9 @@ class OutstandingData { // Returns how large a chunk will be, serialized, carrying the data size_t GetSerializedChunkSize(const Data& data) const; + Item& GetItem(UnwrappedTSN tsn); + const Item& GetItem(UnwrappedTSN tsn) const; + // Given a `cumulative_tsn_ack` from an incoming SACK, will remove those items // in the retransmission queue up until this value and will update `ack_info` // by setting `bytes_acked_by_cumulative_tsn_ack`. @@ -313,7 +320,7 @@ class OutstandingData { // Process the acknowledgement of the chunk referenced by `iter` and updates // state in `ack_info` and the object's state. - void AckChunk(AckInfo& ack_info, std::map<UnwrappedTSN, Item>::iterator iter); + void AckChunk(AckInfo& ack_info, UnwrappedTSN tsn, Item& item); // Helper method to process an incoming nack of an item and perform the // correct operations given the action indicated when nacking an item (e.g. @@ -323,10 +330,11 @@ class OutstandingData { // many times so that it should be retransmitted, this will schedule it to be // "fast retransmitted". This is only done just before going into fast // recovery. - bool NackItem(UnwrappedTSN tsn, - Item& item, - bool retransmit_now, - bool do_fast_retransmit); + // + // Note that since nacking an item may result in it becoming abandoned, which + // in turn could alter `outstanding_data_`, any iterators are invalidated + // after having called this method. + bool NackItem(UnwrappedTSN tsn, bool retransmit_now, bool do_fast_retransmit); // Given that a message fragment, `item` has been abandoned, abandon all other // fragments that share the same message - both never-before-sent fragments @@ -341,19 +349,20 @@ class OutstandingData { // The size of the data chunk (DATA/I-DATA) header that is used. const size_t data_chunk_header_size_; - // Next TSN to used. - UnwrappedTSN next_tsn_; // The last cumulative TSN ack number. UnwrappedTSN last_cumulative_tsn_ack_; // Callback when to discard items from the send queue. std::function<bool(StreamID, OutgoingMessageId)> discard_from_send_queue_; - std::map<UnwrappedTSN, Item> outstanding_data_; + // Outstanding items. If non-empty, the first element has + // `TSN=last_cumulative_tsn_ack_ + 1` and the following items are in strict + // increasing TSN order. The last item has `TSN=highest_outstanding_tsn()`. + std::deque<Item> outstanding_data_; // The number of bytes that are in-flight (sent but not yet acked or nacked). - size_t outstanding_bytes_ = 0; + size_t unacked_bytes_ = 0; // The number of DATA chunks that are in-flight (sent but not yet acked or // nacked). - size_t outstanding_items_ = 0; + size_t unacked_items_ = 0; // Data chunks that are eligible for fast retransmission. std::set<UnwrappedTSN> to_be_fast_retransmitted_; // Data chunks that are to be retransmitted. diff --git a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc index b8c2e593a1..e4bdb7ce7e 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/outstanding_data_test.cc @@ -37,8 +37,10 @@ using ::testing::Property; using ::testing::Return; using ::testing::StrictMock; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow(42); +constexpr Timestamp kNow = Timestamp::Millis(42); constexpr OutgoingMessageId kMessageId = OutgoingMessageId(17); class OutstandingDataTest : public testing::Test { @@ -46,7 +48,6 @@ class OutstandingDataTest : public testing::Test { OutstandingDataTest() : gen_(MID(42)), buf_(DataChunk::kHeaderSize, - unwrapper_.Unwrap(TSN(10)), unwrapper_.Unwrap(TSN(9)), on_discard_.AsStdFunction()) {} @@ -58,8 +59,8 @@ class OutstandingDataTest : public testing::Test { TEST_F(OutstandingDataTest, HasInitialState) { EXPECT_TRUE(buf_.empty()); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(10)); @@ -75,8 +76,8 @@ TEST_F(OutstandingDataTest, InsertChunk) { EXPECT_EQ(tsn.Wrap(), TSN(10)); - EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); - EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -95,8 +96,8 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) { EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(10)); EXPECT_FALSE(ack.has_packet_loss); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(10)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -109,8 +110,8 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) { buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false); - EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); - EXPECT_EQ(buf_.outstanding_items(), 1u); + EXPECT_EQ(buf_.unacked_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1)); + EXPECT_EQ(buf_.unacked_items(), 1u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(11)); @@ -131,8 +132,8 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) { EXPECT_EQ(ack.highest_tsn_acked.Wrap(), TSN(11)); EXPECT_FALSE(ack.has_packet_loss); - EXPECT_EQ(buf_.outstanding_bytes(), 0u); - EXPECT_EQ(buf_.outstanding_items(), 0u); + EXPECT_EQ(buf_.unacked_bytes(), 0u); + EXPECT_EQ(buf_.unacked_items(), 0u); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); EXPECT_EQ(buf_.last_cumulative_tsn_ack().Wrap(), TSN(9)); EXPECT_EQ(buf_.next_tsn().Wrap(), TSN(12)); @@ -277,20 +278,20 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) { } TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) { - static constexpr TimeMs kExpiresAt = kNow + DurationMs(1); + static constexpr Timestamp kExpiresAt = kNow + TimeDelta::Millis(1); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, "B"), kNow, MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_TRUE(buf_.Insert(kMessageId, gen_.Ordered({1}, ""), - kNow + DurationMs(0), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(0), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_CALL(on_discard_, Call(StreamID(1), kMessageId)) .WillOnce(Return(false)); EXPECT_FALSE(buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), - kNow + DurationMs(1), MaxRetransmits::NoLimit(), - kExpiresAt) + kNow + TimeDelta::Millis(1), + MaxRetransmits::NoLimit(), kExpiresAt) .has_value()); EXPECT_FALSE(buf_.has_data_to_be_retransmitted()); @@ -362,15 +363,14 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) { TEST_F(OutstandingDataTest, MeasureRTT) { buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(1)); - buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + DurationMs(2)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(1)); + buf_.Insert(kMessageId, gen_.Ordered({1}, "BE"), kNow + TimeDelta::Millis(2)); - static constexpr DurationMs kDuration(123); - ASSERT_HAS_VALUE_AND_ASSIGN( - DurationMs duration, - buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11)))); + static constexpr TimeDelta kDuration = TimeDelta::Millis(123); + TimeDelta duration = + buf_.MeasureRTT(kNow + kDuration, unwrapper_.Unwrap(TSN(11))); - EXPECT_EQ(duration, kDuration - DurationMs(1)); + EXPECT_EQ(duration, kDuration - TimeDelta::Millis(1)); } TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { @@ -453,13 +453,13 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) { TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) { buf_.Insert(OutgoingMessageId(1), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(42)); buf_.Insert(OutgoingMessageId(2), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(43)); buf_.Insert(OutgoingMessageId(3), gen_.Ordered({1}, "BE"), kNow, - MaxRetransmits::NoLimit(), TimeMs::InfiniteFuture(), + MaxRetransmits::NoLimit(), Timestamp::PlusInfinity(), LifecycleId(44)); OutstandingData::AckInfo ack1 = @@ -479,7 +479,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)}; EXPECT_FALSE( @@ -515,7 +515,7 @@ TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) { buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, ""), kNow, MaxRetransmits(0)); buf_.Insert(kMessageId, gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0), - TimeMs::InfiniteFuture(), LifecycleId(42)); + Timestamp::PlusInfinity(), LifecycleId(42)); EXPECT_THAT(buf_.GetChunkStatesForTesting(), testing::ElementsAre(Pair(TSN(9), State::kAcked), // diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc index 2b9843f4a7..8c0d227a36 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.cc @@ -25,7 +25,6 @@ #include "api/array_view.h" #include "net/dcsctp/common/math.h" #include "net/dcsctp/common/sequence_numbers.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/data_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_chunk.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" @@ -40,10 +39,13 @@ #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" #include "rtc_base/strings/string_builder.h" namespace dcsctp { namespace { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; // Allow sending only slightly less than an MTU, to account for headers. constexpr float kMinBytesRequiredToSendFactor = 0.9; @@ -55,7 +57,7 @@ RetransmissionQueue::RetransmissionQueue( TSN my_initial_tsn, size_t a_rwnd, SendQueue& send_queue, - std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void(TimeDelta rtt)> on_new_rtt, std::function<void()> on_clear_retransmission_counter, Timer& t3_rtx, const DcSctpOptions& options, @@ -84,7 +86,6 @@ RetransmissionQueue::RetransmissionQueue( send_queue_(send_queue), outstanding_data_( data_chunk_header_size_, - tsn_unwrapper_.Unwrap(my_initial_tsn), tsn_unwrapper_.Unwrap(TSN(*my_initial_tsn - 1)), [this](StreamID stream_id, OutgoingMessageId message_id) { return send_queue_.Discard(stream_id, message_id); @@ -114,12 +115,12 @@ void RetransmissionQueue::MaybeExitFastRecovery( } void RetransmissionQueue::HandleIncreasedCumulativeTsnAck( - size_t outstanding_bytes, + size_t unacked_bytes, size_t total_bytes_acked) { // Allow some margin for classifying as fully utilized, due to e.g. that too // small packets (less than kMinimumFragmentedPayload) are not sent + // overhead. - bool is_fully_utilized = outstanding_bytes + options_.mtu >= cwnd_; + bool is_fully_utilized = unacked_bytes + options_.mtu >= cwnd_; size_t old_cwnd = cwnd_; if (phase() == CongestionAlgorithmPhase::kSlowStart) { if (is_fully_utilized && !is_in_fast_recovery()) { @@ -204,13 +205,13 @@ void RetransmissionQueue::HandlePacketLoss(UnwrappedTSN highest_tsn_acked) { } void RetransmissionQueue::UpdateReceiverWindow(uint32_t a_rwnd) { - rwnd_ = outstanding_data_.outstanding_bytes() >= a_rwnd + rwnd_ = outstanding_data_.unacked_bytes() >= a_rwnd ? 0 - : a_rwnd - outstanding_data_.outstanding_bytes(); + : a_rwnd - outstanding_data_.unacked_bytes(); } void RetransmissionQueue::StartT3RtxTimerIfOutstandingData() { - // Note: Can't use `outstanding_bytes()` as that one doesn't count chunks to + // Note: Can't use `unacked_bytes()` as that one doesn't count chunks to // be retransmitted. if (outstanding_data_.empty()) { // https://tools.ietf.org/html/rfc4960#section-6.3.2 @@ -257,14 +258,14 @@ bool RetransmissionQueue::IsSackValid(const SackChunk& sack) const { return true; } -bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { +bool RetransmissionQueue::HandleSack(Timestamp now, const SackChunk& sack) { if (!IsSackValid(sack)) { return false; } UnwrappedTSN old_last_cumulative_tsn_ack = outstanding_data_.last_cumulative_tsn_ack(); - size_t old_outstanding_bytes = outstanding_data_.outstanding_bytes(); + size_t old_unacked_bytes = outstanding_data_.unacked_bytes(); size_t old_rwnd = rwnd_; UnwrappedTSN cumulative_tsn_ack = tsn_unwrapper_.Unwrap(sack.cumulative_tsn_ack()); @@ -301,9 +302,9 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Received SACK, cum_tsn_ack=" << *cumulative_tsn_ack.Wrap() << " (" << *old_last_cumulative_tsn_ack.Wrap() - << "), outstanding_bytes=" - << outstanding_data_.outstanding_bytes() << " (" - << old_outstanding_bytes << "), rwnd=" << rwnd_ << " (" + << "), unacked_bytes=" + << outstanding_data_.unacked_bytes() << " (" + << old_unacked_bytes << "), rwnd=" << rwnd_ << " (" << old_rwnd << ")"; if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) { @@ -315,8 +316,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { // Note: It may be started again in a bit further down. t3_rtx_.Stop(); - HandleIncreasedCumulativeTsnAck(old_outstanding_bytes, - ack_info.bytes_acked); + HandleIncreasedCumulativeTsnAck(old_unacked_bytes, ack_info.bytes_acked); } if (ack_info.has_packet_loss) { @@ -335,7 +335,7 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) { return true; } -void RetransmissionQueue::UpdateRTT(TimeMs now, +void RetransmissionQueue::UpdateRTT(Timestamp now, UnwrappedTSN cumulative_tsn_ack) { // RTT updating is flawed in SCTP, as explained in e.g. Pedersen J, Griwodz C, // Halvorsen P (2006) Considerations of SCTP retransmission delays for thin @@ -345,17 +345,16 @@ void RetransmissionQueue::UpdateRTT(TimeMs now, // TODO(boivie): Consider occasionally sending DATA chunks with I-bit set and // use only those packets for measurement. - absl::optional<DurationMs> rtt = - outstanding_data_.MeasureRTT(now, cumulative_tsn_ack); + TimeDelta rtt = outstanding_data_.MeasureRTT(now, cumulative_tsn_ack); - if (rtt.has_value()) { - on_new_rtt_(*rtt); + if (rtt.IsFinite()) { + on_new_rtt_(rtt); } } void RetransmissionQueue::HandleT3RtxTimerExpiry() { size_t old_cwnd = cwnd_; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); // https://tools.ietf.org/html/rfc4960#section-6.3.3 // "For the destination address for which the timer expires, adjust // its ssthresh with rules defined in Section 7.2.3 and set the cwnd <- MTU." @@ -392,8 +391,8 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() { RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_ << " (" << old_cwnd << "), ssthresh=" << ssthresh_ - << ", outstanding_bytes " << outstanding_bytes() << " (" - << old_outstanding_bytes << ")"; + << ", unacked_bytes " << unacked_bytes() << " (" + << old_unacked_bytes << ")"; RTC_DCHECK(IsConsistent()); } @@ -402,7 +401,7 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) { RTC_DCHECK(outstanding_data_.has_data_to_be_fast_retransmitted()); RTC_DCHECK(IsDivisibleBy4(bytes_in_packet)); std::vector<std::pair<TSN, Data>> to_be_sent; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); to_be_sent = outstanding_data_.GetChunksToBeFastRetransmitted(bytes_in_packet); @@ -441,21 +440,21 @@ RetransmissionQueue::GetChunksForFastRetransmit(size_t bytes_in_packet) { sb << *c.first; }) << " - " << bytes_retransmitted - << " bytes. outstanding_bytes=" << outstanding_bytes() - << " (" << old_outstanding_bytes << ")"; + << " bytes. unacked_bytes=" << unacked_bytes() << " (" + << old_unacked_bytes << ")"; RTC_DCHECK(IsConsistent()); return to_be_sent; } std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( - TimeMs now, + Timestamp now, size_t bytes_remaining_in_packet) { // Chunks are always padded to even divisible by four. RTC_DCHECK(IsDivisibleBy4(bytes_remaining_in_packet)); std::vector<std::pair<TSN, Data>> to_be_sent; - size_t old_outstanding_bytes = outstanding_bytes(); + size_t old_unacked_bytes = unacked_bytes(); size_t old_rwnd = rwnd_; // Calculate the bandwidth budget (how many bytes that is @@ -494,7 +493,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( chunk_opt->message_id, chunk_opt->data, now, partial_reliability_ ? chunk_opt->max_retransmissions : MaxRetransmits::NoLimit(), - partial_reliability_ ? chunk_opt->expires_at : TimeMs::InfiniteFuture(), + partial_reliability_ ? chunk_opt->expires_at + : Timestamp::PlusInfinity(), chunk_opt->lifecycle_id); if (tsn.has_value()) { @@ -526,8 +526,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend( [&](size_t r, const std::pair<TSN, Data>& d) { return r + GetSerializedChunkSize(d.second); }) - << " bytes. outstanding_bytes=" << outstanding_bytes() - << " (" << old_outstanding_bytes << "), cwnd=" << cwnd_ + << " bytes. unacked_bytes=" << unacked_bytes() << " (" + << old_unacked_bytes << "), cwnd=" << cwnd_ << ", rwnd=" << rwnd_ << " (" << old_rwnd << ")"; } RTC_DCHECK(IsConsistent()); @@ -539,7 +539,7 @@ bool RetransmissionQueue::can_send_data() const { max_bytes_to_send() >= min_bytes_required_to_send_; } -bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { +bool RetransmissionQueue::ShouldSendForwardTsn(Timestamp now) { if (!partial_reliability_) { return false; } @@ -550,9 +550,9 @@ bool RetransmissionQueue::ShouldSendForwardTsn(TimeMs now) { } size_t RetransmissionQueue::max_bytes_to_send() const { - size_t left = outstanding_bytes() >= cwnd_ ? 0 : cwnd_ - outstanding_bytes(); + size_t left = unacked_bytes() >= cwnd_ ? 0 : cwnd_ - unacked_bytes(); - if (outstanding_bytes() == 0) { + if (unacked_bytes() == 0) { // https://datatracker.ietf.org/doc/html/rfc4960#section-6.1 // ... However, regardless of the value of rwnd (including if it is 0), the // data sender can always have one DATA chunk in flight to the receiver if @@ -619,7 +619,6 @@ void RetransmissionQueue::RestoreFromState( partial_bytes_acked_ = state.tx.partial_bytes_acked; outstanding_data_.ResetSequenceNumbers( - tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn)), tsn_unwrapper_.Unwrap(TSN(state.tx.next_tsn - 1))); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h index b44db2a9a0..a0fbb33c47 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue.h @@ -60,7 +60,7 @@ class RetransmissionQueue { TSN my_initial_tsn, size_t a_rwnd, SendQueue& send_queue, - std::function<void(DurationMs rtt)> on_new_rtt, + std::function<void(webrtc::TimeDelta rtt)> on_new_rtt, std::function<void()> on_clear_retransmission_counter, Timer& t3_rtx, const DcSctpOptions& options, @@ -69,7 +69,7 @@ class RetransmissionQueue { // Handles a received SACK. Returns true if the `sack` was processed and // false if it was discarded due to received out-of-order and not relevant. - bool HandleSack(TimeMs now, const SackChunk& sack); + bool HandleSack(webrtc::Timestamp now, const SackChunk& sack); // Handles an expired retransmission timer. void HandleT3RtxTimerExpiry(); @@ -90,7 +90,7 @@ class RetransmissionQueue { // called prior to this method, to abandon expired chunks, as this method will // not expire any chunks. std::vector<std::pair<TSN, Data>> GetChunksToSend( - TimeMs now, + webrtc::Timestamp now, size_t bytes_remaining_in_packet); // Returns the internal state of all queued chunks. This is only used in @@ -121,14 +121,10 @@ class RetransmissionQueue { uint64_t rtx_bytes_count() const { return rtx_bytes_count_; } // Returns the number of bytes of packets that are in-flight. - size_t outstanding_bytes() const { - return outstanding_data_.outstanding_bytes(); - } + size_t unacked_bytes() const { return outstanding_data_.unacked_bytes(); } // Returns the number of DATA chunks that are in-flight. - size_t outstanding_items() const { - return outstanding_data_.outstanding_items(); - } + size_t unacked_items() const { return outstanding_data_.unacked_items(); } // Indicates if the congestion control algorithm allows data to be sent. bool can_send_data() const; @@ -136,7 +132,7 @@ class RetransmissionQueue { // Given the current time `now`, it will evaluate if there are chunks that // have expired and that need to be discarded. It returns true if a // FORWARD-TSN should be sent. - bool ShouldSendForwardTsn(TimeMs now); + bool ShouldSendForwardTsn(webrtc::Timestamp now); // Creates a FORWARD-TSN chunk. ForwardTsnChunk CreateForwardTsn() const { @@ -185,7 +181,7 @@ class RetransmissionQueue { // When a SACK chunk is received, this method will be called which _may_ call // into the `RetransmissionTimeout` to update the RTO. - void UpdateRTT(TimeMs now, UnwrappedTSN cumulative_tsn_ack); + void UpdateRTT(webrtc::Timestamp now, UnwrappedTSN cumulative_tsn_ack); // If the congestion control is in "fast recovery mode", this may be exited // now. @@ -197,7 +193,7 @@ class RetransmissionQueue { // Update the congestion control algorithm given as the cumulative ack TSN // value has increased, as reported in an incoming SACK chunk. - void HandleIncreasedCumulativeTsnAck(size_t outstanding_bytes, + void HandleIncreasedCumulativeTsnAck(size_t unacked_bytes, size_t total_bytes_acked); // Update the congestion control algorithm, given as packet loss has been // detected, as reported in an incoming SACK chunk. @@ -230,7 +226,7 @@ class RetransmissionQueue { // The size of the data chunk (DATA/I-DATA) header that is used. const size_t data_chunk_header_size_; // Called when a new RTT measurement has been done - const std::function<void(DurationMs rtt)> on_new_rtt_; + const std::function<void(webrtc::TimeDelta rtt)> on_new_rtt_; // Called when a SACK has been seen that cleared the retransmission counter. const std::function<void()> on_clear_retransmission_counter_; // The retransmission counter. diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc index d50494f084..eb1e04a5bb 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_queue_test.cc @@ -52,6 +52,8 @@ using ::testing::Pair; using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; constexpr uint32_t kArwnd = 100000; constexpr uint32_t kMaxMtu = 1191; @@ -74,12 +76,12 @@ class RetransmissionQueueTest : public testing::Test { }), timer_(timer_manager_.CreateTimer( "test/t3_rtx", - []() { return absl::nullopt; }, - TimerOptions(options_.rto_initial))) {} + []() { return TimeDelta::Zero(); }, + TimerOptions(options_.rto_initial.ToTimeDelta()))) {} - std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk( + std::function<SendQueue::DataToSend(Timestamp, size_t)> CreateChunk( OutgoingMessageId message_id) { - return [this, message_id](TimeMs now, size_t max_size) { + return [this, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend(message_id, gen_.Ordered({1, 2, 3, 4}, "BE")); }; @@ -127,10 +129,10 @@ class RetransmissionQueueTest : public testing::Test { MockDcSctpSocketCallbacks callbacks_; DcSctpOptions options_; DataGenerator gen_; - TimeMs now_ = TimeMs(0); + Timestamp now_ = Timestamp::Zero(); FakeTimeoutManager timeout_manager_; TimerManager timer_manager_; - NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_; + NiceMock<MockFunction<void(TimeDelta rtt_ms)>> on_rtt_; NiceMock<MockFunction<void()>> on_clear_retransmission_counter_; NiceMock<MockSendQueue> producer_; std::unique_ptr<Timer> timer_; @@ -146,7 +148,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunk) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -159,7 +161,7 @@ TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10))); @@ -175,7 +177,7 @@ TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); @@ -198,7 +200,7 @@ TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -229,7 +231,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -241,7 +243,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18))); // Ack 12, 14-15, 17-18 @@ -262,7 +264,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19))); // Ack 12, 14-15, 17-19 @@ -274,7 +276,7 @@ TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20))); // Ack 12, 14-15, 17-20 @@ -321,16 +323,16 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); - static constexpr TimeMs kStartTime(100000); + static constexpr Timestamp kStartTime = Timestamp::Seconds(100); now_ = kStartTime; EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12))); // Ack 10, 12, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {})); @@ -342,22 +344,22 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Send 13 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(3))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13))); // Ack 10, 12-13, after 100ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {})); // Send 14 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14))); // Ack 10, 12-14, after 100 ms. - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); queue.HandleSack( now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {})); @@ -383,11 +385,11 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { // Verify that the timer was really restarted when fast-retransmitting. The // timeout is `options_.rto_initial`, so advance the time just before that. - now_ += options_.rto_initial - DurationMs(1); + now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1); EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value()); // And ensure it really is running. - now_ += DurationMs(1); + now_ += TimeDelta::Millis(1); ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout, timeout_manager_.GetNextExpiredTimeout()); // An expired timeout has to be handled (asserts validate this). @@ -397,15 +399,15 @@ TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) { TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(1), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -420,11 +422,11 @@ TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) { TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -459,12 +461,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { RetransmissionQueue queue = CreateQueue(/*supports_partial_reliability=*/false); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -488,12 +490,12 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -529,12 +531,12 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(3); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -577,16 +579,16 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { static constexpr size_t kCwnd = 1200; queue.set_cwnd(kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); std::vector<uint8_t> payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1500); @@ -594,8 +596,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); - EXPECT_EQ(queue.outstanding_items(), 1u); + EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); + EXPECT_EQ(queue.unacked_items(), 1u); // Will force chunks to be retransmitted queue.HandleT3RtxTimerExpiry(); @@ -603,8 +605,8 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kToBeRetransmitted))); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); std::vector<std::pair<TSN, Data>> chunks_to_rtx = queue.GetChunksToSend(now_, 1500); @@ -612,30 +614,30 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) { EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(9), State::kAcked), // Pair(TSN(10), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); - EXPECT_EQ(queue.outstanding_items(), 1u); + EXPECT_EQ(queue.unacked_bytes(), payload.size() + DataChunk::kHeaderSize); + EXPECT_EQ(queue.unacked_items(), 1u); } TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -675,23 +677,23 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "E")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -729,7 +731,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(1); SendQueue::DataToSend dts(OutgoingMessageId(42), @@ -737,7 +739,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(2); SendQueue::DataToSend dts(OutgoingMessageId(43), @@ -745,7 +747,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(3); SendQueue::DataToSend dts(OutgoingMessageId(44), @@ -753,7 +755,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { DataGeneratorOptions opts; opts.stream_id = StreamID(4); SendQueue::DataToSend dts(OutgoingMessageId(45), @@ -761,7 +763,7 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); @@ -850,21 +852,21 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { TEST_F(RetransmissionQueueTest, MeasureRTT) { RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(OutgoingMessageId(0), gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1000); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); - now_ = now_ + DurationMs(123); + now_ = now_ + TimeDelta::Millis(123); - EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1); + EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); } @@ -888,7 +890,7 @@ TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -918,7 +920,7 @@ TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -965,7 +967,7 @@ TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), @@ -995,14 +997,14 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the // magic numbers in this test. EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize); std::vector<uint8_t> payload(183); return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillOnce([this](TimeMs, size_t size) { + .WillOnce([this](Timestamp, size_t size) { EXPECT_EQ(size, 976 - DataChunk::kHeaderSize); std::vector<uint8_t> payload(957); @@ -1018,23 +1020,23 @@ TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) { TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({9, 10, 11, 12}, "")); dts.max_retransmissions = MaxRetransmits(0); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Send and ack first chunk (TSN 10) std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1046,8 +1048,8 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { Pair(TSN(10), State::kInFlight), // Pair(TSN(11), State::kInFlight), // Pair(TSN(12), State::kInFlight))); - EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); - EXPECT_EQ(queue.outstanding_items(), 3u); + EXPECT_EQ(queue.unacked_bytes(), (16 + 4) * 3u); + EXPECT_EQ(queue.unacked_items(), 3u); // Mark the message as lost. EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1); @@ -1060,21 +1062,21 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) { Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned))); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); // Now ACK those, one at a time. queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); - EXPECT_EQ(queue.outstanding_bytes(), 0u); - EXPECT_EQ(queue.outstanding_items(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); + EXPECT_EQ(queue.unacked_items(), 0u); } TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { @@ -1082,21 +1084,21 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { DataGeneratorOptions options; options.stream_id = StreamID(17); options.mid = MID(42); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, "", options)); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 24); @@ -1104,7 +1106,7 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId)) .WillOnce(Return(true)); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); @@ -1118,38 +1120,38 @@ TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { RetransmissionQueue queue = CreateQueue(); - TimeMs test_start = now_; + Timestamp test_start = now_; EXPECT_CALL(producer_, Produce) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(42), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(43), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) // Stream reset - MID reset to zero again. - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillOnce([&](TimeMs, size_t) { + .WillOnce([&](Timestamp, size_t) { SendQueue::DataToSend dts( OutgoingMessageId(44), gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)})); - dts.expires_at = TimeMs(test_start + DurationMs(10)); + dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10)); return dts; }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44))) .WillOnce(Return(true)); @@ -1160,7 +1162,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { EXPECT_THAT(queue.GetChunksToSend(now_, 24), ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0))))); - now_ += DurationMs(100); + now_ += TimeDelta::Millis(100); EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty()); EXPECT_THAT( @@ -1176,7 +1178,7 @@ TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) { TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(0); return dts; @@ -1184,7 +1186,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) { .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) .WillOnce(CreateChunk(OutgoingMessageId(2))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1246,7 +1248,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { // This is a fairly long test. RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) - .WillOnce([this](TimeMs, size_t) { + .WillOnce([this](Timestamp, size_t) { SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE")); dts.max_retransmissions = MaxRetransmits(2); return dts; @@ -1260,7 +1262,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) { .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); @@ -1386,17 +1388,17 @@ TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) { std::vector<uint8_t> payload(1000); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); std::vector<std::pair<TSN, Data>> chunks_to_send = queue.GetChunksToSend(now_, 1500); EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _))); size_t serialized_size = payload.size() + DataChunk::kHeaderSize; - EXPECT_EQ(queue.outstanding_bytes(), serialized_size); + EXPECT_EQ(queue.unacked_bytes(), serialized_size); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); @@ -1414,12 +1416,12 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1433,7 +1435,7 @@ TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) { queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); EXPECT_TRUE(queue.can_send_data()); - EXPECT_EQ(queue.outstanding_bytes(), 0u); + EXPECT_EQ(queue.unacked_bytes(), 0u); EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu); } @@ -1447,12 +1449,12 @@ TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) { // Fill the congestion window almost - leaving 500 bytes. size_t chunk_size = intial_cwnd - 500; EXPECT_CALL(producer_, Produce) - .WillOnce([chunk_size, this](TimeMs, size_t) { + .WillOnce([chunk_size, this](Timestamp, size_t) { return SendQueue::DataToSend( OutgoingMessageId(0), gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_TRUE(queue.can_send_data()); std::vector<std::pair<TSN, Data>> chunks_to_send = @@ -1467,7 +1469,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) { RetransmissionQueue queue = CreateQueue(); EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); EXPECT_EQ( @@ -1490,7 +1492,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { .WillOnce(CreateChunk(OutgoingMessageId(5))) .WillOnce(CreateChunk(OutgoingMessageId(6))) .WillOnce(CreateChunk(OutgoingMessageId(7))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8)); EXPECT_EQ( queue.GetHandoverReadiness(), @@ -1503,7 +1505,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 18 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(8))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-18 @@ -1515,7 +1517,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 19 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(9))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-19 @@ -1527,7 +1529,7 @@ TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) { // Send 20 EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(10))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1)); // Ack 12, 14-15, 17-20 @@ -1563,7 +1565,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { EXPECT_CALL(producer_, Produce) .WillOnce(CreateChunk(OutgoingMessageId(0))) .WillOnce(CreateChunk(OutgoingMessageId(1))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2)); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); @@ -1574,7 +1576,7 @@ TEST_F(RetransmissionQueueTest, HandoverTest) { .WillOnce(CreateChunk(OutgoingMessageId(2))) .WillOnce(CreateChunk(OutgoingMessageId(3))) .WillOnce(CreateChunk(OutgoingMessageId(4))) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); EXPECT_THAT(GetSentPacketTSNs(*handedover_queue), testing::ElementsAre(TSN(12), TSN(13), TSN(14))); @@ -1592,27 +1594,27 @@ TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) { std::vector<uint8_t> payload(mtu - 100); EXPECT_CALL(producer_, Produce) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "B")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "")); }) - .WillOnce([this, payload](TimeMs, size_t) { + .WillOnce([this, payload](Timestamp, size_t) { return SendQueue::DataToSend(OutgoingMessageId(0), gen_.Ordered(payload, "E")); }) - .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; }); // Produce all chunks and put them in the retransmission queue. std::vector<std::pair<TSN, Data>> chunks_to_send = diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc index 7d8fb9761c..8af77041a5 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.cc @@ -12,28 +12,29 @@ #include <algorithm> #include <cstdint> +#include "api/units/time_delta.h" #include "net/dcsctp/public/dcsctp_options.h" namespace dcsctp { RetransmissionTimeout::RetransmissionTimeout(const DcSctpOptions& options) - : min_rto_(*options.rto_min), - max_rto_(*options.rto_max), - max_rtt_(*options.rtt_max), + : min_rto_(options.rto_min.ToTimeDelta()), + max_rto_(options.rto_max.ToTimeDelta()), + max_rtt_(options.rtt_max.ToTimeDelta()), min_rtt_variance_(*options.min_rtt_variance), scaled_srtt_(*options.rto_initial << kRttShift), rto_(*options.rto_initial) {} -void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { - const int32_t rtt = *measured_rtt; - +void RetransmissionTimeout::ObserveRTT(webrtc::TimeDelta measured_rtt) { // Unrealistic values will be skipped. If a wrongly measured (or otherwise // corrupt) value was processed, it could change the state in a way that would // take a very long time to recover. - if (rtt < 0 || rtt > max_rtt_) { + if (measured_rtt < webrtc::TimeDelta::Zero() || measured_rtt > max_rtt_) { return; } + const int64_t rtt = measured_rtt.ms(); + // From https://tools.ietf.org/html/rfc4960#section-6.3.1, but avoiding // floating point math by implementing algorithm from "V. Jacobson: Congestion // avoidance and control", but adapted for SCTP. @@ -42,7 +43,7 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { scaled_rtt_var_ = (rtt / 2) << kRttVarShift; first_measurement_ = false; } else { - int32_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift); + int64_t rtt_diff = rtt - (scaled_srtt_ >> kRttShift); scaled_srtt_ += rtt_diff; if (rtt_diff < 0) { rtt_diff = -rtt_diff; @@ -58,6 +59,6 @@ void RetransmissionTimeout::ObserveRTT(DurationMs measured_rtt) { rto_ = (scaled_srtt_ >> kRttShift) + scaled_rtt_var_; // Clamp RTO between min and max. - rto_ = std::min(std::max(rto_, min_rto_), max_rto_); + rto_ = std::min(std::max(rto_, min_rto_.ms()), max_rto_.ms()); } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h index 01530cb3b5..b4b0fd7fef 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout.h @@ -32,27 +32,29 @@ class RetransmissionTimeout { explicit RetransmissionTimeout(const DcSctpOptions& options); // To be called when a RTT has been measured, to update the RTO value. - void ObserveRTT(DurationMs measured_rtt); + void ObserveRTT(webrtc::TimeDelta measured_rtt); // Returns the Retransmission Timeout (RTO) value, in milliseconds. - DurationMs rto() const { return DurationMs(rto_); } + webrtc::TimeDelta rto() const { return webrtc::TimeDelta::Millis(rto_); } // Returns the smoothed RTT value, in milliseconds. - DurationMs srtt() const { return DurationMs(scaled_srtt_ >> kRttShift); } + webrtc::TimeDelta srtt() const { + return webrtc::TimeDelta::Millis(scaled_srtt_ >> kRttShift); + } private: - const int32_t min_rto_; - const int32_t max_rto_; - const int32_t max_rtt_; - const int32_t min_rtt_variance_; + const webrtc::TimeDelta min_rto_; + const webrtc::TimeDelta max_rto_; + const webrtc::TimeDelta max_rtt_; + const int64_t min_rtt_variance_; // If this is the first measurement bool first_measurement_ = true; // Smoothed Round-Trip Time, shifted by kRttShift - int32_t scaled_srtt_; + int64_t scaled_srtt_; // Round-Trip Time Variation, shifted by kRttVarShift - int32_t scaled_rtt_var_ = 0; + int64_t scaled_rtt_var_ = 0; // Retransmission Timeout - int32_t rto_; + int64_t rto_; }; } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc index b901995e97..7754578f32 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/retransmission_timeout_test.cc @@ -15,20 +15,21 @@ namespace dcsctp { namespace { +using ::webrtc::TimeDelta; -constexpr DurationMs kMaxRtt = DurationMs(8'000); -constexpr DurationMs kInitialRto = DurationMs(200); -constexpr DurationMs kMaxRto = DurationMs(800); -constexpr DurationMs kMinRto = DurationMs(120); -constexpr DurationMs kMinRttVariance = DurationMs(220); +constexpr TimeDelta kMaxRtt = TimeDelta::Millis(8'000); +constexpr TimeDelta kInitialRto = TimeDelta::Millis(200); +constexpr TimeDelta kMaxRto = TimeDelta::Millis(800); +constexpr TimeDelta kMinRto = TimeDelta::Millis(120); +constexpr TimeDelta kMinRttVariance = TimeDelta::Millis(220); DcSctpOptions MakeOptions() { DcSctpOptions options; - options.rtt_max = kMaxRtt; - options.rto_initial = kInitialRto; - options.rto_max = kMaxRto; - options.rto_min = kMinRto; - options.min_rtt_variance = kMinRttVariance; + options.rtt_max = DurationMs(kMaxRtt); + options.rto_initial = DurationMs(kInitialRto); + options.rto_max = DurationMs(kMaxRto); + options.rto_min = DurationMs(kMinRto); + options.min_rtt_variance = DurationMs(kMinRttVariance); return options; } @@ -45,31 +46,31 @@ TEST(RetransmissionTimeoutTest, HasValidInitialSrtt) { TEST(RetransmissionTimeoutTest, NegativeValuesDoNotAffectRTO) { RetransmissionTimeout rto_(MakeOptions()); // Initial negative value - rto_.ObserveRTT(DurationMs(-10)); + rto_.ObserveRTT(TimeDelta::Millis(-10)); EXPECT_EQ(rto_.rto(), kInitialRto); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); // Subsequent negative value - rto_.ObserveRTT(DurationMs(-10)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(-10)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); } TEST(RetransmissionTimeoutTest, TooLargeValuesDoNotAffectRTO) { RetransmissionTimeout rto_(MakeOptions()); // Initial too large value - rto_.ObserveRTT(kMaxRtt + DurationMs(100)); + rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100)); EXPECT_EQ(rto_.rto(), kInitialRto); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); // Subsequent too large value - rto_.ObserveRTT(kMaxRtt + DurationMs(100)); - EXPECT_EQ(*rto_.rto(), 372); + rto_.ObserveRTT(kMaxRtt + TimeDelta::Millis(100)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); } TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(1)); + rto_.ObserveRTT(TimeDelta::Millis(1)); } EXPECT_GE(rto_.rto(), kMinRto); } @@ -77,67 +78,67 @@ TEST(RetransmissionTimeoutTest, WillNeverGoBelowMinimumRto) { TEST(RetransmissionTimeoutTest, WillNeverGoAboveMaximumRto) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(kMaxRtt - DurationMs(1)); + rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(1)); // Adding jitter, which would make it RTO be well above RTT. - rto_.ObserveRTT(kMaxRtt - DurationMs(100)); + rto_.ObserveRTT(kMaxRtt - TimeDelta::Millis(100)); } EXPECT_LE(rto_.rto(), kMaxRto); } TEST(RetransmissionTimeoutTest, CalculatesRtoForStableRtt) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(128)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(123)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(125)); - EXPECT_EQ(*rto_.rto(), 344); - rto_.ObserveRTT(DurationMs(127)); - EXPECT_EQ(*rto_.rto(), 344); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(128)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(123)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(125)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); + rto_.ObserveRTT(TimeDelta::Millis(127)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); } TEST(RetransmissionTimeoutTest, CalculatesRtoForUnstableRtt) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(402)); - EXPECT_EQ(*rto_.rto(), 622); - rto_.ObserveRTT(DurationMs(728)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(89)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(126)); - EXPECT_EQ(*rto_.rto(), 800); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(402)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(622)); + rto_.ObserveRTT(TimeDelta::Millis(728)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(89)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(126)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); } TEST(RetransmissionTimeoutTest, WillStabilizeAfterAWhile) { RetransmissionTimeout rto_(MakeOptions()); - rto_.ObserveRTT(DurationMs(124)); - rto_.ObserveRTT(DurationMs(402)); - rto_.ObserveRTT(DurationMs(728)); - rto_.ObserveRTT(DurationMs(89)); - rto_.ObserveRTT(DurationMs(126)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 800); - rto_.ObserveRTT(DurationMs(122)); - EXPECT_EQ(*rto_.rto(), 710); - rto_.ObserveRTT(DurationMs(123)); - EXPECT_EQ(*rto_.rto(), 631); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 562); - rto_.ObserveRTT(DurationMs(122)); - EXPECT_EQ(*rto_.rto(), 505); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 454); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 410); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 372); - rto_.ObserveRTT(DurationMs(124)); - EXPECT_EQ(*rto_.rto(), 367); + rto_.ObserveRTT(TimeDelta::Millis(124)); + rto_.ObserveRTT(TimeDelta::Millis(402)); + rto_.ObserveRTT(TimeDelta::Millis(728)); + rto_.ObserveRTT(TimeDelta::Millis(89)); + rto_.ObserveRTT(TimeDelta::Millis(126)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(800)); + rto_.ObserveRTT(TimeDelta::Millis(122)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(710)); + rto_.ObserveRTT(TimeDelta::Millis(123)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(631)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(562)); + rto_.ObserveRTT(TimeDelta::Millis(122)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(505)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(454)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(410)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(372)); + rto_.ObserveRTT(TimeDelta::Millis(124)); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(367)); } TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) { @@ -149,31 +150,33 @@ TEST(RetransmissionTimeoutTest, WillAlwaysStayAboveRTT) { RetransmissionTimeout rto_(MakeOptions()); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 344); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(344)); } TEST(RetransmissionTimeoutTest, CanSpecifySmallerMinimumRttVariance) { DcSctpOptions options = MakeOptions(); - options.min_rtt_variance = kMinRttVariance - DurationMs(100); + options.min_rtt_variance = + DurationMs(kMinRttVariance - TimeDelta::Millis(100)); RetransmissionTimeout rto_(options); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 244); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(244)); } TEST(RetransmissionTimeoutTest, CanSpecifyLargerMinimumRttVariance) { DcSctpOptions options = MakeOptions(); - options.min_rtt_variance = kMinRttVariance + DurationMs(100); + options.min_rtt_variance = + DurationMs(kMinRttVariance + TimeDelta::Millis(100)); RetransmissionTimeout rto_(options); for (int i = 0; i < 1000; ++i) { - rto_.ObserveRTT(DurationMs(124)); + rto_.ObserveRTT(TimeDelta::Millis(124)); } - EXPECT_EQ(*rto_.rto(), 444); + EXPECT_EQ(rto_.rto(), TimeDelta::Millis(444)); } } // namespace diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc index facb432c59..7cbead296c 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc @@ -21,15 +21,17 @@ #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/common/internal_types.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, @@ -137,7 +139,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, } absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce( - TimeMs now, + Timestamp now, size_t max_size) { RTC_DCHECK(pause_state_ != PauseState::kPaused && pause_state_ != PauseState::kResetting); @@ -349,7 +351,7 @@ bool RRSendQueue::OutgoingStream::has_partially_sent_message() const { return items_.front().mid.has_value(); } -void RRSendQueue::Add(TimeMs now, +void RRSendQueue::Add(Timestamp now, DcSctpMessage message, const SendOptions& send_options) { RTC_DCHECK(!message.payload().empty()); @@ -366,8 +368,9 @@ void RRSendQueue::Add(TimeMs now, ? MaxRetransmits(send_options.max_retransmissions.value()) : MaxRetransmits::NoLimit(), .expires_at = send_options.lifetime.has_value() - ? now + *send_options.lifetime + DurationMs(1) - : TimeMs::InfiniteFuture(), + ? now + send_options.lifetime->ToTimeDelta() + + TimeDelta::Millis(1) + : Timestamp::PlusInfinity(), .lifecycle_id = send_options.lifecycle_id, }; GetOrCreateStreamInfo(message.stream_id()) @@ -383,7 +386,7 @@ bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; } -absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now, +absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(Timestamp now, size_t max_size) { return scheduler_.Produce(now, max_size); } diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h index bef5fe437d..b6c359dc1e 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h @@ -71,12 +71,13 @@ class RRSendQueue : public SendQueue { // time should be in `now`. Note that it's the responsibility of the caller to // ensure that the buffer is not full (by calling `IsFull`) before adding // messages to it. - void Add(TimeMs now, + void Add(webrtc::Timestamp now, DcSctpMessage message, const SendOptions& send_options = {}); // Implementation of `SendQueue`. - absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override; + absl::optional<DataToSend> Produce(webrtc::Timestamp now, + size_t max_size) override; bool Discard(StreamID stream_id, OutgoingMessageId message_id) override; void PrepareResetStream(StreamID streams) override; bool HasStreamsReadyToBeReset() const override; @@ -104,7 +105,7 @@ class RRSendQueue : public SendQueue { struct MessageAttributes { IsUnordered unordered; MaxRetransmits max_retransmissions; - TimeMs expires_at; + webrtc::Timestamp expires_at; LifecycleId lifecycle_id; }; @@ -154,7 +155,7 @@ class RRSendQueue : public SendQueue { void Add(DcSctpMessage message, MessageAttributes attributes); // Implementing `StreamScheduler::StreamProducer`. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, size_t max_size) override; size_t bytes_to_send_in_next_message() const override; @@ -265,7 +266,7 @@ class RRSendQueue : public SendQueue { OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); absl::optional<DataToSend> Produce( std::map<StreamID, OutgoingStream>::iterator it, - TimeMs now, + webrtc::Timestamp now, size_t max_size); const absl::string_view log_prefix_; diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc index 9d6da7bdff..632cd8fc19 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc @@ -29,8 +29,10 @@ namespace dcsctp { namespace { using ::testing::SizeIs; using ::testing::UnorderedElementsAre; +using ::webrtc::TimeDelta; +using ::webrtc::Timestamp; -constexpr TimeMs kNow = TimeMs(0); +constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); constexpr size_t kMaxQueueSize = 1000; @@ -181,9 +183,9 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { std::vector<uint8_t> payload(20); // Default is no expiry - TimeMs now = kNow; + Timestamp now = kNow; buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); SendOptions expires_2_seconds; @@ -191,17 +193,17 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { // Add and consume within lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2000); + now += TimeDelta::Millis(2000); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); // Add and consume just outside lifetime buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // A long time after expiry buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); - now += DurationMs(1000000); + now += TimeDelta::Seconds(1000); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); // Expire one message, but produce the second that is not expired. @@ -211,7 +213,7 @@ TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) { expires_4_seconds.lifetime = DurationMs(4000); buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); - now += DurationMs(2001); + now += TimeDelta::Millis(2001); ASSERT_TRUE(buf_.Produce(now, kOneFragmentPacketSize)); ASSERT_FALSE(buf_.Produce(now, kOneFragmentPacketSize)); @@ -846,8 +848,9 @@ TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) { EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1), /*maybe_delivered=*/false)); EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1))); - EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize) - .has_value()); + EXPECT_FALSE( + buf_.Produce(kNow + TimeDelta::Millis(1001), kOneFragmentPacketSize) + .has_value()); } TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) { diff --git a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h index 48eaefaf6a..d0d834c901 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h @@ -17,6 +17,7 @@ #include "absl/types/optional.h" #include "api/array_view.h" +#include "api/units/timestamp.h" #include "net/dcsctp/common/internal_types.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/types.h" @@ -37,7 +38,7 @@ class SendQueue { // Partial reliability - RFC3758 MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(); - TimeMs expires_at = TimeMs::InfiniteFuture(); + webrtc::Timestamp expires_at = webrtc::Timestamp::PlusInfinity(); // Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for // all other fragments. @@ -55,7 +56,8 @@ class SendQueue { // // `max_size` refers to how many payload bytes that may be produced, not // including any headers. - virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0; + virtual absl::optional<DataToSend> Produce(webrtc::Timestamp now, + size_t max_size) = 0; // Discards a partially sent message identified by the parameters // `stream_id` and `message_id`. The `message_id` comes from the returned diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc index c1d220aaa2..66c4457481 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc @@ -14,7 +14,6 @@ #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" -#include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" @@ -22,6 +21,7 @@ #include "net/dcsctp/tx/send_queue.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" +#include "rtc_base/strings/str_join.h" namespace dcsctp { @@ -31,7 +31,7 @@ void StreamScheduler::Stream::SetPriority(StreamPriority priority) { } absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { // For non-interleaved streams, avoid rescheduling while still sending a // message as it needs to be sent in full. For interleaved messaging, @@ -127,7 +127,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( } absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce( - TimeMs now, + webrtc::Timestamp now, size_t max_size) { absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size); diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h index ce836a5826..9d76fc6f56 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h @@ -87,7 +87,7 @@ class StreamScheduler { // The parameter `max_size` specifies the maximum amount of actual payload // that may be returned. If these constraints prevents the stream from // sending some data, `absl::nullopt` should be returned. - virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now, + virtual absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, size_t max_size) = 0; // Returns the number of payload bytes that is scheduled to be sent in the @@ -132,7 +132,8 @@ class StreamScheduler { // Produces a message from this stream. This will only be called on streams // that have data. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, + size_t max_size); void MakeActive(size_t bytes_to_send_next); void ForceMarkInactive(); @@ -180,7 +181,8 @@ class StreamScheduler { // `now` and will be used to skip chunks with expired limited lifetime. The // parameter `max_size` specifies the maximum amount of actual payload that // may be returned. If no data can be produced, `absl::nullopt` is returned. - absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); + absl::optional<SendQueue::DataToSend> Produce(webrtc::Timestamp now, + size_t max_size); std::set<StreamID> ActiveStreamsForTesting() const; diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc index 4f5fb0fb84..42d0b3cd35 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler_test.cc @@ -19,9 +19,11 @@ namespace dcsctp { namespace { using ::testing::Return; using ::testing::StrictMock; +using ::webrtc::Timestamp; constexpr size_t kMtu = 1000; constexpr size_t kPayloadSize = 4; +constexpr Timestamp kNow = Timestamp::Zero(); MATCHER_P(HasDataWithMid, mid, "") { if (!arg.has_value()) { @@ -38,12 +40,12 @@ MATCHER_P(HasDataWithMid, mid, "") { return true; } -std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)> +std::function<absl::optional<SendQueue::DataToSend>(Timestamp, size_t)> CreateChunk(OutgoingMessageId message_id, StreamID sid, MID mid, size_t payload_size = kPayloadSize) { - return [sid, mid, payload_size, message_id](TimeMs now, size_t max_size) { + return [sid, mid, payload_size, message_id](Timestamp now, size_t max_size) { return SendQueue::DataToSend( message_id, Data(sid, SSN(0), mid, FSN(0), PPID(42), @@ -56,8 +58,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler, size_t packets_to_generate) { std::map<StreamID, size_t> packet_counts; for (size_t i = 0; i < packets_to_generate; ++i) { - absl::optional<SendQueue::DataToSend> data = - scheduler.Produce(TimeMs(0), kMtu); + absl::optional<SendQueue::DataToSend> data = scheduler.Produce(kNow, kMtu); if (data.has_value()) { ++packet_counts[data->data.stream_id]; } @@ -69,7 +70,7 @@ class MockStreamProducer : public StreamScheduler::StreamProducer { public: MOCK_METHOD(absl::optional<SendQueue::DataToSend>, Produce, - (TimeMs, size_t), + (Timestamp, size_t), (override)); MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override)); }; @@ -100,7 +101,7 @@ class TestStream { TEST(StreamSchedulerTest, HasNoActiveStreams) { StreamScheduler scheduler("", kMtu); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Stream properties can be set and retrieved @@ -132,8 +133,8 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2)); stream->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet. @@ -168,13 +169,13 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Switches between two streams after every packet, but keeps producing from the @@ -232,15 +233,15 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Deactivates a stream before it has finished producing all packets. @@ -259,12 +260,12 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // ... but the stream is made inactive before it can be produced. stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Resumes a paused stream - makes a stream active after inactivating it. @@ -287,14 +288,14 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2)); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); stream1->MakeInactive(); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Iterates between streams, where one is suddenly paused and later resumed. @@ -330,15 +331,15 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); stream1->MakeInactive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); stream1->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Verifies that packet counts are evenly distributed in round robin scheduling. @@ -427,18 +428,18 @@ TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { stream2->MaybeMakeActive(); // t = 30 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t = 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t = 70 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t = 90 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); // t = 140 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t = 210 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority. @@ -492,24 +493,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { stream3->MaybeMakeActive(); // t ~= 20 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 40 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 50 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 60 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 80 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 100 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 150 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 160 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 240 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Will do weighted fair queuing with three streams having different priority @@ -586,24 +587,24 @@ TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { stream3->MaybeMakeActive(); // t ~= 400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(300))); // t ~= 1400 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(301))); // t ~= 2500 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(200))); // t ~= 2800 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(302))); // t ~= 4000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(100))); // t ~= 5600 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(101))); // t ~= 6000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(201))); // t ~= 7000 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(202))); // t ~= 11200 - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { // A simple test with two streams of different priority, but sending packets @@ -723,11 +724,11 @@ TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } // Sending large messages with large MTU will not fragment messages and will @@ -756,9 +757,9 @@ TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { auto stream2 = scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); stream2->MaybeMakeActive(); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); - EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); - EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(kNow, kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(kNow, kMtu), absl::nullopt); } } // namespace |