diff options
Diffstat (limited to '')
25 files changed, 5109 insertions, 0 deletions
diff --git a/third_party/libwebrtc/webrtc/modules/pacing/BUILD.gn b/third_party/libwebrtc/webrtc/modules/pacing/BUILD.gn new file mode 100644 index 0000000000..2364432a63 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/BUILD.gn @@ -0,0 +1,90 @@ +# Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. +# +# Use of this source code is governed by a BSD-style license +# that can be found in the LICENSE file in the root of the source +# tree. An additional intellectual property rights grant can be found +# in the file PATENTS. All contributing project authors may +# be found in the AUTHORS file in the root of the source tree. + +import("../../webrtc.gni") + +rtc_static_library("pacing") { + sources = [ + "alr_detector.cc", + "alr_detector.h", + "bitrate_prober.cc", + "bitrate_prober.h", + "interval_budget.cc", + "interval_budget.h", + "paced_sender.cc", + "paced_sender.h", + "pacer.h", + "packet_queue.cc", + "packet_queue.h", + "packet_queue2.cc", + "packet_queue2.h", + "packet_router.cc", + "packet_router.h", + ] + + if (!build_with_chromium && is_clang) { + # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). + suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] + } + + deps = [ + "..:module_api", + "../../:webrtc_common", + "../../api:optional", + "../../logging:rtc_event_log_api", + "../../rtc_base:rtc_base_approved", + "../../system_wrappers", + "../remote_bitrate_estimator", + "../rtp_rtcp", + "../utility", + ] +} + +if (rtc_include_tests) { + rtc_source_set("pacing_unittests") { + testonly = true + + sources = [ + "alr_detector_unittest.cc", + "bitrate_prober_unittest.cc", + "interval_budget_unittest.cc", + "paced_sender_unittest.cc", + "packet_router_unittest.cc", + ] + deps = [ + ":pacing", + "../../rtc_base:rtc_base_approved", + "../../rtc_base:rtc_base_tests_utils", + "../../system_wrappers:system_wrappers", + "../../test:field_trial", + "../../test:test_support", + "../rtp_rtcp", + "../rtp_rtcp:mock_rtp_rtcp", + "//testing/gmock", + ] + + # TODO(jschuh): bugs.webrtc.org/1348: fix this warning. + configs += [ "//build/config/compiler:no_size_t_to_int_warning" ] + if (!build_with_chromium && is_clang) { + # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163). + suppressed_configs += [ "//build/config/clang:find_bad_constructs" ] + } + } + + rtc_source_set("mock_paced_sender") { + testonly = true + sources = [ + "mock/mock_paced_sender.h", + ] + deps = [ + ":pacing", + "../../system_wrappers", + "../../test:test_support", + ] + } +} diff --git a/third_party/libwebrtc/webrtc/modules/pacing/DEPS b/third_party/libwebrtc/webrtc/modules/pacing/DEPS new file mode 100644 index 0000000000..3088a75e68 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/DEPS @@ -0,0 +1,4 @@ +include_rules = [ + "+system_wrappers", + "+logging/rtc_event_log" +] diff --git a/third_party/libwebrtc/webrtc/modules/pacing/OWNERS b/third_party/libwebrtc/webrtc/modules/pacing/OWNERS new file mode 100644 index 0000000000..1528d7663f --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/OWNERS @@ -0,0 +1,9 @@ +stefan@webrtc.org +mflodman@webrtc.org +asapersson@webrtc.org +philipel@webrtc.org + +# These are for the common case of adding or renaming files. If you're doing +# structural changes, please get a review from a reviewer in this file. +per-file *.gn=* +per-file *.gni=* diff --git a/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.cc b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.cc new file mode 100644 index 0000000000..52fa2bc98e --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.cc @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/alr_detector.h" + +#include <string> + +#include "rtc_base/checks.h" +#include "rtc_base/format_macros.h" +#include "rtc_base/logging.h" +#include "rtc_base/timeutils.h" +#include "system_wrappers/include/field_trial.h" + +namespace webrtc { + +const char AlrDetector::kScreenshareProbingBweExperimentName[] = + "WebRTC-ProbingScreenshareBwe"; +const char AlrDetector::kStrictPacingAndProbingExperimentName[] = + "WebRTC-StrictPacingAndProbing"; +const char kDefaultProbingScreenshareBweSettings[] = "1.0,2875,80,40,-60,3"; + +AlrDetector::AlrDetector() + : bandwidth_usage_percent_(kDefaultAlrBandwidthUsagePercent), + alr_start_budget_level_percent_(kDefaultAlrStartBudgetLevelPercent), + alr_stop_budget_level_percent_(kDefaultAlrStopBudgetLevelPercent), + alr_budget_(0, true) { + RTC_CHECK( + field_trial::FindFullName(kStrictPacingAndProbingExperimentName) + .empty() || + field_trial::FindFullName(kScreenshareProbingBweExperimentName).empty()); + rtc::Optional<AlrExperimentSettings> experiment_settings = + ParseAlrSettingsFromFieldTrial(kScreenshareProbingBweExperimentName); + if (!experiment_settings) { + experiment_settings = + ParseAlrSettingsFromFieldTrial(kStrictPacingAndProbingExperimentName); + } + if (experiment_settings) { + alr_stop_budget_level_percent_ = + experiment_settings->alr_stop_budget_level_percent; + alr_start_budget_level_percent_ = + experiment_settings->alr_start_budget_level_percent; + bandwidth_usage_percent_ = experiment_settings->alr_bandwidth_usage_percent; + } +} + +AlrDetector::~AlrDetector() {} + +void AlrDetector::OnBytesSent(size_t bytes_sent, int64_t delta_time_ms) { + alr_budget_.UseBudget(bytes_sent); + alr_budget_.IncreaseBudget(delta_time_ms); + + if (alr_budget_.budget_level_percent() > alr_start_budget_level_percent_ && + !alr_started_time_ms_) { + alr_started_time_ms_.emplace(rtc::TimeMillis()); + } else if (alr_budget_.budget_level_percent() < + alr_stop_budget_level_percent_ && + alr_started_time_ms_) { + alr_started_time_ms_.reset(); + } +} + +void AlrDetector::SetEstimatedBitrate(int bitrate_bps) { + RTC_DCHECK(bitrate_bps); + const auto target_rate_kbps = int64_t{bitrate_bps} * + bandwidth_usage_percent_ / (1000 * 100); + alr_budget_.set_target_rate_kbps(rtc::dchecked_cast<int>(target_rate_kbps)); +} + +rtc::Optional<int64_t> AlrDetector::GetApplicationLimitedRegionStartTime() + const { + return alr_started_time_ms_; +} + +rtc::Optional<AlrDetector::AlrExperimentSettings> +AlrDetector::ParseAlrSettingsFromFieldTrial(const char* experiment_name) { + rtc::Optional<AlrExperimentSettings> ret; + std::string group_name = field_trial::FindFullName(experiment_name); + + const std::string kIgnoredSuffix = "_Dogfood"; + std::string::size_type suffix_pos = group_name.rfind(kIgnoredSuffix); + if (suffix_pos != std::string::npos && + suffix_pos == group_name.length() - kIgnoredSuffix.length()) { + group_name.resize(group_name.length() - kIgnoredSuffix.length()); + } + + if (experiment_name == kScreenshareProbingBweExperimentName) { + // This experiment is now default-on with fixed settings. + // TODO(sprang): Remove this kill-switch and clean up experiment code. + if (group_name != "Disabled") { + group_name = kDefaultProbingScreenshareBweSettings; + } + } + + if (group_name.empty()) + return ret; + + AlrExperimentSettings settings; + if (sscanf(group_name.c_str(), "%f,%" PRId64 ",%d,%d,%d,%d", + &settings.pacing_factor, &settings.max_paced_queue_time, + &settings.alr_bandwidth_usage_percent, + &settings.alr_start_budget_level_percent, + &settings.alr_stop_budget_level_percent, + &settings.group_id) == 6) { + ret.emplace(settings); + RTC_LOG(LS_INFO) << "Using ALR experiment settings: " + "pacing factor: " + << settings.pacing_factor << ", max pacer queue length: " + << settings.max_paced_queue_time + << ", ALR start bandwidth usage percent: " + << settings.alr_bandwidth_usage_percent + << ", ALR end budget level percent: " + << settings.alr_start_budget_level_percent + << ", ALR end budget level percent: " + << settings.alr_stop_budget_level_percent + << ", ALR experiment group ID: " << settings.group_id; + } else { + RTC_LOG(LS_INFO) << "Failed to parse ALR experiment: " << experiment_name; + } + + return ret; +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.h b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.h new file mode 100644 index 0000000000..1d4faf79ad --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.h @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_ALR_DETECTOR_H_ +#define MODULES_PACING_ALR_DETECTOR_H_ + +#include "api/optional.h" +#include "common_types.h" // NOLINT(build/include) +#include "modules/pacing/interval_budget.h" +#include "modules/pacing/paced_sender.h" +#include "rtc_base/rate_statistics.h" +#include "typedefs.h" // NOLINT(build/include) + +namespace webrtc { + +// Application limited region detector is a class that utilizes signals of +// elapsed time and bytes sent to estimate whether network traffic is +// currently limited by the application's ability to generate traffic. +// +// AlrDetector provides a signal that can be utilized to adjust +// estimate bandwidth. +// Note: This class is not thread-safe. +class AlrDetector { + public: + AlrDetector(); + ~AlrDetector(); + + void OnBytesSent(size_t bytes_sent, int64_t delta_time_ms); + + // Set current estimated bandwidth. + void SetEstimatedBitrate(int bitrate_bps); + + // Returns time in milliseconds when the current application-limited region + // started or empty result if the sender is currently not application-limited. + rtc::Optional<int64_t> GetApplicationLimitedRegionStartTime() const; + + struct AlrExperimentSettings { + float pacing_factor = PacedSender::kDefaultPaceMultiplier; + int64_t max_paced_queue_time = PacedSender::kMaxQueueLengthMs; + int alr_bandwidth_usage_percent = kDefaultAlrBandwidthUsagePercent; + int alr_start_budget_level_percent = kDefaultAlrStartBudgetLevelPercent; + int alr_stop_budget_level_percent = kDefaultAlrStopBudgetLevelPercent; + // Will be sent to the receive side for stats slicing. + // Can be 0..6, because it's sent as a 3 bits value and there's also + // reserved value to indicate absence of experiment. + int group_id = 0; + }; + static rtc::Optional<AlrExperimentSettings> ParseAlrSettingsFromFieldTrial( + const char* experiment_name); + + // Sent traffic percentage as a function of network capacity used to determine + // application-limited region. ALR region start when bandwidth usage drops + // below kAlrStartUsagePercent and ends when it raises above + // kAlrEndUsagePercent. NOTE: This is intentionally conservative at the moment + // until BW adjustments of application limited region is fine tuned. + static constexpr int kDefaultAlrBandwidthUsagePercent = 65; + static constexpr int kDefaultAlrStartBudgetLevelPercent = 80; + static constexpr int kDefaultAlrStopBudgetLevelPercent = 50; + static const char kScreenshareProbingBweExperimentName[]; + static const char kStrictPacingAndProbingExperimentName[]; + + void UpdateBudgetWithElapsedTime(int64_t delta_time_ms); + void UpdateBudgetWithBytesSent(size_t bytes_sent); + + private: + int bandwidth_usage_percent_; + int alr_start_budget_level_percent_; + int alr_stop_budget_level_percent_; + + IntervalBudget alr_budget_; + rtc::Optional<int64_t> alr_started_time_ms_; +}; + +} // namespace webrtc + +#endif // MODULES_PACING_ALR_DETECTOR_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/alr_detector_unittest.cc b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector_unittest.cc new file mode 100644 index 0000000000..506add506b --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/alr_detector_unittest.cc @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/alr_detector.h" + +#include "test/field_trial.h" +#include "test/gtest.h" + +namespace { + +constexpr int kEstimatedBitrateBps = 300000; + +} // namespace + +namespace webrtc { + +namespace { +class SimulateOutgoingTrafficIn { + public: + explicit SimulateOutgoingTrafficIn(AlrDetector* alr_detector) + : alr_detector_(alr_detector) { + RTC_CHECK(alr_detector_); + } + + SimulateOutgoingTrafficIn& ForTimeMs(int time_ms) { + interval_ms_ = rtc::Optional<int>(time_ms); + interval_ms_.emplace(time_ms); + ProduceTraffic(); + return *this; + } + + SimulateOutgoingTrafficIn& AtPercentOfEstimatedBitrate(int usage_percentage) { + usage_percentage_.emplace(usage_percentage); + ProduceTraffic(); + return *this; + } + + private: + void ProduceTraffic() { + if (!interval_ms_ || !usage_percentage_) + return; + const int kTimeStepMs = 10; + for (int t = 0; t < *interval_ms_; t += kTimeStepMs) { + alr_detector_->OnBytesSent(kEstimatedBitrateBps * *usage_percentage_ * + kTimeStepMs / (8 * 100 * 1000), + kTimeStepMs); + } + int remainder_ms = *interval_ms_ % kTimeStepMs; + if (remainder_ms > 0) { + alr_detector_->OnBytesSent(kEstimatedBitrateBps * *usage_percentage_ * + remainder_ms / (8 * 100 * 1000), + kTimeStepMs); + } + } + AlrDetector* const alr_detector_; + rtc::Optional<int> interval_ms_; + rtc::Optional<int> usage_percentage_; +}; +} // namespace + +class AlrDetectorTest : public testing::Test { + public: + void SetUp() override { + alr_detector_.SetEstimatedBitrate(kEstimatedBitrateBps); + } + + protected: + AlrDetector alr_detector_; +}; + +TEST_F(AlrDetectorTest, AlrDetection) { + // Start in non-ALR state. + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // Stay in non-ALR state when usage is close to 100%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(1000) + .AtPercentOfEstimatedBitrate(90); + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // Verify that we ALR starts when bitrate drops below 20%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(1500) + .AtPercentOfEstimatedBitrate(20); + EXPECT_TRUE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // Verify that ALR ends when usage is above 65%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(4000) + .AtPercentOfEstimatedBitrate(100); + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); +} + +TEST_F(AlrDetectorTest, ShortSpike) { + // Start in non-ALR state. + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // Verify that we ALR starts when bitrate drops below 20%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(1000) + .AtPercentOfEstimatedBitrate(20); + EXPECT_TRUE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // Verify that we stay in ALR region even after a short bitrate spike. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(100) + .AtPercentOfEstimatedBitrate(150); + EXPECT_TRUE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // ALR ends when usage is above 65%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(3000) + .AtPercentOfEstimatedBitrate(100); + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); +} + +TEST_F(AlrDetectorTest, BandwidthEstimateChanges) { + // Start in non-ALR state. + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // ALR starts when bitrate drops below 20%. + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(1000) + .AtPercentOfEstimatedBitrate(20); + EXPECT_TRUE(alr_detector_.GetApplicationLimitedRegionStartTime()); + + // When bandwidth estimate drops the detector should stay in ALR mode and quit + // it shortly afterwards as the sender continues sending the same amount of + // traffic. This is necessary to ensure that ProbeController can still react + // to the BWE drop by initiating a new probe. + alr_detector_.SetEstimatedBitrate(kEstimatedBitrateBps / 5); + EXPECT_TRUE(alr_detector_.GetApplicationLimitedRegionStartTime()); + SimulateOutgoingTrafficIn(&alr_detector_) + .ForTimeMs(1000) + .AtPercentOfEstimatedBitrate(50); + EXPECT_FALSE(alr_detector_.GetApplicationLimitedRegionStartTime()); +} + +TEST_F(AlrDetectorTest, ParseControlFieldTrial) { + webrtc::test::ScopedFieldTrials field_trial( + "WebRTC-ProbingScreenshareBwe/Control/"); + rtc::Optional<AlrDetector::AlrExperimentSettings> parsed_params = + AlrDetector::ParseAlrSettingsFromFieldTrial( + "WebRTC-ProbingScreenshareBwe"); + EXPECT_FALSE(static_cast<bool>(parsed_params)); +} + +TEST_F(AlrDetectorTest, ParseActiveFieldTrial) { + webrtc::test::ScopedFieldTrials field_trial( + "WebRTC-ProbingScreenshareBwe/1.1,2875,85,20,-20,1/"); + rtc::Optional<AlrDetector::AlrExperimentSettings> parsed_params = + AlrDetector::ParseAlrSettingsFromFieldTrial( + "WebRTC-ProbingScreenshareBwe"); + ASSERT_TRUE(static_cast<bool>(parsed_params)); + EXPECT_EQ(1.1f, parsed_params->pacing_factor); + EXPECT_EQ(2875, parsed_params->max_paced_queue_time); + EXPECT_EQ(85, parsed_params->alr_bandwidth_usage_percent); + EXPECT_EQ(20, parsed_params->alr_start_budget_level_percent); + EXPECT_EQ(-20, parsed_params->alr_stop_budget_level_percent); + EXPECT_EQ(1, parsed_params->group_id); +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.cc b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.cc new file mode 100644 index 0000000000..1dc77c5d60 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.cc @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/bitrate_prober.h" + +#include <algorithm> + +#include "logging/rtc_event_log/events/rtc_event_probe_cluster_created.h" +#include "logging/rtc_event_log/rtc_event_log.h" +#include "modules/pacing/paced_sender.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/ptr_util.h" + +namespace webrtc { + +namespace { + +// A minimum interval between probes to allow scheduling to be feasible. +constexpr int kMinProbeDeltaMs = 1; + +// The minimum number probing packets used. +constexpr int kMinProbePacketsSent = 5; + +// The minimum probing duration in ms. +constexpr int kMinProbeDurationMs = 15; + +// Maximum amount of time each probe can be delayed. Probe cluster is reset and +// retried from the start when this limit is reached. +constexpr int kMaxProbeDelayMs = 3; + +// Number of times probing is retried before the cluster is dropped. +constexpr int kMaxRetryAttempts = 3; + +// The min probe packet size is scaled with the bitrate we're probing at. +// This defines the max min probe packet size, meaning that on high bitrates +// we have a min probe packet size of 200 bytes. +constexpr size_t kMinProbePacketSize = 200; + +constexpr int64_t kProbeClusterTimeoutMs = 5000; + +} // namespace + +BitrateProber::BitrateProber() : BitrateProber(nullptr) {} + +BitrateProber::BitrateProber(RtcEventLog* event_log) + : probing_state_(ProbingState::kDisabled), + next_probe_time_ms_(-1), + next_cluster_id_(0), + event_log_(event_log) { + SetEnabled(true); +} + +void BitrateProber::SetEnabled(bool enable) { + if (enable) { + if (probing_state_ == ProbingState::kDisabled) { + probing_state_ = ProbingState::kInactive; + RTC_LOG(LS_INFO) << "Bandwidth probing enabled, set to inactive"; + } + } else { + probing_state_ = ProbingState::kDisabled; + RTC_LOG(LS_INFO) << "Bandwidth probing disabled"; + } +} + +bool BitrateProber::IsProbing() const { + return probing_state_ == ProbingState::kActive; +} + +void BitrateProber::OnIncomingPacket(size_t packet_size) { + // Don't initialize probing unless we have something large enough to start + // probing. + if (probing_state_ == ProbingState::kInactive && !clusters_.empty() && + packet_size >= + std::min<size_t>(RecommendedMinProbeSize(), kMinProbePacketSize)) { + // Send next probe right away. + next_probe_time_ms_ = -1; + probing_state_ = ProbingState::kActive; + } +} + +void BitrateProber::CreateProbeCluster(int bitrate_bps, int64_t now_ms) { + RTC_DCHECK(probing_state_ != ProbingState::kDisabled); + RTC_DCHECK_GT(bitrate_bps, 0); + while (!clusters_.empty() && + now_ms - clusters_.front().time_created_ms > kProbeClusterTimeoutMs) { + clusters_.pop(); + } + + ProbeCluster cluster; + cluster.time_created_ms = now_ms; + cluster.pace_info.probe_cluster_min_probes = kMinProbePacketsSent; + cluster.pace_info.probe_cluster_min_bytes = + bitrate_bps * kMinProbeDurationMs / 8000; + cluster.pace_info.send_bitrate_bps = bitrate_bps; + cluster.pace_info.probe_cluster_id = next_cluster_id_++; + clusters_.push(cluster); + if (event_log_) + event_log_->Log(rtc::MakeUnique<RtcEventProbeClusterCreated>( + cluster.pace_info.probe_cluster_id, cluster.pace_info.send_bitrate_bps, + cluster.pace_info.probe_cluster_min_probes, + cluster.pace_info.probe_cluster_min_bytes)); + + RTC_LOG(LS_INFO) << "Probe cluster (bitrate:min bytes:min packets): (" + << cluster.pace_info.send_bitrate_bps << ":" + << cluster.pace_info.probe_cluster_min_bytes << ":" + << cluster.pace_info.probe_cluster_min_probes << ")"; + // If we are already probing, continue to do so. Otherwise set it to + // kInactive and wait for OnIncomingPacket to start the probing. + if (probing_state_ != ProbingState::kActive) + probing_state_ = ProbingState::kInactive; +} + +void BitrateProber::ResetState(int64_t now_ms) { + RTC_DCHECK(probing_state_ == ProbingState::kActive); + + // Recreate all probing clusters. + std::queue<ProbeCluster> clusters; + clusters.swap(clusters_); + while (!clusters.empty()) { + if (clusters.front().retries < kMaxRetryAttempts) { + CreateProbeCluster(clusters.front().pace_info.send_bitrate_bps, now_ms); + clusters_.back().retries = clusters.front().retries + 1; + } + clusters.pop(); + } + + probing_state_ = ProbingState::kInactive; +} + +int BitrateProber::TimeUntilNextProbe(int64_t now_ms) { + // Probing is not active or probing is already complete. + if (probing_state_ != ProbingState::kActive || clusters_.empty()) + return -1; + + int time_until_probe_ms = 0; + if (next_probe_time_ms_ >= 0) { + time_until_probe_ms = next_probe_time_ms_ - now_ms; + if (time_until_probe_ms < -kMaxProbeDelayMs) { + ResetState(now_ms); + return -1; + } + } + + return std::max(time_until_probe_ms, 0); +} + +PacedPacketInfo BitrateProber::CurrentCluster() const { + RTC_DCHECK(!clusters_.empty()); + RTC_DCHECK(probing_state_ == ProbingState::kActive); + return clusters_.front().pace_info; +} + +// Probe size is recommended based on the probe bitrate required. We choose +// a minimum of twice |kMinProbeDeltaMs| interval to allow scheduling to be +// feasible. +size_t BitrateProber::RecommendedMinProbeSize() const { + RTC_DCHECK(!clusters_.empty()); + return clusters_.front().pace_info.send_bitrate_bps * 2 * kMinProbeDeltaMs / + (8 * 1000); +} + +void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) { + RTC_DCHECK(probing_state_ == ProbingState::kActive); + RTC_DCHECK_GT(bytes, 0); + + if (!clusters_.empty()) { + ProbeCluster* cluster = &clusters_.front(); + if (cluster->sent_probes == 0) { + RTC_DCHECK_EQ(cluster->time_started_ms, -1); + cluster->time_started_ms = now_ms; + } + cluster->sent_bytes += static_cast<int>(bytes); + cluster->sent_probes += 1; + next_probe_time_ms_ = GetNextProbeTime(*cluster); + if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes && + cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) { + clusters_.pop(); + } + if (clusters_.empty()) + probing_state_ = ProbingState::kSuspended; + } +} + +int64_t BitrateProber::GetNextProbeTime(const ProbeCluster& cluster) { + RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0); + RTC_CHECK_GE(cluster.time_started_ms, 0); + + // Compute the time delta from the cluster start to ensure probe bitrate stays + // close to the target bitrate. Result is in milliseconds. + int64_t delta_ms = + (8000ll * cluster.sent_bytes + cluster.pace_info.send_bitrate_bps / 2) / + cluster.pace_info.send_bitrate_bps; + return cluster.time_started_ms + delta_ms; +} + + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.h b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.h new file mode 100644 index 0000000000..0ec7720ca3 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_BITRATE_PROBER_H_ +#define MODULES_PACING_BITRATE_PROBER_H_ + +#include <queue> + +#include "modules/include/module_common_types.h" +#include "rtc_base/basictypes.h" +#include "typedefs.h" // NOLINT(build/include) + +namespace webrtc { +class RtcEventLog; + +// Note that this class isn't thread-safe by itself and therefore relies +// on being protected by the caller. +class BitrateProber { + public: + BitrateProber(); + explicit BitrateProber(RtcEventLog* event_log); + + void SetEnabled(bool enable); + + // Returns true if the prober is in a probing session, i.e., it currently + // wants packets to be sent out according to the time returned by + // TimeUntilNextProbe(). + bool IsProbing() const; + + // Initializes a new probing session if the prober is allowed to probe. Does + // not initialize the prober unless the packet size is large enough to probe + // with. + void OnIncomingPacket(size_t packet_size); + + // Create a cluster used to probe for |bitrate_bps| with |num_probes| number + // of probes. + void CreateProbeCluster(int bitrate_bps, int64_t now_ms); + + // Returns the number of milliseconds until the next probe should be sent to + // get accurate probing. + int TimeUntilNextProbe(int64_t now_ms); + + // Information about the current probing cluster. + PacedPacketInfo CurrentCluster() const; + + // Returns the minimum number of bytes that the prober recommends for + // the next probe. + size_t RecommendedMinProbeSize() const; + + // Called to report to the prober that a probe has been sent. In case of + // multiple packets per probe, this call would be made at the end of sending + // the last packet in probe. |probe_size| is the total size of all packets + // in probe. + void ProbeSent(int64_t now_ms, size_t probe_size); + + private: + enum class ProbingState { + // Probing will not be triggered in this state at all times. + kDisabled, + // Probing is enabled and ready to trigger on the first packet arrival. + kInactive, + // Probe cluster is filled with the set of data rates to be probed and + // probes are being sent. + kActive, + // Probing is enabled, but currently suspended until an explicit trigger + // to start probing again. + kSuspended, + }; + + // A probe cluster consists of a set of probes. Each probe in turn can be + // divided into a number of packets to accommodate the MTU on the network. + struct ProbeCluster { + PacedPacketInfo pace_info; + + int sent_probes = 0; + int sent_bytes = 0; + int64_t time_created_ms = -1; + int64_t time_started_ms = -1; + int retries = 0; + }; + + // Resets the state of the prober and clears any cluster/timing data tracked. + void ResetState(int64_t now_ms); + + int64_t GetNextProbeTime(const ProbeCluster& cluster); + + ProbingState probing_state_; + + // Probe bitrate per packet. These are used to compute the delta relative to + // the previous probe packet based on the size and time when that packet was + // sent. + std::queue<ProbeCluster> clusters_; + + // Time the next probe should be sent when in kActive state. + int64_t next_probe_time_ms_; + + int next_cluster_id_; + RtcEventLog* const event_log_; +}; + +} // namespace webrtc + +#endif // MODULES_PACING_BITRATE_PROBER_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober_unittest.cc b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober_unittest.cc new file mode 100644 index 0000000000..fda4adb234 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober_unittest.cc @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <limits> + +#include "modules/pacing/bitrate_prober.h" +#include "test/gtest.h" + +namespace webrtc { + +TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) { + BitrateProber prober; + EXPECT_FALSE(prober.IsProbing()); + int64_t now_ms = 0; + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + + const int kTestBitrate1 = 900000; + const int kTestBitrate2 = 1800000; + const int kClusterSize = 5; + const int kProbeSize = 1000; + const int kMinProbeDurationMs = 15; + + prober.CreateProbeCluster(kTestBitrate1, now_ms); + prober.CreateProbeCluster(kTestBitrate2, now_ms); + EXPECT_FALSE(prober.IsProbing()); + + prober.OnIncomingPacket(kProbeSize); + EXPECT_TRUE(prober.IsProbing()); + EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id); + + // First packet should probe as soon as possible. + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + + for (int i = 0; i < kClusterSize; ++i) { + now_ms += prober.TimeUntilNextProbe(now_ms); + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id); + prober.ProbeSent(now_ms, kProbeSize); + } + + EXPECT_GE(now_ms, kMinProbeDurationMs); + // Verify that the actual bitrate is withing 10% of the target. + double bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / now_ms; + EXPECT_GT(bitrate, kTestBitrate1 * 0.9); + EXPECT_LT(bitrate, kTestBitrate1 * 1.1); + + now_ms += prober.TimeUntilNextProbe(now_ms); + int64_t probe2_started = now_ms; + + for (int i = 0; i < kClusterSize; ++i) { + now_ms += prober.TimeUntilNextProbe(now_ms); + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + EXPECT_EQ(1, prober.CurrentCluster().probe_cluster_id); + prober.ProbeSent(now_ms, kProbeSize); + } + + // Verify that the actual bitrate is withing 10% of the target. + int duration = now_ms - probe2_started; + EXPECT_GE(duration, kMinProbeDurationMs); + bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / duration; + EXPECT_GT(bitrate, kTestBitrate2 * 0.9); + EXPECT_LT(bitrate, kTestBitrate2 * 1.1); + + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + EXPECT_FALSE(prober.IsProbing()); +} + +TEST(BitrateProberTest, DoesntProbeWithoutRecentPackets) { + BitrateProber prober; + EXPECT_FALSE(prober.IsProbing()); + int64_t now_ms = 0; + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + + prober.CreateProbeCluster(900000, now_ms); + EXPECT_FALSE(prober.IsProbing()); + + prober.OnIncomingPacket(1000); + EXPECT_TRUE(prober.IsProbing()); + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + prober.ProbeSent(now_ms, 1000); + // Let time pass, no large enough packets put into prober. + now_ms += 6000; + EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + // Insert a large-enough packet after downtime while probing should reset to + // perform a new probe since the requested one didn't finish. + prober.OnIncomingPacket(1000); + EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + prober.ProbeSent(now_ms, 1000); + // Next packet should be part of new probe and be sent with non-zero delay. + prober.OnIncomingPacket(1000); + EXPECT_GT(prober.TimeUntilNextProbe(now_ms), 0); +} + +TEST(BitrateProberTest, DoesntInitializeProbingForSmallPackets) { + BitrateProber prober; + prober.SetEnabled(true); + EXPECT_FALSE(prober.IsProbing()); + + prober.OnIncomingPacket(100); + EXPECT_FALSE(prober.IsProbing()); +} + +TEST(BitrateProberTest, VerifyProbeSizeOnHighBitrate) { + BitrateProber prober; + constexpr unsigned kHighBitrateBps = 10000000; // 10 Mbps + + prober.CreateProbeCluster(kHighBitrateBps, 0); + // Probe size should ensure a minimum of 1 ms interval. + EXPECT_GT(prober.RecommendedMinProbeSize(), kHighBitrateBps / 8000); +} + +TEST(BitrateProberTest, MinumumNumberOfProbingPackets) { + BitrateProber prober; + // Even when probing at a low bitrate we expect a minimum number + // of packets to be sent. + constexpr int kBitrateBps = 100000; // 100 kbps + constexpr int kPacketSizeBytes = 1000; + + prober.CreateProbeCluster(kBitrateBps, 0); + prober.OnIncomingPacket(kPacketSizeBytes); + for (int i = 0; i < 5; ++i) { + EXPECT_TRUE(prober.IsProbing()); + prober.ProbeSent(0, kPacketSizeBytes); + } + + EXPECT_FALSE(prober.IsProbing()); +} + +TEST(BitrateProberTest, ScaleBytesUsedForProbing) { + BitrateProber prober; + constexpr int kBitrateBps = 10000000; // 10 Mbps + constexpr int kPacketSizeBytes = 1000; + constexpr int kExpectedBytesSent = kBitrateBps * 15 / 8000; + + prober.CreateProbeCluster(kBitrateBps, 0); + prober.OnIncomingPacket(kPacketSizeBytes); + int bytes_sent = 0; + while (bytes_sent < kExpectedBytesSent) { + ASSERT_TRUE(prober.IsProbing()); + prober.ProbeSent(0, kPacketSizeBytes); + bytes_sent += kPacketSizeBytes; + } + + EXPECT_FALSE(prober.IsProbing()); +} + +TEST(BitrateProberTest, ProbeClusterTimeout) { + BitrateProber prober; + constexpr int kBitrateBps = 300000; // 300 kbps + constexpr int kSmallPacketSize = 20; + // Expecting two probe clusters of 5 packets each. + constexpr int kExpectedBytesSent = 20 * 2 * 5; + constexpr int64_t kTimeoutMs = 5000; + + int64_t now_ms = 0; + prober.CreateProbeCluster(kBitrateBps, now_ms); + prober.OnIncomingPacket(kSmallPacketSize); + EXPECT_FALSE(prober.IsProbing()); + now_ms += kTimeoutMs; + prober.CreateProbeCluster(kBitrateBps / 10, now_ms); + prober.OnIncomingPacket(kSmallPacketSize); + EXPECT_FALSE(prober.IsProbing()); + now_ms += 1; + prober.CreateProbeCluster(kBitrateBps / 10, now_ms); + prober.OnIncomingPacket(kSmallPacketSize); + EXPECT_TRUE(prober.IsProbing()); + int bytes_sent = 0; + while (bytes_sent < kExpectedBytesSent) { + ASSERT_TRUE(prober.IsProbing()); + prober.ProbeSent(0, kSmallPacketSize); + bytes_sent += kSmallPacketSize; + } + + EXPECT_FALSE(prober.IsProbing()); +} +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.cc b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.cc new file mode 100644 index 0000000000..b63bc37149 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.cc @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/interval_budget.h" + +#include <algorithm> + +namespace webrtc { +namespace { +constexpr int kWindowMs = 500; +constexpr int kDeltaTimeMs = 2000; +} + +IntervalBudget::IntervalBudget(int initial_target_rate_kbps) + : IntervalBudget(initial_target_rate_kbps, false) {} + +IntervalBudget::IntervalBudget(int initial_target_rate_kbps, + bool can_build_up_underuse) + : bytes_remaining_(0), can_build_up_underuse_(can_build_up_underuse) { + set_target_rate_kbps(initial_target_rate_kbps); +} + +void IntervalBudget::set_target_rate_kbps(int target_rate_kbps) { + target_rate_kbps_ = target_rate_kbps; + max_bytes_in_budget_ = (kWindowMs * target_rate_kbps_) / 8; + bytes_remaining_ = std::min(std::max(-max_bytes_in_budget_, bytes_remaining_), + max_bytes_in_budget_); +} + +void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) { + RTC_DCHECK_LT(delta_time_ms, kDeltaTimeMs); + int bytes = target_rate_kbps_ * delta_time_ms / 8; + if (bytes_remaining_ < 0 || can_build_up_underuse_) { + // We overused last interval, compensate this interval. + bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_); + } else { + // If we underused last interval we can't use it this interval. + bytes_remaining_ = std::min(bytes, max_bytes_in_budget_); + } +} + +void IntervalBudget::UseBudget(size_t bytes) { + bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes), + -max_bytes_in_budget_); +} + +size_t IntervalBudget::bytes_remaining() const { + return static_cast<size_t>(std::max(0, bytes_remaining_)); +} + +int IntervalBudget::budget_level_percent() const { + return bytes_remaining_ * 100 / max_bytes_in_budget_; +} + +int IntervalBudget::target_rate_kbps() const { + return target_rate_kbps_; +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.h b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.h new file mode 100644 index 0000000000..880fe784ba --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_INTERVAL_BUDGET_H_ +#define MODULES_PACING_INTERVAL_BUDGET_H_ + +#include "common_types.h" // NOLINT(build/include) + +namespace webrtc { + +// TODO(tschumim): Reflector IntervalBudget so that we can set a under- and +// over-use budget in ms. +class IntervalBudget { + public: + explicit IntervalBudget(int initial_target_rate_kbps); + IntervalBudget(int initial_target_rate_kbps, bool can_build_up_underuse); + void set_target_rate_kbps(int target_rate_kbps); + + // TODO(tschumim): Unify IncreaseBudget and UseBudget to one function. + void IncreaseBudget(int64_t delta_time_ms); + void UseBudget(size_t bytes); + + size_t bytes_remaining() const; + int budget_level_percent() const; + int target_rate_kbps() const; + + private: + int target_rate_kbps_; + int max_bytes_in_budget_; + int bytes_remaining_; + bool can_build_up_underuse_; +}; + +} // namespace webrtc + +#endif // MODULES_PACING_INTERVAL_BUDGET_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/interval_budget_unittest.cc b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget_unittest.cc new file mode 100644 index 0000000000..cf02cb6af2 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/interval_budget_unittest.cc @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/interval_budget.h" + +#include "test/gtest.h" + +namespace webrtc { + +namespace { +constexpr int kWindowMs = 500; +constexpr int kBitrateKbps = 100; +constexpr bool kCanBuildUpUnderuse = true; +constexpr bool kCanNotBuildUpUnderuse = false; +size_t TimeToBytes(int bitrate_kbps, int time_ms) { + return static_cast<size_t>(bitrate_kbps * time_ms / 8); +} +} // namespace + +TEST(IntervalBudgetTest, InitailState) { + IntervalBudget interval_budget(kBitrateKbps); + EXPECT_EQ(interval_budget.budget_level_percent(), 0); + EXPECT_EQ(interval_budget.bytes_remaining(), 0u); +} + +TEST(IntervalBudgetTest, Underuse) { + IntervalBudget interval_budget(kBitrateKbps); + int delta_time_ms = 50; + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), kWindowMs / delta_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, delta_time_ms)); +} + +TEST(IntervalBudgetTest, DontUnderuseMoreThanMaxWindow) { + IntervalBudget interval_budget(kBitrateKbps); + int delta_time_ms = 1000; + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), 100); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, kWindowMs)); +} + +TEST(IntervalBudgetTest, DontUnderuseMoreThanMaxWindowWhenChangeBitrate) { + IntervalBudget interval_budget(kBitrateKbps); + int delta_time_ms = kWindowMs / 2; + interval_budget.IncreaseBudget(delta_time_ms); + interval_budget.set_target_rate_kbps(kBitrateKbps / 10); + EXPECT_EQ(interval_budget.budget_level_percent(), 100); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps / 10, kWindowMs)); +} + +TEST(IntervalBudgetTest, BalanceChangeOnBitrateChange) { + IntervalBudget interval_budget(kBitrateKbps); + int delta_time_ms = kWindowMs; + interval_budget.IncreaseBudget(delta_time_ms); + interval_budget.set_target_rate_kbps(kBitrateKbps * 2); + EXPECT_EQ(interval_budget.budget_level_percent(), 50); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, kWindowMs)); +} + +TEST(IntervalBudgetTest, Overuse) { + IntervalBudget interval_budget(kBitrateKbps); + int overuse_time_ms = 50; + int used_bytes = TimeToBytes(kBitrateKbps, overuse_time_ms); + interval_budget.UseBudget(used_bytes); + EXPECT_EQ(interval_budget.budget_level_percent(), + -kWindowMs / overuse_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), 0u); +} + +TEST(IntervalBudgetTest, DontOveruseMoreThanMaxWindow) { + IntervalBudget interval_budget(kBitrateKbps); + int overuse_time_ms = 1000; + int used_bytes = TimeToBytes(kBitrateKbps, overuse_time_ms); + interval_budget.UseBudget(used_bytes); + EXPECT_EQ(interval_budget.budget_level_percent(), -100); + EXPECT_EQ(interval_budget.bytes_remaining(), 0u); +} + +TEST(IntervalBudgetTest, CanBuildUpUnderuseWhenConfigured) { + IntervalBudget interval_budget(kBitrateKbps, kCanBuildUpUnderuse); + int delta_time_ms = 50; + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), kWindowMs / delta_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, delta_time_ms)); + + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), + 2 * kWindowMs / delta_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, 2 * delta_time_ms)); +} + +TEST(IntervalBudgetTest, CanNotBuildUpUnderuseWhenConfigured) { + IntervalBudget interval_budget(kBitrateKbps, kCanNotBuildUpUnderuse); + int delta_time_ms = 50; + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), kWindowMs / delta_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, delta_time_ms)); + + interval_budget.IncreaseBudget(delta_time_ms); + EXPECT_EQ(interval_budget.budget_level_percent(), kWindowMs / delta_time_ms); + EXPECT_EQ(interval_budget.bytes_remaining(), + TimeToBytes(kBitrateKbps, delta_time_ms)); +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/mock/mock_paced_sender.h b/third_party/libwebrtc/webrtc/modules/pacing/mock/mock_paced_sender.h new file mode 100644 index 0000000000..3366aa85ed --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/mock/mock_paced_sender.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_MOCK_MOCK_PACED_SENDER_H_ +#define MODULES_PACING_MOCK_MOCK_PACED_SENDER_H_ + +#include <vector> + +#include "modules/pacing/paced_sender.h" +#include "system_wrappers/include/clock.h" +#include "test/gmock.h" + +namespace webrtc { + +class MockPacedSender : public PacedSender { + public: + MockPacedSender() + : PacedSender(Clock::GetRealTimeClock(), nullptr, nullptr) {} + MOCK_METHOD6(SendPacket, bool(Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission)); + MOCK_METHOD1(CreateProbeCluster, void(int)); + MOCK_METHOD1(SetEstimatedBitrate, void(uint32_t)); + MOCK_CONST_METHOD0(QueueInMs, int64_t()); + MOCK_CONST_METHOD0(QueueInPackets, int()); + MOCK_CONST_METHOD0(ExpectedQueueTimeMs, int64_t()); + MOCK_CONST_METHOD0(GetApplicationLimitedRegionStartTime, + rtc::Optional<int64_t>()); + MOCK_METHOD0(Process, void()); +}; + +} // namespace webrtc + +#endif // MODULES_PACING_MOCK_MOCK_PACED_SENDER_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.cc b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.cc new file mode 100644 index 0000000000..f3b63debe2 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.cc @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/paced_sender.h" + +#include <algorithm> +#include <map> +#include <queue> +#include <set> +#include <vector> +#include <utility> + +#include "modules/include/module_common_types.h" +#include "modules/pacing/alr_detector.h" +#include "modules/pacing/bitrate_prober.h" +#include "modules/pacing/interval_budget.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "rtc_base/ptr_util.h" +#include "system_wrappers/include/clock.h" +#include "system_wrappers/include/field_trial.h" + +namespace { +// Time limit in milliseconds between packet bursts. +const int64_t kMinPacketLimitMs = 5; +const int64_t kPausedPacketIntervalMs = 500; + +// Upper cap on process interval, in case process has not been called in a long +// time. +const int64_t kMaxIntervalTimeMs = 30; + +} // namespace + +namespace webrtc { + +const int64_t PacedSender::kMaxQueueLengthMs = 2000; +const float PacedSender::kDefaultPaceMultiplier = 2.5f; + +PacedSender::PacedSender(const Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log) : + PacedSender(clock, packet_sender, event_log, + webrtc::field_trial::IsEnabled("WebRTC-RoundRobinPacing") + ? rtc::MakeUnique<PacketQueue2>(clock) + : rtc::MakeUnique<PacketQueue>(clock)) {} + +PacedSender::PacedSender(const Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log, + std::unique_ptr<PacketQueue> packets) + : clock_(clock), + packet_sender_(packet_sender), + alr_detector_(rtc::MakeUnique<AlrDetector>()), + paused_(false), + media_budget_(rtc::MakeUnique<IntervalBudget>(0)), + padding_budget_(rtc::MakeUnique<IntervalBudget>(0)), + prober_(rtc::MakeUnique<BitrateProber>(event_log)), + probing_send_failure_(false), + estimated_bitrate_bps_(0), + min_send_bitrate_kbps_(0u), + max_padding_bitrate_kbps_(0u), + pacing_bitrate_kbps_(0), + time_last_update_us_(clock->TimeInMicroseconds()), + first_sent_packet_ms_(-1), + packets_(std::move(packets)), + packet_counter_(0), + pacing_factor_(kDefaultPaceMultiplier), + queue_time_limit(kMaxQueueLengthMs), + account_for_audio_(false) { + UpdateBudgetWithElapsedTime(kMinPacketLimitMs); +} + +PacedSender::~PacedSender() {} + +void PacedSender::CreateProbeCluster(int bitrate_bps) { + rtc::CritScope cs(&critsect_); + prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); +} + +void PacedSender::Pause() { + { + rtc::CritScope cs(&critsect_); + if (!paused_) + RTC_LOG(LS_INFO) << "PacedSender paused."; + paused_ = true; + packets_->SetPauseState(true, clock_->TimeInMilliseconds()); + } + // Tell the process thread to call our TimeUntilNextProcess() method to get + // a new (longer) estimate for when to call Process(). + if (process_thread_) + process_thread_->WakeUp(this); +} + +void PacedSender::Resume() { + { + rtc::CritScope cs(&critsect_); + if (paused_) + RTC_LOG(LS_INFO) << "PacedSender resumed."; + paused_ = false; + packets_->SetPauseState(false, clock_->TimeInMilliseconds()); + } + // Tell the process thread to call our TimeUntilNextProcess() method to + // refresh the estimate for when to call Process(). + if (process_thread_) + process_thread_->WakeUp(this); +} + +void PacedSender::SetProbingEnabled(bool enabled) { + rtc::CritScope cs(&critsect_); + RTC_CHECK_EQ(0, packet_counter_); + prober_->SetEnabled(enabled); +} + +void PacedSender::SetEstimatedBitrate(uint32_t bitrate_bps) { + if (bitrate_bps == 0) + RTC_LOG(LS_ERROR) << "PacedSender is not designed to handle 0 bitrate."; + rtc::CritScope cs(&critsect_); + estimated_bitrate_bps_ = bitrate_bps; + padding_budget_->set_target_rate_kbps( + std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_)); + pacing_bitrate_kbps_ = + std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) * + pacing_factor_; + alr_detector_->SetEstimatedBitrate(bitrate_bps); +} + +void PacedSender::SetSendBitrateLimits(int min_send_bitrate_bps, + int padding_bitrate) { + rtc::CritScope cs(&critsect_); + min_send_bitrate_kbps_ = min_send_bitrate_bps / 1000; + pacing_bitrate_kbps_ = + std::max(min_send_bitrate_kbps_, estimated_bitrate_bps_ / 1000) * + pacing_factor_; + max_padding_bitrate_kbps_ = padding_bitrate / 1000; + padding_budget_->set_target_rate_kbps( + std::min(estimated_bitrate_bps_ / 1000, max_padding_bitrate_kbps_)); +} + +void PacedSender::InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) { + rtc::CritScope cs(&critsect_); + RTC_DCHECK(estimated_bitrate_bps_ > 0) + << "SetEstimatedBitrate must be called before InsertPacket."; + + int64_t now_ms = clock_->TimeInMilliseconds(); + prober_->OnIncomingPacket(bytes); + + if (capture_time_ms < 0) + capture_time_ms = now_ms; + + packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number, + capture_time_ms, now_ms, bytes, + retransmission, packet_counter_++)); +} + +void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { + rtc::CritScope cs(&critsect_); + account_for_audio_ = account_for_audio; +} + +int64_t PacedSender::ExpectedQueueTimeMs() const { + rtc::CritScope cs(&critsect_); + RTC_DCHECK_GT(pacing_bitrate_kbps_, 0); + return static_cast<int64_t>(packets_->SizeInBytes() * 8 / + pacing_bitrate_kbps_); +} + +rtc::Optional<int64_t> PacedSender::GetApplicationLimitedRegionStartTime() + const { + rtc::CritScope cs(&critsect_); + return alr_detector_->GetApplicationLimitedRegionStartTime(); +} + +size_t PacedSender::QueueSizePackets() const { + rtc::CritScope cs(&critsect_); + return packets_->SizeInPackets(); +} + +int64_t PacedSender::FirstSentPacketTimeMs() const { + rtc::CritScope cs(&critsect_); + return first_sent_packet_ms_; +} + +int64_t PacedSender::QueueInMs() const { + rtc::CritScope cs(&critsect_); + + int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); + if (oldest_packet == 0) + return 0; + + return clock_->TimeInMilliseconds() - oldest_packet; +} + +int64_t PacedSender::TimeUntilNextProcess() { + rtc::CritScope cs(&critsect_); + int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; + int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; + // When paused we wake up every 500 ms to send a padding packet to ensure + // we won't get stuck in the paused state due to no feedback being received. + if (paused_) + return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0); + + if (prober_->IsProbing()) { + int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); + if (ret > 0 || (ret == 0 && !probing_send_failure_)) + return ret; + } + return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); +} + +void PacedSender::Process() { + int64_t now_us = clock_->TimeInMicroseconds(); + rtc::CritScope cs(&critsect_); + int64_t elapsed_time_ms = std::min( + kMaxIntervalTimeMs, (now_us - time_last_update_us_ + 500) / 1000); + int target_bitrate_kbps = pacing_bitrate_kbps_; + + if (paused_) { + PacedPacketInfo pacing_info; + time_last_update_us_ = now_us; + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. + if (packet_counter_ == 0) + return; + size_t bytes_sent = SendPadding(1, pacing_info); + alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); + return; + } + + if (elapsed_time_ms > 0) { + size_t queue_size_bytes = packets_->SizeInBytes(); + if (queue_size_bytes > 0) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); + int64_t avg_time_left_ms = std::max<int64_t>( + 1, queue_time_limit - packets_->AverageQueueTimeMs()); + int min_bitrate_needed_kbps = + static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); + if (min_bitrate_needed_kbps > target_bitrate_kbps) + target_bitrate_kbps = min_bitrate_needed_kbps; + } + + media_budget_->set_target_rate_kbps(target_bitrate_kbps); + UpdateBudgetWithElapsedTime(elapsed_time_ms); + } + + time_last_update_us_ = now_us; + + bool is_probing = prober_->IsProbing(); + PacedPacketInfo pacing_info; + size_t bytes_sent = 0; + size_t recommended_probe_size = 0; + if (is_probing) { + pacing_info = prober_->CurrentCluster(); + recommended_probe_size = prober_->RecommendedMinProbeSize(); + } + // We need to check paused_ here because the critical section protecting + // it is released during the call to SendPacket. This has been fixed in + // a similar way upstream, so these changes can be dropped the next time + // we update. + while (!paused_ && !packets_->Empty()) { + // Since we need to release the lock in order to send, we first pop the + // element from the priority queue but keep it in storage, so that we can + // reinsert it if send fails. + const PacketQueue::Packet& packet = packets_->BeginPop(); + + if (SendPacket(packet, pacing_info)) { + // Send succeeded, remove it from the queue. + if (first_sent_packet_ms_ == -1) + first_sent_packet_ms_ = clock_->TimeInMilliseconds(); + bytes_sent += packet.bytes; + packets_->FinalizePop(packet); + if (is_probing && bytes_sent > recommended_probe_size) + break; + } else { + // Send failed, put it back into the queue. + packets_->CancelPop(packet); + break; + } + } + + if (packets_->Empty()) { + // We can not send padding unless a normal packet has first been sent. If we + // do, timestamps get messed up. + if (packet_counter_ > 0) { + int padding_needed = + static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) + : padding_budget_->bytes_remaining()); + if (padding_needed > 0) + bytes_sent += SendPadding(padding_needed, pacing_info); + } + } + if (is_probing) { + probing_send_failure_ = bytes_sent == 0; + if (!probing_send_failure_) + prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent); + } + alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); +} + +void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { + RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread; + process_thread_ = process_thread; +} + +bool PacedSender::SendPacket(const PacketQueue::Packet& packet, + const PacedPacketInfo& pacing_info) { + RTC_DCHECK(!paused_); + if (media_budget_->bytes_remaining() == 0 && + pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) { + return false; + } + + critsect_.Leave(); + const bool success = packet_sender_->TimeToSendPacket( + packet.ssrc, packet.sequence_number, packet.capture_time_ms, + packet.retransmission, pacing_info); + critsect_.Enter(); + + if (success) { + if (packet.priority != kHighPriority || account_for_audio_) { + // Update media bytes sent. + // TODO(eladalon): TimeToSendPacket() can also return |true| in some + // situations where nothing actually ended up being sent to the network, + // and we probably don't want to update the budget in such cases. + // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 + UpdateBudgetWithBytesSent(packet.bytes); + } + } + + return success; +} + +size_t PacedSender::SendPadding(size_t padding_needed, + const PacedPacketInfo& pacing_info) { + RTC_DCHECK_GT(packet_counter_, 0); + critsect_.Leave(); + size_t bytes_sent = + packet_sender_->TimeToSendPadding(padding_needed, pacing_info); + critsect_.Enter(); + + if (bytes_sent > 0) { + UpdateBudgetWithBytesSent(bytes_sent); + } + return bytes_sent; +} + +void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { + media_budget_->IncreaseBudget(delta_time_ms); + padding_budget_->IncreaseBudget(delta_time_ms); +} + +void PacedSender::UpdateBudgetWithBytesSent(size_t bytes_sent) { + media_budget_->UseBudget(bytes_sent); + padding_budget_->UseBudget(bytes_sent); +} + +void PacedSender::SetPacingFactor(float pacing_factor) { + rtc::CritScope cs(&critsect_); + pacing_factor_ = pacing_factor; + // Make sure new padding factor is applied immediately, otherwise we need to + // wait for the send bitrate estimate to be updated before this takes effect. + SetEstimatedBitrate(estimated_bitrate_bps_); +} + +float PacedSender::GetPacingFactor() const { + rtc::CritScope cs(&critsect_); + return pacing_factor_; +} + +void PacedSender::SetQueueTimeLimit(int limit_ms) { + rtc::CritScope cs(&critsect_); + queue_time_limit = limit_ms; +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.h b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.h new file mode 100644 index 0000000000..c1090ecbc0 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.h @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACED_SENDER_H_ +#define MODULES_PACING_PACED_SENDER_H_ + +#include <memory> + +#include "api/optional.h" +#include "modules/pacing/pacer.h" +#include "modules/pacing/packet_queue2.h" +#include "rtc_base/criticalsection.h" +#include "rtc_base/thread_annotations.h" +#include "typedefs.h" // NOLINT(build/include) + +namespace webrtc { +class AlrDetector; +class BitrateProber; +class Clock; +class ProbeClusterCreatedObserver; +class RtcEventLog; +class IntervalBudget; + +class PacedSender : public Pacer { + public: + class PacketSender { + public: + // Note: packets sent as a result of a callback should not pass by this + // module again. + // Called when it's time to send a queued packet. + // Returns false if packet cannot be sent. + virtual bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& cluster_info) = 0; + // Called when it's a good time to send a padding data. + // Returns the number of bytes sent. + virtual size_t TimeToSendPadding(size_t bytes, + const PacedPacketInfo& cluster_info) = 0; + + protected: + virtual ~PacketSender() {} + }; + + // Expected max pacer delay in ms. If ExpectedQueueTimeMs() is higher than + // this value, the packet producers should wait (eg drop frames rather than + // encoding them). Bitrate sent may temporarily exceed target set by + // UpdateBitrate() so that this limit will be upheld. + static const int64_t kMaxQueueLengthMs; + // Pacing-rate relative to our target send rate. + // Multiplicative factor that is applied to the target bitrate to calculate + // the number of bytes that can be transmitted per interval. + // Increasing this factor will result in lower delays in cases of bitrate + // overshoots from the encoder. + static const float kDefaultPaceMultiplier; + + PacedSender(const Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log); + + PacedSender(const Clock* clock, + PacketSender* packet_sender, + RtcEventLog* event_log, + std::unique_ptr<PacketQueue> packets); + + ~PacedSender() override; + + virtual void CreateProbeCluster(int bitrate_bps); + + // Temporarily pause all sending. + void Pause(); + + // Resume sending packets. + void Resume(); + + // Enable bitrate probing. Enabled by default, mostly here to simplify + // testing. Must be called before any packets are being sent to have an + // effect. + void SetProbingEnabled(bool enabled); + + // Sets the estimated capacity of the network. Must be called once before + // packets can be sent. + // |bitrate_bps| is our estimate of what we are allowed to send on average. + // We will pace out bursts of packets at a bitrate of + // |bitrate_bps| * kDefaultPaceMultiplier. + void SetEstimatedBitrate(uint32_t bitrate_bps) override; + + // Sets the minimum send bitrate and maximum padding bitrate requested by send + // streams. + // |min_send_bitrate_bps| might be higher that the estimated available network + // bitrate and if so, the pacer will send with |min_send_bitrate_bps|. + // |max_padding_bitrate_bps| might be higher than the estimate available + // network bitrate and if so, the pacer will send padding packets to reach + // the min of the estimated available bitrate and |max_padding_bitrate_bps|. + void SetSendBitrateLimits(int min_send_bitrate_bps, + int max_padding_bitrate_bps); + + // Returns true if we send the packet now, else it will add the packet + // information to the queue and call TimeToSendPacket when it's time to send. + void InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) override; + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio) override; + + // Returns the time since the oldest queued packet was enqueued. + virtual int64_t QueueInMs() const; + + virtual size_t QueueSizePackets() const; + + // Returns the time when the first packet was sent, or -1 if no packet is + // sent. + virtual int64_t FirstSentPacketTimeMs() const; + + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + virtual int64_t ExpectedQueueTimeMs() const; + + // Returns time in milliseconds when the current application-limited region + // started or empty result if the sender is currently not application-limited. + // + // Application Limited Region (ALR) refers to operating in a state where the + // traffic on network is limited due to application not having enough + // traffic to meet the current channel capacity. + virtual rtc::Optional<int64_t> GetApplicationLimitedRegionStartTime() const; + + // Returns the number of milliseconds until the module want a worker thread + // to call Process. + int64_t TimeUntilNextProcess() override; + + // Process any pending packets in the queue(s). + void Process() override; + + // Called when the prober is associated with a process thread. + void ProcessThreadAttached(ProcessThread* process_thread) override; + void SetPacingFactor(float pacing_factor); + float GetPacingFactor() const; + void SetQueueTimeLimit(int limit_ms); + + private: + // Updates the number of bytes that can be sent for the next time interval. + void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + void UpdateBudgetWithBytesSent(size_t bytes) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + + bool SendPacket(const PacketQueue::Packet& packet, + const PacedPacketInfo& cluster_info) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) + RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); + + const Clock* const clock_; + PacketSender* const packet_sender_; + const std::unique_ptr<AlrDetector> alr_detector_ RTC_PT_GUARDED_BY(critsect_); + + rtc::CriticalSection critsect_; + bool paused_ RTC_GUARDED_BY(critsect_); + // This is the media budget, keeping track of how many bits of media + // we can pace out during the current interval. + const std::unique_ptr<IntervalBudget> media_budget_ + RTC_PT_GUARDED_BY(critsect_); + // This is the padding budget, keeping track of how many bits of padding we're + // allowed to send out during the current interval. This budget will be + // utilized when there's no media to send. + const std::unique_ptr<IntervalBudget> padding_budget_ + RTC_PT_GUARDED_BY(critsect_); + + const std::unique_ptr<BitrateProber> prober_ RTC_PT_GUARDED_BY(critsect_); + bool probing_send_failure_ RTC_GUARDED_BY(critsect_); + // Actual configured bitrates (media_budget_ may temporarily be higher in + // order to meet pace time constraint). + uint32_t estimated_bitrate_bps_ RTC_GUARDED_BY(critsect_); + uint32_t min_send_bitrate_kbps_ RTC_GUARDED_BY(critsect_); + uint32_t max_padding_bitrate_kbps_ RTC_GUARDED_BY(critsect_); + uint32_t pacing_bitrate_kbps_ RTC_GUARDED_BY(critsect_); + + int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_); + int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_); + + const std::unique_ptr<PacketQueue> packets_ RTC_PT_GUARDED_BY(critsect_); + uint64_t packet_counter_ RTC_GUARDED_BY(critsect_); + ProcessThread* process_thread_ = nullptr; + + float pacing_factor_ RTC_GUARDED_BY(critsect_); + int64_t queue_time_limit RTC_GUARDED_BY(critsect_); + bool account_for_audio_ RTC_GUARDED_BY(critsect_); +}; +} // namespace webrtc +#endif // MODULES_PACING_PACED_SENDER_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/paced_sender_unittest.cc b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender_unittest.cc new file mode 100644 index 0000000000..4281ec220a --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/paced_sender_unittest.cc @@ -0,0 +1,1147 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <list> +#include <memory> +#include <string> + +#include "modules/pacing/paced_sender.h" +#include "system_wrappers/include/clock.h" +#include "system_wrappers/include/field_trial.h" +#include "test/field_trial.h" +#include "test/gmock.h" +#include "test/gtest.h" + +using testing::_; +using testing::Field; +using testing::Return; + +namespace { +constexpr unsigned kFirstClusterBps = 900000; +constexpr unsigned kSecondClusterBps = 1800000; + +// The error stems from truncating the time interval of probe packets to integer +// values. This results in probing slightly higher than the target bitrate. +// For 1.8 Mbps, this comes to be about 120 kbps with 1200 probe packets. +constexpr int kBitrateProbingError = 150000; +} // namespace + +namespace webrtc { +namespace test { + +static const int kTargetBitrateBps = 800000; + +class MockPacedSenderCallback : public PacedSender::PacketSender { + public: + MOCK_METHOD5(TimeToSendPacket, + bool(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info)); + MOCK_METHOD2(TimeToSendPadding, + size_t(size_t bytes, const PacedPacketInfo& pacing_info)); +}; + +class PacedSenderPadding : public PacedSender::PacketSender { + public: + PacedSenderPadding() : padding_sent_(0) {} + + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info) override { + return true; + } + + size_t TimeToSendPadding(size_t bytes, + const PacedPacketInfo& pacing_info) override { + const size_t kPaddingPacketSize = 224; + size_t num_packets = (bytes + kPaddingPacketSize - 1) / kPaddingPacketSize; + padding_sent_ += kPaddingPacketSize * num_packets; + return kPaddingPacketSize * num_packets; + } + + size_t padding_sent() { return padding_sent_; } + + private: + size_t padding_sent_; +}; + +class PacedSenderProbing : public PacedSender::PacketSender { + public: + PacedSenderProbing() : packets_sent_(0), padding_sent_(0) {} + + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + bool retransmission, + const PacedPacketInfo& pacing_info) override { + packets_sent_++; + return true; + } + + size_t TimeToSendPadding(size_t bytes, + const PacedPacketInfo& pacing_info) override { + padding_sent_ += bytes; + return padding_sent_; + } + + int packets_sent() const { return packets_sent_; } + + int padding_sent() const { return padding_sent_; } + + private: + int packets_sent_; + int padding_sent_; +}; + +class PacedSenderTest : public testing::TestWithParam<std::string> { + protected: + PacedSenderTest() : clock_(123456), field_trial_(GetParam()) { + srand(0); + // Need to initialize PacedSender after we initialize clock. + send_bucket_.reset(new PacedSender(&clock_, &callback_, nullptr)); + send_bucket_->CreateProbeCluster(kFirstClusterBps); + send_bucket_->CreateProbeCluster(kSecondClusterBps); + // Default to bitrate probing disabled for testing purposes. Probing tests + // have to enable probing, either by creating a new PacedSender instance or + // by calling SetProbingEnabled(true). + send_bucket_->SetProbingEnabled(false); + send_bucket_->SetEstimatedBitrate(kTargetBitrateBps); + + clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); + } + + void SendAndExpectPacket(PacedSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t size, + bool retransmission) { + send_bucket_->InsertPacket(priority, ssrc, sequence_number, capture_time_ms, + size, retransmission); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + capture_time_ms, retransmission, _)) + .Times(1) + .WillRepeatedly(Return(true)); + } + SimulatedClock clock_; + MockPacedSenderCallback callback_; + std::unique_ptr<PacedSender> send_bucket_; + test::ScopedFieldTrials field_trial_; +}; + +INSTANTIATE_TEST_CASE_P(RoundRobin, + PacedSenderTest, + ::testing::Values("WebRTC-RoundRobinPacing/Disabled/", + "WebRTC-RoundRobinPacing/Enabled/")); + +TEST_P(PacedSenderTest, FirstSentPacketTimeIsSet) { + uint16_t sequence_number = 1234; + const uint32_t kSsrc = 12345; + const size_t kSizeBytes = 250; + const size_t kPacketToSend = 3; + const int64_t kStartMs = clock_.TimeInMilliseconds(); + + // No packet sent. + EXPECT_EQ(-1, send_bucket_->FirstSentPacketTimeMs()); + + for (size_t i = 0; i < kPacketToSend; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, kSsrc, sequence_number++, + clock_.TimeInMilliseconds(), kSizeBytes, false); + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); + } + EXPECT_EQ(kStartMs, send_bucket_->FirstSentPacketTimeMs()); +} + +TEST_P(PacedSenderTest, QueuePacket) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + + int64_t queued_packet_timestamp = clock_.TimeInMilliseconds(); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, queued_packet_timestamp, 250, + false); + EXPECT_EQ(packets_to_send + 1, send_bucket_->QueueSizePackets()); + send_bucket_->Process(); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + clock_.AdvanceTimeMilliseconds(4); + EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(1); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(1u, send_bucket_->QueueSizePackets()); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number++, + queued_packet_timestamp, false, _)) + .Times(1) + .WillRepeatedly(Return(true)); + send_bucket_->Process(); + sequence_number++; + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + + // We can send packets_to_send -1 packets of size 250 during the current + // interval since one packet has already been sent. + for (size_t i = 0; i < packets_to_send - 1; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + EXPECT_EQ(packets_to_send, send_bucket_->QueueSizePackets()); + send_bucket_->Process(); + EXPECT_EQ(1u, send_bucket_->QueueSizePackets()); +} + +TEST_P(PacedSenderTest, PaceQueuedPackets) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + + for (size_t j = 0; j < packets_to_send_per_interval * 10; ++j) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + 250, false); + } + EXPECT_EQ(packets_to_send_per_interval + packets_to_send_per_interval * 10, + send_bucket_->QueueSizePackets()); + send_bucket_->Process(); + EXPECT_EQ(packets_to_send_per_interval * 10, + send_bucket_->QueueSizePackets()); + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + for (int k = 0; k < 10; ++k) { + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, _, false, _)) + .Times(packets_to_send_per_interval) + .WillRepeatedly(Return(true)); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + } + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + send_bucket_->Process(); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, clock_.TimeInMilliseconds(), 250, + false); + send_bucket_->Process(); + EXPECT_EQ(1u, send_bucket_->QueueSizePackets()); +} + +TEST_P(PacedSenderTest, RepeatedRetransmissionsAllowed) { + // Send one packet, then two retransmissions of that packet. + for (size_t i = 0; i < 3; i++) { + constexpr uint32_t ssrc = 333; + constexpr uint16_t sequence_number = 444; + constexpr size_t bytes = 250; + bool is_retransmission = (i != 0); // Original followed by retransmissions. + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number, + clock_.TimeInMilliseconds(), bytes, is_retransmission); + clock_.AdvanceTimeMilliseconds(5); + } + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number, + clock_.TimeInMilliseconds(), + 250, + false); + + // Expect packet on second ssrc to be queued and sent as well. + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc + 1, + sequence_number, + clock_.TimeInMilliseconds(), + 250, + false); + + clock_.AdvanceTimeMilliseconds(1000); + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, Padding) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + + send_bucket_->SetEstimatedBitrate(kTargetBitrateBps); + send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps); + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + // No padding is expected since we have sent too much already. + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + + // 5 milliseconds later should not send padding since we filled the buffers + // initially. + EXPECT_CALL(callback_, TimeToSendPadding(250, _)).Times(0); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + + // 5 milliseconds later we have enough budget to send some padding. + EXPECT_CALL(callback_, TimeToSendPadding(250, _)) + .Times(1) + .WillOnce(Return(250)); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, NoPaddingBeforeNormalPacket) { + send_bucket_->SetEstimatedBitrate(kTargetBitrateBps); + send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps); + + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); + + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); + + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + capture_time_ms, 250, false); + EXPECT_CALL(callback_, TimeToSendPadding(250, _)) + .Times(1) + .WillOnce(Return(250)); + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, VerifyPaddingUpToBitrate) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 100; + send_bucket_->SetEstimatedBitrate(kTargetBitrateBps); + send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps); + + int64_t start_time = clock_.TimeInMilliseconds(); + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number++, + capture_time_ms, + 250, + false); + EXPECT_CALL(callback_, TimeToSendPadding(250, _)) + .Times(1) + .WillOnce(Return(250)); + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(kTimeStep); + } +} + +TEST_P(PacedSenderTest, VerifyAverageBitrateVaryingMediaPayload) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + const int kTimeStep = 5; + const int64_t kBitrateWindow = 10000; + PacedSenderPadding callback; + send_bucket_.reset(new PacedSender(&clock_, &callback, nullptr)); + send_bucket_->SetProbingEnabled(false); + send_bucket_->SetEstimatedBitrate(kTargetBitrateBps); + + send_bucket_->SetSendBitrateLimits( + 0 /*allocated_bitrate_bps*/, + kTargetBitrateBps * 2 /* max_padding_bitrate_bps */); + + int64_t start_time = clock_.TimeInMilliseconds(); + size_t media_bytes = 0; + while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) { + int rand_value = rand(); // NOLINT (rand_r instead of rand) + size_t media_payload = rand_value % 100 + 200; // [200, 300] bytes. + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, + media_payload, false); + media_bytes += media_payload; + clock_.AdvanceTimeMilliseconds(kTimeStep); + send_bucket_->Process(); + } + EXPECT_NEAR(kTargetBitrateBps / 1000, + static_cast<int>(8 * (media_bytes + callback.padding_sent()) / + kBitrateWindow), + 1); +} + +TEST_P(PacedSenderTest, Priority) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + int64_t capture_time_ms_low_priority = 1234567; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + send_bucket_->Process(); + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + + // Expect normal and low priority to be queued and high to pass through. + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority, + sequence_number++, capture_time_ms_low_priority, + 250, false); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + } + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + + // Expect all high and normal priority to be sent out first. + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false, _)) + .Times(packets_to_send_per_interval + 1) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + EXPECT_EQ(1u, send_bucket_->QueueSizePackets()); + + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc_low_priority, _, + capture_time_ms_low_priority, false, _)) + .Times(1) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, RetransmissionPriority) { + uint32_t ssrc = 12345; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 45678; + int64_t capture_time_ms_retransmission = 56789; + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + send_bucket_->Process(); + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); + + // Alternate retransmissions and normal packets. + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, + capture_time_ms_retransmission, 250, true); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + } + EXPECT_EQ(2 * packets_to_send_per_interval, send_bucket_->QueueSizePackets()); + + // Expect all retransmissions to be sent out first despite having a later + // capture time. + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, false, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket( + ssrc, _, capture_time_ms_retransmission, true, _)) + .Times(packets_to_send_per_interval) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + EXPECT_EQ(packets_to_send_per_interval, send_bucket_->QueueSizePackets()); + + // Expect the remaining (non-retransmission) packets to be sent. + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, true, _)).Times(0); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, false, _)) + .Times(packets_to_send_per_interval) + .WillRepeatedly(Return(true)); + + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); +} + +TEST_P(PacedSenderTest, HighPrioDoesntAffectBudget) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = 56789; + + // As high prio packets doesn't affect the budget, we should be able to send + // a high number of them at once. + for (int i = 0; i < 25; ++i) { + SendAndExpectPacket(PacedSender::kHighPriority, ssrc, sequence_number++, + capture_time_ms, 250, false); + } + send_bucket_->Process(); + // Low prio packets does affect the budget. + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kLowPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc, sequence_number, + capture_time_ms, 250, false); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + EXPECT_EQ(1u, send_bucket_->QueueSizePackets()); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number++, + capture_time_ms, false, _)) + .Times(1) + .WillRepeatedly(Return(true)); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->Process(); + EXPECT_EQ(0u, send_bucket_->QueueSizePackets()); +} + +TEST_P(PacedSenderTest, Pause) { + uint32_t ssrc_low_priority = 12345; + uint32_t ssrc = 12346; + uint32_t ssrc_high_priority = 12347; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + // Due to the multiplicative factor we can send 5 packets during a send + // interval. (network capacity * multiplier / (8 bits per byte * + // (packet size * #send intervals per second) + const size_t packets_to_send_per_interval = + kTargetBitrateBps * PacedSender::kDefaultPaceMultiplier / (8 * 250 * 200); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), 250, false); + } + + send_bucket_->Process(); + + send_bucket_->Pause(); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, capture_time_ms, 250, false); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc_high_priority, + sequence_number++, capture_time_ms, 250, false); + } + clock_.AdvanceTimeMilliseconds(10000); + int64_t second_capture_time_ms = clock_.TimeInMilliseconds(); + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + send_bucket_->InsertPacket(PacedSender::kLowPriority, ssrc_low_priority, + sequence_number++, second_capture_time_ms, 250, + false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, second_capture_time_ms, 250, + false); + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc_high_priority, + sequence_number++, second_capture_time_ms, 250, + false); + } + + // Expect everything to be queued. + EXPECT_EQ(second_capture_time_ms - capture_time_ms, + send_bucket_->QueueInMs()); + + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + send_bucket_->Process(); + + int64_t expected_time_until_send = 500; + EXPECT_CALL(callback_, TimeToSendPadding(1, _)).Times(1); + while (expected_time_until_send >= 0) { + // TimeUntilNextProcess must not return 0 when paused. If it does, + // we risk running a busy loop, so ideally it should return a large value. + EXPECT_EQ(expected_time_until_send, send_bucket_->TimeUntilNextProcess()); + if (expected_time_until_send == 0) + send_bucket_->Process(); + clock_.AdvanceTimeMilliseconds(5); + expected_time_until_send -= 5; + } + + // Expect high prio packets to come out first followed by normal + // prio packets and low prio packets (all in capture order). + { + ::testing::InSequence sequence; + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc_high_priority, _, capture_time_ms, _, _)) + .Times(packets_to_send_per_interval) + .WillRepeatedly(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc_high_priority, _, + second_capture_time_ms, _, _)) + .Times(packets_to_send_per_interval) + .WillRepeatedly(Return(true)); + + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms, _, _)) + .Times(1) + .WillRepeatedly(Return(true)); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, _, second_capture_time_ms, _, _)) + .Times(1) + .WillRepeatedly(Return(true)); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc_low_priority, _, capture_time_ms, _, _)) + .Times(1) + .WillRepeatedly(Return(true)); + } + for (size_t i = 0; i < packets_to_send_per_interval; ++i) { + EXPECT_CALL(callback_, TimeToSendPacket(ssrc_low_priority, _, + second_capture_time_ms, _, _)) + .Times(1) + .WillRepeatedly(Return(true)); + } + } + send_bucket_->Resume(); + + for (size_t i = 0; i < 4; i++) { + EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess()); + send_bucket_->Process(); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(5); + } + + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_P(PacedSenderTest, ResendPacket) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + int64_t capture_time_ms = clock_.TimeInMilliseconds(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, capture_time_ms, 250, false); + clock_.AdvanceTimeMilliseconds(1); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number + 1, capture_time_ms + 1, 250, + false); + clock_.AdvanceTimeMilliseconds(9999); + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, + send_bucket_->QueueInMs()); + // Fails to send first packet so only one call. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + capture_time_ms, false, _)) + .Times(1) + .WillOnce(Return(false)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + + // Queue remains unchanged. + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms, + send_bucket_->QueueInMs()); + + // Fails to send second packet. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + capture_time_ms, false, _)) + .Times(1) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + capture_time_ms + 1, false, _)) + .Times(1) + .WillOnce(Return(false)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + + // Queue is reduced by 1 packet. + EXPECT_EQ(clock_.TimeInMilliseconds() - capture_time_ms - 1, + send_bucket_->QueueInMs()); + + // Send second packet and queue becomes empty. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + capture_time_ms + 1, false, _)) + .Times(1) + .WillOnce(Return(true)); + clock_.AdvanceTimeMilliseconds(10000); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_P(PacedSenderTest, ExpectedQueueTimeMs) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kNumPackets = 60; + const size_t kPacketSize = 1200; + const int32_t kMaxBitrate = PacedSender::kDefaultPaceMultiplier * 30000; + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + + send_bucket_->SetEstimatedBitrate(30000); + for (size_t i = 0; i < kNumPackets; ++i) { + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + } + + // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second) + int64_t queue_in_ms = + static_cast<int64_t>(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate); + EXPECT_EQ(queue_in_ms, send_bucket_->ExpectedQueueTimeMs()); + + int64_t time_start = clock_.TimeInMilliseconds(); + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + int64_t duration = clock_.TimeInMilliseconds() - time_start; + + EXPECT_EQ(0, send_bucket_->ExpectedQueueTimeMs()); + + // Allow for aliasing, duration should be within one pack of max time limit. + EXPECT_NEAR(duration, PacedSender::kMaxQueueLengthMs, + static_cast<int64_t>(1000 * kPacketSize * 8 / kMaxBitrate)); +} + +TEST_P(PacedSenderTest, QueueTimeGrowsOverTime) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + EXPECT_EQ(0, send_bucket_->QueueInMs()); + + send_bucket_->SetEstimatedBitrate(30000); + SendAndExpectPacket(PacedSender::kNormalPriority, + ssrc, + sequence_number, + clock_.TimeInMilliseconds(), + 1200, + false); + + clock_.AdvanceTimeMilliseconds(500); + EXPECT_EQ(500, send_bucket_->QueueInMs()); + send_bucket_->Process(); + EXPECT_EQ(0, send_bucket_->QueueInMs()); +} + +TEST_P(PacedSenderTest, ProbingWithInsertedPackets) { + const size_t kPacketSize = 1200; + const int kInitialBitrateBps = 300000; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + + PacedSenderProbing packet_sender; + send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr)); + send_bucket_->CreateProbeCluster(kFirstClusterBps); + send_bucket_->CreateProbeCluster(kSecondClusterBps); + send_bucket_->SetEstimatedBitrate(kInitialBitrateBps); + + for (int i = 0; i < 10; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + } + + int64_t start = clock_.TimeInMilliseconds(); + while (packet_sender.packets_sent() < 5) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + clock_.AdvanceTimeMilliseconds(time_until_process); + send_bucket_->Process(); + } + int packets_sent = packet_sender.packets_sent(); + // Validate first cluster bitrate. Note that we have to account for number + // of intervals and hence (packets_sent - 1) on the first cluster. + EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 / + (clock_.TimeInMilliseconds() - start), + kFirstClusterBps, kBitrateProbingError); + EXPECT_EQ(0, packet_sender.padding_sent()); + + clock_.AdvanceTimeMilliseconds(send_bucket_->TimeUntilNextProcess()); + start = clock_.TimeInMilliseconds(); + while (packet_sender.packets_sent() < 10) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + clock_.AdvanceTimeMilliseconds(time_until_process); + send_bucket_->Process(); + } + packets_sent = packet_sender.packets_sent() - packets_sent; + // Validate second cluster bitrate. + EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 / + (clock_.TimeInMilliseconds() - start), + kSecondClusterBps, kBitrateProbingError); +} + +TEST_P(PacedSenderTest, ProbingWithPaddingSupport) { + const size_t kPacketSize = 1200; + const int kInitialBitrateBps = 300000; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + + PacedSenderProbing packet_sender; + send_bucket_.reset(new PacedSender(&clock_, &packet_sender, nullptr)); + send_bucket_->CreateProbeCluster(kFirstClusterBps); + send_bucket_->SetEstimatedBitrate(kInitialBitrateBps); + + for (int i = 0; i < 3; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + } + + int64_t start = clock_.TimeInMilliseconds(); + int process_count = 0; + while (process_count < 5) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + clock_.AdvanceTimeMilliseconds(time_until_process); + send_bucket_->Process(); + ++process_count; + } + int packets_sent = packet_sender.packets_sent(); + int padding_sent = packet_sender.padding_sent(); + EXPECT_GT(packets_sent, 0); + EXPECT_GT(padding_sent, 0); + // Note that the number of intervals here for kPacketSize is + // packets_sent due to padding in the same cluster. + EXPECT_NEAR((packets_sent * kPacketSize * 8000 + padding_sent) / + (clock_.TimeInMilliseconds() - start), + kFirstClusterBps, kBitrateProbingError); +} + +TEST_P(PacedSenderTest, PriorityInversion) { + // In this test capture timestamps are used to order packets, capture + // timestamps are not used in PacketQueue2. + if (webrtc::field_trial::IsEnabled("WebRTC-RoundRobinPacing")) + return; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + send_bucket_->InsertPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, kPacketSize, true); + + send_bucket_->InsertPacket( + PacedSender::kHighPriority, ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, kPacketSize, true); + + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, sequence_number, + clock_.TimeInMilliseconds(), kPacketSize, true); + + send_bucket_->InsertPacket(PacedSender::kHighPriority, ssrc, + sequence_number + 1, clock_.TimeInMilliseconds(), + kPacketSize, true); + + // Packets from earlier frames should be sent first. + { + ::testing::InSequence sequence; + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number, + clock_.TimeInMilliseconds(), true, _)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number + 1, + clock_.TimeInMilliseconds(), true, _)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number + 3, + clock_.TimeInMilliseconds() + 33, true, _)) + .WillOnce(Return(true)); + EXPECT_CALL(callback_, + TimeToSendPacket(ssrc, sequence_number + 2, + clock_.TimeInMilliseconds() + 33, true, _)) + .WillOnce(Return(true)); + + while (send_bucket_->QueueSizePackets() > 0) { + int time_until_process = send_bucket_->TimeUntilNextProcess(); + if (time_until_process <= 0) { + send_bucket_->Process(); + } else { + clock_.AdvanceTimeMilliseconds(time_until_process); + } + } + } +} + +TEST_P(PacedSenderTest, PaddingOveruse) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + send_bucket_->Process(); + send_bucket_->SetEstimatedBitrate(60000); + send_bucket_->SetSendBitrateLimits(60000, 0); + + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + send_bucket_->Process(); + + // Add 30kbit padding. When increasing budget, media budget will increase from + // negative (overuse) while padding budget will increase from 0. + clock_.AdvanceTimeMilliseconds(5); + send_bucket_->SetSendBitrateLimits(60000, 30000); + + SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize, false); + EXPECT_LT(5u, send_bucket_->ExpectedQueueTimeMs()); + // Don't send padding if queue is non-empty, even if padding budget > 0. + EXPECT_CALL(callback_, TimeToSendPadding(_, _)).Times(0); + send_bucket_->Process(); +} + +// TODO(philipel): Move to PacketQueue2 unittests. +#if 0 +TEST_F(PacedSenderTest, AverageQueueTime) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + const int kBitrateBps = 10 * kPacketSize * 8; // 10 packets per second. + + send_bucket_->SetEstimatedBitrate(kBitrateBps); + + EXPECT_EQ(0, send_bucket_->AverageQueueTimeMs()); + + int64_t first_capture_time = clock_.TimeInMilliseconds(); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, first_capture_time, kPacketSize, + false); + clock_.AdvanceTimeMilliseconds(10); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number + 1, clock_.TimeInMilliseconds(), + kPacketSize, false); + clock_.AdvanceTimeMilliseconds(10); + + EXPECT_EQ((20 + 10) / 2, send_bucket_->AverageQueueTimeMs()); + + // Only first packet (queued for 20ms) should be removed, leave the second + // packet (queued for 10ms) alone in the queue. + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number, + first_capture_time, false, _)) + .Times(1) + .WillRepeatedly(Return(true)); + send_bucket_->Process(); + + EXPECT_EQ(10, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(10); + EXPECT_CALL(callback_, TimeToSendPacket(ssrc, sequence_number + 1, + first_capture_time + 10, false, _)) + .Times(1) + .WillRepeatedly(Return(true)); + for (int i = 0; i < 3; ++i) { + clock_.AdvanceTimeMilliseconds(30); // Max delta. + send_bucket_->Process(); + } + + EXPECT_EQ(0, send_bucket_->AverageQueueTimeMs()); +} +#endif + +TEST_P(PacedSenderTest, ProbeClusterId) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = 1200; + + send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps); + send_bucket_->SetProbingEnabled(true); + for (int i = 0; i < 10; ++i) { + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number + i, clock_.TimeInMilliseconds(), + kPacketSize, false); + } + + // First probing cluster. + EXPECT_CALL(callback_, + TimeToSendPacket(_, _, _, _, + Field(&PacedPacketInfo::probe_cluster_id, 0))) + .Times(5) + .WillRepeatedly(Return(true)); + for (int i = 0; i < 5; ++i) { + clock_.AdvanceTimeMilliseconds(20); + send_bucket_->Process(); + } + + // Second probing cluster. + EXPECT_CALL(callback_, + TimeToSendPacket(_, _, _, _, + Field(&PacedPacketInfo::probe_cluster_id, 1))) + .Times(5) + .WillRepeatedly(Return(true)); + for (int i = 0; i < 5; ++i) { + clock_.AdvanceTimeMilliseconds(20); + send_bucket_->Process(); + } + + // Needed for the Field comparer below. + const int kNotAProbe = PacedPacketInfo::kNotAProbe; + // No more probing packets. + EXPECT_CALL(callback_, + TimeToSendPadding( + _, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe))) + .Times(1) + .WillRepeatedly(Return(500)); + send_bucket_->Process(); +} + +TEST_P(PacedSenderTest, AvoidBusyLoopOnSendFailure) { + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + const size_t kPacketSize = kFirstClusterBps / (8000 / 10); + + send_bucket_->SetSendBitrateLimits(kTargetBitrateBps, kTargetBitrateBps); + send_bucket_->SetProbingEnabled(true); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc, + sequence_number, clock_.TimeInMilliseconds(), + kPacketSize, false); + + EXPECT_CALL(callback_, TimeToSendPacket(_, _, _, _, _)) + .WillOnce(Return(true)); + send_bucket_->Process(); + EXPECT_EQ(10, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(9); + + EXPECT_CALL(callback_, TimeToSendPadding(_, _)) + .Times(2) + .WillRepeatedly(Return(0)); + send_bucket_->Process(); + EXPECT_EQ(1, send_bucket_->TimeUntilNextProcess()); + clock_.AdvanceTimeMilliseconds(1); + send_bucket_->Process(); + EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); +} + +// TODO(philipel): Move to PacketQueue2 unittests. +#if 0 +TEST_F(PacedSenderTest, QueueTimeWithPause) { + const size_t kPacketSize = 1200; + const uint32_t kSsrc = 12346; + uint16_t sequence_number = 1234; + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Pause(); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + // In paused state, queue time should not increase. + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Resume(); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(200, send_bucket_->AverageQueueTimeMs()); +} + +TEST_P(PacedSenderTest, QueueTimePausedDuringPush) { + const size_t kPacketSize = 1200; + const uint32_t kSsrc = 12346; + uint16_t sequence_number = 1234; + + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + clock_.AdvanceTimeMilliseconds(100); + send_bucket_->Pause(); + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(100, send_bucket_->AverageQueueTimeMs()); + + // Add a new packet during paused phase. + send_bucket_->InsertPacket(PacedSender::kNormalPriority, kSsrc, + sequence_number++, clock_.TimeInMilliseconds(), + kPacketSize, false); + // From a queue time perspective, packet inserted during pause will have zero + // queue time. Average queue time will then be (0 + 100) / 2 = 50. + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + send_bucket_->Resume(); + EXPECT_EQ(50, send_bucket_->AverageQueueTimeMs()); + + clock_.AdvanceTimeMilliseconds(100); + EXPECT_EQ(150, send_bucket_->AverageQueueTimeMs()); +} +#endif + +// TODO(sprang): Extract PacketQueue from PacedSender so that we can test +// removing elements while paused. (This is possible, but only because of semi- +// racy condition so can't easily be tested). + +} // namespace test +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/pacer.h b/third_party/libwebrtc/webrtc/modules/pacing/pacer.h new file mode 100644 index 0000000000..8b43e1851e --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/pacer.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACER_H_ +#define MODULES_PACING_PACER_H_ + +#include "modules/include/module.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { +class Pacer : public Module, public RtpPacketSender { + public: + virtual void SetEstimatedBitrate(uint32_t bitrate_bps) {} + virtual void SetEstimatedBitrateAndCongestionWindow( + uint32_t bitrate_bps, + bool in_probe_rtt, + uint64_t congestion_window) {} + virtual void OnBytesAcked(size_t bytes) {} + void InsertPacket(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_time_ms, + size_t bytes, + bool retransmission) override = 0; + int64_t TimeUntilNextProcess() override = 0; + void Process() override = 0; + ~Pacer() override {} +}; +} // namespace webrtc +#endif // MODULES_PACING_PACER_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/pacing_gn/moz.build b/third_party/libwebrtc/webrtc/modules/pacing/pacing_gn/moz.build new file mode 100644 index 0000000000..1112278dd4 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/pacing_gn/moz.build @@ -0,0 +1,231 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + + ### This moz.build was AUTOMATICALLY GENERATED from a GN config, ### + ### DO NOT edit it by hand. ### + +COMPILE_FLAGS["OS_INCLUDES"] = [] +AllowCompilerWarnings() + +DEFINES["CHROMIUM_BUILD"] = True +DEFINES["V8_DEPRECATION_WARNINGS"] = True +DEFINES["WEBRTC_ENABLE_PROTOBUF"] = "0" +DEFINES["WEBRTC_MOZILLA_BUILD"] = True +DEFINES["WEBRTC_NON_STATIC_TRACE_EVENT_HANDLERS"] = "0" +DEFINES["WEBRTC_RESTRICT_LOGGING"] = True + +FINAL_LIBRARY = "webrtc" + + +LOCAL_INCLUDES += [ + "!/ipc/ipdl/_ipdlheaders", + "/ipc/chromium/src", + "/ipc/glue", + "/third_party/libwebrtc/webrtc/" +] + +UNIFIED_SOURCES += [ + "/third_party/libwebrtc/webrtc/modules/pacing/alr_detector.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/bitrate_prober.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/interval_budget.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/paced_sender.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.cc", + "/third_party/libwebrtc/webrtc/modules/pacing/packet_router.cc" +] + +if not CONFIG["MOZ_DEBUG"]: + + DEFINES["DYNAMIC_ANNOTATIONS_ENABLED"] = "0" + DEFINES["NDEBUG"] = True + DEFINES["NVALGRIND"] = True + +if CONFIG["MOZ_DEBUG"] == "1": + + DEFINES["DYNAMIC_ANNOTATIONS_ENABLED"] = "1" + DEFINES["WTF_USE_DYNAMIC_ANNOTATIONS"] = "1" + +if CONFIG["OS_TARGET"] == "Android": + + DEFINES["ANDROID"] = True + DEFINES["ANDROID_NDK_VERSION"] = "r12b" + DEFINES["DISABLE_NACL"] = True + DEFINES["HAVE_SYS_UIO_H"] = True + DEFINES["NO_TCMALLOC"] = True + DEFINES["USE_OPENSSL_CERTS"] = "1" + DEFINES["WEBRTC_ANDROID"] = True + DEFINES["WEBRTC_ANDROID_OPENSLES"] = True + DEFINES["WEBRTC_LINUX"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + DEFINES["__GNU_SOURCE"] = "1" + + OS_LIBS += [ + "log" + ] + +if CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["NO_TCMALLOC"] = True + DEFINES["WEBRTC_MAC"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["__ASSERT_MACROS_DEFINE_VERSIONS_WITHOUT_UNDERSCORE"] = "0" + + OS_LIBS += [ + "-framework Foundation" + ] + +if CONFIG["OS_TARGET"] == "DragonFly": + + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_BSD"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + +if CONFIG["OS_TARGET"] == "FreeBSD": + + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_BSD"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + +if CONFIG["OS_TARGET"] == "Linux": + + DEFINES["USE_NSS_CERTS"] = "1" + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_LINUX"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + + OS_LIBS += [ + "rt" + ] + +if CONFIG["OS_TARGET"] == "NetBSD": + + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_BSD"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + +if CONFIG["OS_TARGET"] == "OpenBSD": + + DEFINES["USE_X11"] = "1" + DEFINES["WEBRTC_BSD"] = True + DEFINES["WEBRTC_POSIX"] = True + DEFINES["_FILE_OFFSET_BITS"] = "64" + +if CONFIG["OS_TARGET"] == "WINNT": + + DEFINES["CERT_CHAIN_PARA_HAS_EXTRA_FIELDS"] = True + DEFINES["NOMINMAX"] = True + DEFINES["NO_TCMALLOC"] = True + DEFINES["NTDDI_VERSION"] = "0x0A000000" + DEFINES["PSAPI_VERSION"] = "1" + DEFINES["UNICODE"] = True + DEFINES["WEBRTC_WIN"] = True + DEFINES["WIN32"] = True + DEFINES["WIN32_LEAN_AND_MEAN"] = True + DEFINES["WINVER"] = "0x0A00" + DEFINES["_ATL_NO_OPENGL"] = True + DEFINES["_CRT_RAND_S"] = True + DEFINES["_CRT_SECURE_NO_DEPRECATE"] = True + DEFINES["_CRT_SECURE_NO_WARNINGS"] = True + DEFINES["_HAS_EXCEPTIONS"] = "0" + DEFINES["_SCL_SECURE_NO_DEPRECATE"] = True + DEFINES["_SECURE_ATL"] = True + DEFINES["_UNICODE"] = True + DEFINES["_USING_V110_SDK71_"] = True + DEFINES["_WIN32_WINNT"] = "0x0A00" + DEFINES["_WINDOWS"] = True + DEFINES["__STD_C"] = True + + OS_LIBS += [ + "winmm" + ] + +if CONFIG["CPU_ARCH"] == "aarch64": + + DEFINES["WEBRTC_ARCH_ARM64"] = True + DEFINES["WEBRTC_HAS_NEON"] = True + +if CONFIG["CPU_ARCH"] == "arm": + + CXXFLAGS += [ + "-mfpu=neon" + ] + + DEFINES["WEBRTC_ARCH_ARM"] = True + DEFINES["WEBRTC_ARCH_ARM_V7"] = True + DEFINES["WEBRTC_HAS_NEON"] = True + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "Android": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "DragonFly": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "FreeBSD": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "NetBSD": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if not CONFIG["MOZ_DEBUG"] and CONFIG["OS_TARGET"] == "OpenBSD": + + DEFINES["_FORTIFY_SOURCE"] = "2" + +if CONFIG["CPU_ARCH"] == "x86" and CONFIG["OS_TARGET"] == "Android": + + CXXFLAGS += [ + "-msse2" + ] + +if CONFIG["CPU_ARCH"] == "aarch64" and CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["CR_XCODE_VERSION"] = "0120" + +if CONFIG["CPU_ARCH"] == "x86_64" and CONFIG["OS_TARGET"] == "Darwin": + + DEFINES["CR_XCODE_VERSION"] = "0920" + +if CONFIG["CPU_ARCH"] == "x86" and CONFIG["OS_TARGET"] == "FreeBSD": + + CXXFLAGS += [ + "-msse2" + ] + +if CONFIG["CPU_ARCH"] == "aarch64" and CONFIG["OS_TARGET"] == "Linux": + + DEFINES["DISABLE_NACL"] = True + DEFINES["NO_TCMALLOC"] = True + +if CONFIG["CPU_ARCH"] == "x86" and CONFIG["OS_TARGET"] == "Linux": + + CXXFLAGS += [ + "-msse2" + ] + +if CONFIG["CPU_ARCH"] == "x86" and CONFIG["OS_TARGET"] == "NetBSD": + + CXXFLAGS += [ + "-msse2" + ] + +if CONFIG["CPU_ARCH"] == "x86" and CONFIG["OS_TARGET"] == "OpenBSD": + + CXXFLAGS += [ + "-msse2" + ] + +Library("pacing_gn") diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.cc b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.cc new file mode 100644 index 0000000000..e0308d160c --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.cc @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_queue.h" + +#include <algorithm> +#include <list> +#include <vector> + +#include "modules/include/module_common_types.h" +#include "modules/pacing/alr_detector.h" +#include "modules/pacing/bitrate_prober.h" +#include "modules/pacing/interval_budget.h" +#include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" +#include "system_wrappers/include/clock.h" +#include "system_wrappers/include/field_trial.h" + +namespace webrtc { + +PacketQueue::Packet::Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order) + : priority(priority), + ssrc(ssrc), + sequence_number(seq_number), + capture_time_ms(capture_time_ms), + enqueue_time_ms(enqueue_time_ms), + sum_paused_ms(0), + bytes(length_in_bytes), + retransmission(retransmission), + enqueue_order(enqueue_order) {} + +PacketQueue::Packet::Packet(const Packet& other) = default; + +PacketQueue::Packet::~Packet() {} + +PacketQueue::PacketQueue(const Clock* clock) + : bytes_(0), + clock_(clock), + queue_time_sum_(0), + time_last_updated_(clock_->TimeInMilliseconds()), + paused_(false) {} + +PacketQueue::~PacketQueue() {} + +void PacketQueue::Push(const Packet& packet) { + UpdateQueueTime(packet.enqueue_time_ms); + + // Store packet in list, use pointers in priority queue for cheaper moves. + // Packets have a handle to its own iterator in the list, for easy removal + // when popping from queue. + packet_list_.push_front(packet); + std::list<Packet>::iterator it = packet_list_.begin(); + it->this_it = it; // Handle for direct removal from list. + prio_queue_.push(&(*it)); // Pointer into list. + bytes_ += packet.bytes; +} + +const PacketQueue::Packet& PacketQueue::BeginPop() { + const PacketQueue::Packet& packet = *prio_queue_.top(); + prio_queue_.pop(); + return packet; +} + +void PacketQueue::CancelPop(const PacketQueue::Packet& packet) { + prio_queue_.push(&(*packet.this_it)); +} + +void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) { + bytes_ -= packet.bytes; + int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; + RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); + packet_queue_time_ms -= packet.sum_paused_ms; + RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); + queue_time_sum_ -= packet_queue_time_ms; + packet_list_.erase(packet.this_it); + RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); + if (packet_list_.empty()) + RTC_DCHECK_EQ(0, queue_time_sum_); +} + +bool PacketQueue::Empty() const { + return prio_queue_.empty(); +} + +size_t PacketQueue::SizeInPackets() const { + return prio_queue_.size(); +} + +uint64_t PacketQueue::SizeInBytes() const { + return bytes_; +} + +int64_t PacketQueue::OldestEnqueueTimeMs() const { + auto it = packet_list_.rbegin(); + if (it == packet_list_.rend()) + return 0; + return it->enqueue_time_ms; +} + +void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) { + RTC_DCHECK_GE(timestamp_ms, time_last_updated_); + if (timestamp_ms == time_last_updated_) + return; + + int64_t delta_ms = timestamp_ms - time_last_updated_; + + if (paused_) { + // Increase per-packet accumulators of time spent in queue while paused, + // so that we can disregard that when subtracting main accumulator when + // popping packet from the queue. + for (auto& it : packet_list_) { + it.sum_paused_ms += delta_ms; + } + } else { + // Use packet packet_list_.size() not prio_queue_.size() here, as there + // might be an outstanding element popped from prio_queue_ currently in + // the SendPacket() call, while packet_list_ will always be correct. + queue_time_sum_ += delta_ms * packet_list_.size(); + } + time_last_updated_ = timestamp_ms; +} + +void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { + if (paused_ == paused) + return; + UpdateQueueTime(timestamp_ms); + paused_ = paused; +} + +int64_t PacketQueue::AverageQueueTimeMs() const { + if (prio_queue_.empty()) + return 0; + return queue_time_sum_ / packet_list_.size(); +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.h b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.h new file mode 100644 index 0000000000..240961601c --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_QUEUE_H_ +#define MODULES_PACING_PACKET_QUEUE_H_ + +#include <list> +#include <queue> +#include <set> +#include <vector> + +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { + +class PacketQueue { + public: + explicit PacketQueue(const Clock* clock); + virtual ~PacketQueue(); + + struct Packet { + Packet(RtpPacketSender::Priority priority, + uint32_t ssrc, + uint16_t seq_number, + int64_t capture_time_ms, + int64_t enqueue_time_ms, + size_t length_in_bytes, + bool retransmission, + uint64_t enqueue_order); + + Packet(const Packet& other); + + virtual ~Packet(); + + bool operator<(const Packet& other) const { + if (priority != other.priority) + return priority > other.priority; + if (retransmission != other.retransmission) + return other.retransmission; + + return enqueue_order > other.enqueue_order; + } + + RtpPacketSender::Priority priority; + uint32_t ssrc; + uint16_t sequence_number; + int64_t capture_time_ms; // Absolute time of frame capture. + int64_t enqueue_time_ms; // Absolute time of pacer queue entry. + int64_t sum_paused_ms; + size_t bytes; + bool retransmission; + uint64_t enqueue_order; + std::list<Packet>::iterator this_it; + std::multiset<int64_t>::iterator enqueue_time_it; + }; + + virtual void Push(const Packet& packet); + virtual const Packet& BeginPop(); + virtual void CancelPop(const Packet& packet); + virtual void FinalizePop(const Packet& packet); + virtual bool Empty() const; + virtual size_t SizeInPackets() const; + virtual uint64_t SizeInBytes() const; + virtual int64_t OldestEnqueueTimeMs() const; + virtual void UpdateQueueTime(int64_t timestamp_ms); + virtual void SetPauseState(bool paused, int64_t timestamp_ms); + virtual int64_t AverageQueueTimeMs() const; + + private: + // Try to add a packet to the set of ssrc/seqno identifiers currently in the + // queue. Return true if inserted, false if this is a duplicate. + bool AddToDupeSet(const Packet& packet); + + void RemoveFromDupeSet(const Packet& packet); + + // Used by priority queue to sort packets. + struct Comparator { + bool operator()(const Packet* first, const Packet* second) { + // Highest prio = 0. + if (first->priority != second->priority) + return first->priority > second->priority; + + // Retransmissions go first. + if (second->retransmission != first->retransmission) + return second->retransmission; + + // Older frames have higher prio. + if (first->capture_time_ms != second->capture_time_ms) + return first->capture_time_ms > second->capture_time_ms; + + return first->enqueue_order > second->enqueue_order; + } + }; + + // List of packets, in the order the were enqueued. Since dequeueing may + // occur out of order, use list instead of vector. + std::list<Packet> packet_list_; + // Priority queue of the packets, sorted according to Comparator. + // Use pointers into list, to avodi moving whole struct within heap. + std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_; + // Total number of bytes in the queue. + uint64_t bytes_; + const Clock* const clock_; + int64_t queue_time_sum_; + int64_t time_last_updated_; + bool paused_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACKET_QUEUE_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.cc b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.cc new file mode 100644 index 0000000000..6aee807af3 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.cc @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_queue2.h" + +#include <algorithm> + +#include "rtc_base/checks.h" +#include "system_wrappers/include/clock.h" + +namespace webrtc { + +PacketQueue2::Stream::Stream() : bytes(0) {} +PacketQueue2::Stream::~Stream() {} + +PacketQueue2::PacketQueue2(const Clock* clock) + : PacketQueue(clock), + clock_(clock), + time_last_updated_(clock_->TimeInMilliseconds()) {} + +PacketQueue2::~PacketQueue2() {} + +void PacketQueue2::Push(const Packet& packet_to_insert) { + Packet packet(packet_to_insert); + + auto stream_info_it = streams_.find(packet.ssrc); + if (stream_info_it == streams_.end()) { + stream_info_it = streams_.emplace(packet.ssrc, Stream()).first; + stream_info_it->second.priority_it = stream_priorities_.end(); + stream_info_it->second.ssrc = packet.ssrc; + } + + Stream* streams_ = &stream_info_it->second; + + if (streams_->priority_it == stream_priorities_.end()) { + // If the SSRC is not currently scheduled, add it to |stream_priorities_|. + RTC_CHECK(!IsSsrcScheduled(streams_->ssrc)); + streams_->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); + } else if (packet.priority < streams_->priority_it->first.priority) { + // If the priority of this SSRC increased, remove the outdated StreamPrioKey + // and insert a new one with the new priority. Note that + // RtpPacketSender::Priority uses lower ordinal for higher priority. + stream_priorities_.erase(streams_->priority_it); + streams_->priority_it = stream_priorities_.emplace( + StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); + } + RTC_CHECK(streams_->priority_it != stream_priorities_.end()); + + packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms); + + // In order to figure out how much time a packet has spent in the queue while + // not in a paused state, we subtract the total amount of time the queue has + // been paused so far, and when the packet is poped we subtract the total + // amount of time the queue has been paused at that moment. This way we + // subtract the total amount of time the packet has spent in the queue while + // in a paused state. + UpdateQueueTime(packet.enqueue_time_ms); + packet.enqueue_time_ms -= pause_time_sum_ms_; + streams_->packet_queue.push(packet); + + size_packets_ += 1; + size_bytes_ += packet.bytes; +} + +const PacketQueue2::Packet& PacketQueue2::BeginPop() { + RTC_CHECK(!pop_packet_ && !pop_stream_); + + Stream* stream = GetHighestPriorityStream(); + pop_stream_.emplace(stream); + pop_packet_.emplace(stream->packet_queue.top()); + stream->packet_queue.pop(); + + return *pop_packet_; +} + +void PacketQueue2::CancelPop(const Packet& packet) { + RTC_CHECK(pop_packet_ && pop_stream_); + (*pop_stream_)->packet_queue.push(*pop_packet_); + pop_packet_.reset(); + pop_stream_.reset(); +} + +void PacketQueue2::FinalizePop(const Packet& packet) { + RTC_CHECK(!paused_); + if (!Empty()) { + RTC_CHECK(pop_packet_ && pop_stream_); + Stream* stream = *pop_stream_; + stream_priorities_.erase(stream->priority_it); + const Packet& packet = *pop_packet_; + + // Calculate the total amount of time spent by this packet in the queue + // while in a non-paused state. Note that the |pause_time_sum_ms_| was + // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and + // by subtracting it now we effectively remove the time spent in in the + // queue while in a paused state. + int64_t time_in_non_paused_state_ms = + time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_; + queue_time_sum_ms_ -= time_in_non_paused_state_ms; + + RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end()); + enqueue_times_.erase(packet.enqueue_time_it); + + // Update |bytes| of this stream. The general idea is that the stream that + // has sent the least amount of bytes should have the highest priority. + // The problem with that is if streams send with different rates, in which + // case a "budget" will be built up for the stream sending at the lower + // rate. To avoid building a too large budget we limit |bytes| to be within + // kMaxLeading bytes of the stream that has sent the most amount of bytes. + stream->bytes = + std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes); + max_bytes_ = std::max(max_bytes_, stream->bytes); + + size_bytes_ -= packet.bytes; + size_packets_ -= 1; + RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); + + // If there are packets left to be sent, schedule the stream again. + RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); + if (stream->packet_queue.empty()) { + stream->priority_it = stream_priorities_.end(); + } else { + RtpPacketSender::Priority priority = stream->packet_queue.top().priority; + stream->priority_it = stream_priorities_.emplace( + StreamPrioKey(priority, stream->bytes), stream->ssrc); + } + + pop_packet_.reset(); + pop_stream_.reset(); + } +} + +bool PacketQueue2::Empty() const { + RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || + (stream_priorities_.empty() && size_packets_ == 0)); + return stream_priorities_.empty(); +} + +size_t PacketQueue2::SizeInPackets() const { + return size_packets_; +} + +uint64_t PacketQueue2::SizeInBytes() const { + return size_bytes_; +} + +int64_t PacketQueue2::OldestEnqueueTimeMs() const { + if (Empty()) + return 0; + RTC_CHECK(!enqueue_times_.empty()); + return *enqueue_times_.begin(); +} + +void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) { + RTC_CHECK_GE(timestamp_ms, time_last_updated_); + if (timestamp_ms == time_last_updated_) + return; + + int64_t delta_ms = timestamp_ms - time_last_updated_; + + if (paused_) { + pause_time_sum_ms_ += delta_ms; + } else { + queue_time_sum_ms_ += delta_ms * size_packets_; + } + + time_last_updated_ = timestamp_ms; +} + +void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) { + if (paused_ == paused) + return; + UpdateQueueTime(timestamp_ms); + paused_ = paused; +} + +int64_t PacketQueue2::AverageQueueTimeMs() const { + if (Empty()) + return 0; + return queue_time_sum_ms_ / size_packets_; +} + +PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() { + RTC_CHECK(!stream_priorities_.empty()); + uint32_t ssrc = stream_priorities_.begin()->second; + + auto stream_info_it = streams_.find(ssrc); + RTC_CHECK(stream_info_it != streams_.end()); + RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin()); + RTC_CHECK(!stream_info_it->second.packet_queue.empty()); + return &stream_info_it->second; +} + +bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const { + for (const auto& scheduled_stream : stream_priorities_) { + if (scheduled_stream.second == ssrc) + return true; + } + return false; +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.h b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.h new file mode 100644 index 0000000000..06e0f08b38 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_queue2.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_QUEUE2_H_ +#define MODULES_PACING_PACKET_QUEUE2_H_ + +#include <map> +#include <queue> +#include <set> + +#include "modules/pacing/packet_queue.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" + +namespace webrtc { + +class PacketQueue2 : public PacketQueue { + public: + explicit PacketQueue2(const Clock* clock); + ~PacketQueue2() override; + + using Packet = PacketQueue::Packet; + + void Push(const Packet& packet) override; + const Packet& BeginPop() override; + void CancelPop(const Packet& packet) override; + void FinalizePop(const Packet& packet) override; + + bool Empty() const override; + size_t SizeInPackets() const override; + uint64_t SizeInBytes() const override; + + int64_t OldestEnqueueTimeMs() const override; + int64_t AverageQueueTimeMs() const override; + void UpdateQueueTime(int64_t timestamp_ms) override; + void SetPauseState(bool paused, int64_t timestamp_ms) override; + + struct StreamPrioKey { + StreamPrioKey() = default; + StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes) + : priority(priority), bytes(bytes) {} + + bool operator<(const StreamPrioKey& other) const { + if (priority != other.priority) + return priority < other.priority; + return bytes > other.bytes; + } + + const RtpPacketSender::Priority priority; + const size_t bytes; + }; + + struct Stream { + Stream(); + + virtual ~Stream(); + + size_t bytes; + uint32_t ssrc; + std::priority_queue<Packet> packet_queue; + + // Whenever a packet is inserted for this stream we check if |priority_it| + // points to an element in |stream_priorities_|, and if it does it means + // this stream has already been scheduled, and if the scheduled priority is + // lower than the priority of the incoming packet we reschedule this stream + // with the higher priority. + std::multimap<StreamPrioKey, uint32_t>::iterator priority_it; + }; + + private: + static constexpr size_t kMaxLeadingBytes = 1400; + + Stream* GetHighestPriorityStream(); + + // Just used to verify correctness. + bool IsSsrcScheduled(uint32_t ssrc) const; + + const Clock* const clock_; + int64_t time_last_updated_; + rtc::Optional<Packet> pop_packet_; + rtc::Optional<Stream*> pop_stream_; + + bool paused_ = false; + size_t size_packets_ = 0; + size_t size_bytes_ = 0; + size_t max_bytes_ = kMaxLeadingBytes; + int64_t queue_time_sum_ms_ = 0; + int64_t pause_time_sum_ms_ = 0; + + // A map of streams used to prioritize from which stream to send next. We use + // a multimap instead of a priority_queue since the priority of a stream can + // change as a new packet is inserted, and a multimap allows us to remove and + // then reinsert a StreamPrioKey if the priority has increased. + std::multimap<StreamPrioKey, uint32_t> stream_priorities_; + + // A map of SSRCs to Streams. + std::map<uint32_t, Stream> streams_; + + // The enqueue time of every packet currently in the queue. Used to figure out + // the age of the oldest packet in the queue. + std::multiset<int64_t> enqueue_times_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACKET_QUEUE2_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_router.cc b/third_party/libwebrtc/webrtc/modules/pacing/packet_router.cc new file mode 100644 index 0000000000..8b3ffc19a6 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_router.cc @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "modules/pacing/packet_router.h" + +#include <algorithm> +#include <limits> + +#include "modules/rtp_rtcp/include/rtp_rtcp.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" +#include "rtc_base/atomicops.h" +#include "rtc_base/checks.h" +#include "rtc_base/timeutils.h" + +namespace webrtc { +namespace { + +constexpr int kRembSendIntervalMs = 200; + +} // namespace + +PacketRouter::PacketRouter() + : last_remb_time_ms_(rtc::TimeMillis()), + last_send_bitrate_bps_(0), + bitrate_bps_(0), + max_bitrate_bps_(std::numeric_limits<decltype(max_bitrate_bps_)>::max()), + active_remb_module_(nullptr), + transport_seq_(0) {} + +PacketRouter::~PacketRouter() { + RTC_DCHECK(rtp_send_modules_.empty()); + RTC_DCHECK(rtp_receive_modules_.empty()); + RTC_DCHECK(sender_remb_candidates_.empty()); + RTC_DCHECK(receiver_remb_candidates_.empty()); + RTC_DCHECK(active_remb_module_ == nullptr); +} + +void PacketRouter::AddSendRtpModule(RtpRtcp* rtp_module, bool remb_candidate) { + rtc::CritScope cs(&modules_crit_); + RTC_DCHECK(std::find(rtp_send_modules_.begin(), rtp_send_modules_.end(), + rtp_module) == rtp_send_modules_.end()); + // Put modules which can use regular payload packets (over rtx) instead of + // padding first as it's less of a waste + if ((rtp_module->RtxSendStatus() & kRtxRedundantPayloads) > 0) { + rtp_send_modules_.push_front(rtp_module); + } else { + rtp_send_modules_.push_back(rtp_module); + } + + if (remb_candidate) { + AddRembModuleCandidate(rtp_module, true); + } +} + +void PacketRouter::RemoveSendRtpModule(RtpRtcp* rtp_module) { + rtc::CritScope cs(&modules_crit_); + MaybeRemoveRembModuleCandidate(rtp_module, /* sender = */ true); + auto it = + std::find(rtp_send_modules_.begin(), rtp_send_modules_.end(), rtp_module); + RTC_DCHECK(it != rtp_send_modules_.end()); + rtp_send_modules_.erase(it); +} + +void PacketRouter::AddReceiveRtpModule(RtpRtcp* rtp_module, + bool remb_candidate) { + rtc::CritScope cs(&modules_crit_); + RTC_DCHECK(std::find(rtp_receive_modules_.begin(), rtp_receive_modules_.end(), + rtp_module) == rtp_receive_modules_.end()); + + rtp_receive_modules_.push_back(rtp_module); + + if (remb_candidate) { + AddRembModuleCandidate(rtp_module, false); + } +} + +void PacketRouter::RemoveReceiveRtpModule(RtpRtcp* rtp_module) { + rtc::CritScope cs(&modules_crit_); + MaybeRemoveRembModuleCandidate(rtp_module, /* sender = */ false); + const auto& it = std::find(rtp_receive_modules_.begin(), + rtp_receive_modules_.end(), rtp_module); + RTC_DCHECK(it != rtp_receive_modules_.end()); + rtp_receive_modules_.erase(it); +} + +bool PacketRouter::TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& pacing_info) { + RTC_DCHECK_RUNS_SERIALIZED(&pacer_race_); + rtc::CritScope cs(&modules_crit_); + for (auto* rtp_module : rtp_send_modules_) { + if (!rtp_module->SendingMedia()) + continue; + if (ssrc == rtp_module->SSRC() || ssrc == rtp_module->FlexfecSsrc()) { + return rtp_module->TimeToSendPacket(ssrc, sequence_number, + capture_timestamp, retransmission, + pacing_info); + } + } + return true; +} + +size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send, + const PacedPacketInfo& pacing_info) { + RTC_DCHECK_RUNS_SERIALIZED(&pacer_race_); + size_t total_bytes_sent = 0; + rtc::CritScope cs(&modules_crit_); + // Rtp modules are ordered by which stream can most benefit from padding. + for (RtpRtcp* module : rtp_send_modules_) { + if (module->SendingMedia() && module->HasBweExtensions()) { + size_t bytes_sent = module->TimeToSendPadding( + bytes_to_send - total_bytes_sent, pacing_info); + total_bytes_sent += bytes_sent; + if (total_bytes_sent >= bytes_to_send) + break; + } + } + return total_bytes_sent; +} + +void PacketRouter::SetTransportWideSequenceNumber(uint16_t sequence_number) { + rtc::AtomicOps::ReleaseStore(&transport_seq_, sequence_number); +} + +uint16_t PacketRouter::AllocateSequenceNumber() { + int prev_seq = rtc::AtomicOps::AcquireLoad(&transport_seq_); + int desired_prev_seq; + int new_seq; + do { + desired_prev_seq = prev_seq; + new_seq = (desired_prev_seq + 1) & 0xFFFF; + // Note: CompareAndSwap returns the actual value of transport_seq at the + // time the CAS operation was executed. Thus, if prev_seq is returned, the + // operation was successful - otherwise we need to retry. Saving the + // return value saves us a load on retry. + prev_seq = rtc::AtomicOps::CompareAndSwap(&transport_seq_, desired_prev_seq, + new_seq); + } while (prev_seq != desired_prev_seq); + + return new_seq; +} + +void PacketRouter::OnReceiveBitrateChanged(const std::vector<uint32_t>& ssrcs, + uint32_t bitrate_bps) { + // % threshold for if we should send a new REMB asap. + const uint32_t kSendThresholdPercent = 97; + + int64_t now_ms = rtc::TimeMillis(); + { + rtc::CritScope lock(&remb_crit_); + + // If we already have an estimate, check if the new total estimate is below + // kSendThresholdPercent of the previous estimate. + if (last_send_bitrate_bps_ > 0) { + uint32_t new_remb_bitrate_bps = + last_send_bitrate_bps_ - bitrate_bps_ + bitrate_bps; + + if (new_remb_bitrate_bps < + kSendThresholdPercent * last_send_bitrate_bps_ / 100) { + // The new bitrate estimate is less than kSendThresholdPercent % of the + // last report. Send a REMB asap. + last_remb_time_ms_ = now_ms - kRembSendIntervalMs; + } + } + bitrate_bps_ = bitrate_bps; + + if (now_ms - last_remb_time_ms_ < kRembSendIntervalMs) { + return; + } + // NOTE: Updated if we intend to send the data; we might not have + // a module to actually send it. + last_remb_time_ms_ = now_ms; + last_send_bitrate_bps_ = bitrate_bps; + // Cap the value to send in remb with configured value. + bitrate_bps = std::min(bitrate_bps, max_bitrate_bps_); + } + SendRemb(bitrate_bps, ssrcs); +} + +void PacketRouter::SetMaxDesiredReceiveBitrate(uint32_t bitrate_bps) { + { + rtc::CritScope lock(&remb_crit_); + max_bitrate_bps_ = bitrate_bps; + if (rtc::TimeMillis() - last_remb_time_ms_ < kRembSendIntervalMs && + last_send_bitrate_bps_ > 0 && + last_send_bitrate_bps_ <= max_bitrate_bps_) { + // Recent measured bitrate is already below the cap. + return; + } + } + SendRemb(bitrate_bps, /*ssrcs=*/{}); +} + +bool PacketRouter::SendRemb(uint32_t bitrate_bps, + const std::vector<uint32_t>& ssrcs) { + rtc::CritScope lock(&modules_crit_); + + if (!active_remb_module_) { + return false; + } + + // The Add* and Remove* methods above ensure that REMB is disabled on all + // other modules, because otherwise, they will send REMB with stale info. + active_remb_module_->SetRemb(bitrate_bps, ssrcs); + + return true; +} + +bool PacketRouter::SendTransportFeedback(rtcp::TransportFeedback* packet) { + RTC_DCHECK_RUNS_SERIALIZED(&pacer_race_); + rtc::CritScope cs(&modules_crit_); + // Prefer send modules. + for (auto* rtp_module : rtp_send_modules_) { + packet->SetSenderSsrc(rtp_module->SSRC()); + if (rtp_module->SendFeedbackPacket(*packet)) + return true; + } + for (auto* rtp_module : rtp_receive_modules_) { + packet->SetSenderSsrc(rtp_module->SSRC()); + if (rtp_module->SendFeedbackPacket(*packet)) + return true; + } + return false; +} + +void PacketRouter::AddRembModuleCandidate(RtpRtcp* candidate_module, + bool sender) { + RTC_DCHECK(candidate_module); + std::vector<RtpRtcp*>& candidates = + sender ? sender_remb_candidates_ : receiver_remb_candidates_; + RTC_DCHECK(std::find(candidates.cbegin(), candidates.cend(), + candidate_module) == candidates.cend()); + candidates.push_back(candidate_module); + DetermineActiveRembModule(); +} + +void PacketRouter::MaybeRemoveRembModuleCandidate(RtpRtcp* candidate_module, + bool sender) { + RTC_DCHECK(candidate_module); + std::vector<RtpRtcp*>& candidates = + sender ? sender_remb_candidates_ : receiver_remb_candidates_; + auto it = std::find(candidates.begin(), candidates.end(), candidate_module); + + if (it == candidates.end()) { + return; // Function called due to removal of non-REMB-candidate module. + } + + if (*it == active_remb_module_) { + UnsetActiveRembModule(); + } + candidates.erase(it); + DetermineActiveRembModule(); +} + +void PacketRouter::UnsetActiveRembModule() { + RTC_CHECK(active_remb_module_); + active_remb_module_->UnsetRemb(); + active_remb_module_ = nullptr; +} + +void PacketRouter::DetermineActiveRembModule() { + // Sender modules take precedence over receiver modules, because SRs (sender + // reports) are sent more frequently than RR (receiver reports). + // When adding the first sender module, we should change the active REMB + // module to be that. Otherwise, we remain with the current active module. + + RtpRtcp* new_active_remb_module; + + if (!sender_remb_candidates_.empty()) { + new_active_remb_module = sender_remb_candidates_.front(); + } else if (!receiver_remb_candidates_.empty()) { + new_active_remb_module = receiver_remb_candidates_.front(); + } else { + new_active_remb_module = nullptr; + } + + if (new_active_remb_module != active_remb_module_ && active_remb_module_) { + UnsetActiveRembModule(); + } + + active_remb_module_ = new_active_remb_module; +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_router.h b/third_party/libwebrtc/webrtc/modules/pacing/packet_router.h new file mode 100644 index 0000000000..da8c177276 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_router.h @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACKET_ROUTER_H_ +#define MODULES_PACING_PACKET_ROUTER_H_ + +#include <list> +#include <vector> + +#include "common_types.h" // NOLINT(build/include) +#include "modules/pacing/paced_sender.h" +#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" +#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" +#include "rtc_base/constructormagic.h" +#include "rtc_base/criticalsection.h" +#include "rtc_base/race_checker.h" +#include "rtc_base/thread_annotations.h" + +namespace webrtc { + +class RtpRtcp; +namespace rtcp { +class TransportFeedback; +} // namespace rtcp + +// PacketRouter keeps track of rtp send modules to support the pacer. +// In addition, it handles feedback messages, which are sent on a send +// module if possible (sender report), otherwise on receive module +// (receiver report). For the latter case, we also keep track of the +// receive modules. +class PacketRouter : public PacedSender::PacketSender, + public TransportSequenceNumberAllocator, + public RemoteBitrateObserver, + public TransportFeedbackSenderInterface { + public: + PacketRouter(); + ~PacketRouter() override; + + // TODO(nisse): Delete, as soon as downstream app is updated. + RTC_DEPRECATED void AddRtpModule(RtpRtcp* rtp_module) { + AddReceiveRtpModule(rtp_module); + } + RTC_DEPRECATED void RemoveRtpModule(RtpRtcp* rtp_module) { + RemoveReceiveRtpModule(rtp_module); + } + + void AddSendRtpModule(RtpRtcp* rtp_module, bool remb_candidate); + void RemoveSendRtpModule(RtpRtcp* rtp_module); + RTC_DEPRECATED void AddSendRtpModule(RtpRtcp* rtp_module) { + AddSendRtpModule(rtp_module, true); + } + + void AddReceiveRtpModule(RtpRtcp* rtp_module, bool remb_candidate); + void RemoveReceiveRtpModule(RtpRtcp* rtp_module); + RTC_DEPRECATED void AddReceiveRtpModule(RtpRtcp* rtp_module) { + AddReceiveRtpModule(rtp_module, true); + } + + // Implements PacedSender::Callback. + bool TimeToSendPacket(uint32_t ssrc, + uint16_t sequence_number, + int64_t capture_timestamp, + bool retransmission, + const PacedPacketInfo& packet_info) override; + + size_t TimeToSendPadding(size_t bytes, + const PacedPacketInfo& packet_info) override; + + void SetTransportWideSequenceNumber(uint16_t sequence_number); + uint16_t AllocateSequenceNumber() override; + + // Called every time there is a new bitrate estimate for a receive channel + // group. This call will trigger a new RTCP REMB packet if the bitrate + // estimate has decreased or if no RTCP REMB packet has been sent for + // a certain time interval. + // Implements RtpReceiveBitrateUpdate. + void OnReceiveBitrateChanged(const std::vector<uint32_t>& ssrcs, + uint32_t bitrate_bps) override; + + // Ensures remote party notified of the receive bitrate limit no larger than + // |bitrate_bps|. + void SetMaxDesiredReceiveBitrate(uint32_t bitrate_bps); + + // Send REMB feedback. + virtual bool SendRemb(uint32_t bitrate_bps, + const std::vector<uint32_t>& ssrcs); + + // Send transport feedback packet to send-side. + bool SendTransportFeedback(rtcp::TransportFeedback* packet) override; + + private: + void AddRembModuleCandidate(RtpRtcp* candidate_module, bool sender) + RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); + void MaybeRemoveRembModuleCandidate(RtpRtcp* candidate_module, bool sender) + RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); + void UnsetActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); + void DetermineActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); + + rtc::RaceChecker pacer_race_; + rtc::CriticalSection modules_crit_; + std::list<RtpRtcp*> rtp_send_modules_ RTC_GUARDED_BY(modules_crit_); + std::vector<RtpRtcp*> rtp_receive_modules_ RTC_GUARDED_BY(modules_crit_); + + // TODO(eladalon): remb_crit_ only ever held from one function, and it's not + // clear if that function can actually be called from more than one thread. + rtc::CriticalSection remb_crit_; + // The last time a REMB was sent. + int64_t last_remb_time_ms_ RTC_GUARDED_BY(remb_crit_); + uint32_t last_send_bitrate_bps_ RTC_GUARDED_BY(remb_crit_); + // The last bitrate update. + uint32_t bitrate_bps_ RTC_GUARDED_BY(remb_crit_); + uint32_t max_bitrate_bps_ RTC_GUARDED_BY(remb_crit_); + + // Candidates for the REMB module can be RTP sender/receiver modules, with + // the sender modules taking precedence. + std::vector<RtpRtcp*> sender_remb_candidates_ RTC_GUARDED_BY(modules_crit_); + std::vector<RtpRtcp*> receiver_remb_candidates_ RTC_GUARDED_BY(modules_crit_); + RtpRtcp* active_remb_module_ RTC_GUARDED_BY(modules_crit_); + + volatile int transport_seq_; + + RTC_DISALLOW_COPY_AND_ASSIGN(PacketRouter); +}; +} // namespace webrtc +#endif // MODULES_PACING_PACKET_ROUTER_H_ diff --git a/third_party/libwebrtc/webrtc/modules/pacing/packet_router_unittest.cc b/third_party/libwebrtc/webrtc/modules/pacing/packet_router_unittest.cc new file mode 100644 index 0000000000..54c8f12c05 --- /dev/null +++ b/third_party/libwebrtc/webrtc/modules/pacing/packet_router_unittest.cc @@ -0,0 +1,829 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include <list> +#include <memory> + +#include "modules/pacing/packet_router.h" +#include "modules/rtp_rtcp/include/rtp_rtcp.h" +#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" +#include "rtc_base/checks.h" +#include "rtc_base/fakeclock.h" +#include "test/gmock.h" +#include "test/gtest.h" + +namespace webrtc { + +// TODO(eladalon): Restructure and/or replace the existing monolithic tests +// (only some of the test are monolithic) according to the new +// guidelines - small tests for one thing at a time. +// (I'm not removing any tests during CL, so as to demonstrate no regressions.) + +namespace { + +using ::testing::_; +using ::testing::AnyNumber; +using ::testing::AtLeast; +using ::testing::Field; +using ::testing::Gt; +using ::testing::Le; +using ::testing::NiceMock; +using ::testing::Return; +using ::testing::ReturnPointee; +using ::testing::SaveArg; + +constexpr int kProbeMinProbes = 5; +constexpr int kProbeMinBytes = 1000; + +} // namespace + +TEST(PacketRouterTest, Sanity_NoModuleRegistered_TimeToSendPacket) { + PacketRouter packet_router; + + constexpr uint16_t ssrc = 1234; + constexpr uint16_t sequence_number = 17; + constexpr uint64_t timestamp = 7890; + constexpr bool retransmission = false; + const PacedPacketInfo paced_info(1, kProbeMinProbes, kProbeMinBytes); + + // TODO(eladalon): TimeToSendPacket() returning true when nothing was + // sent, because no modules were registered, is sub-optimal. + // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 + EXPECT_TRUE(packet_router.TimeToSendPacket(ssrc, sequence_number, timestamp, + retransmission, paced_info)); +} + +TEST(PacketRouterTest, Sanity_NoModuleRegistered_TimeToSendPadding) { + PacketRouter packet_router; + + constexpr size_t bytes = 300; + const PacedPacketInfo paced_info(1, kProbeMinProbes, kProbeMinBytes); + + EXPECT_EQ(packet_router.TimeToSendPadding(bytes, paced_info), 0u); +} + +TEST(PacketRouterTest, Sanity_NoModuleRegistered_OnReceiveBitrateChanged) { + PacketRouter packet_router; + + const std::vector<uint32_t> ssrcs = {1, 2, 3}; + constexpr uint32_t bitrate_bps = 10000; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_bps); +} + +TEST(PacketRouterTest, Sanity_NoModuleRegistered_SendRemb) { + PacketRouter packet_router; + + const std::vector<uint32_t> ssrcs = {1, 2, 3}; + constexpr uint32_t bitrate_bps = 10000; + + EXPECT_FALSE(packet_router.SendRemb(bitrate_bps, ssrcs)); +} + +TEST(PacketRouterTest, Sanity_NoModuleRegistered_SendTransportFeedback) { + PacketRouter packet_router; + + rtcp::TransportFeedback feedback; + + EXPECT_FALSE(packet_router.SendTransportFeedback(&feedback)); +} + +TEST(PacketRouterTest, TimeToSendPacket) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> rtp_1; + NiceMock<MockRtpRtcp> rtp_2; + + packet_router.AddSendRtpModule(&rtp_1, false); + packet_router.AddSendRtpModule(&rtp_2, false); + + const uint16_t kSsrc1 = 1234; + uint16_t sequence_number = 17; + uint64_t timestamp = 7890; + bool retransmission = false; + + // Send on the first module by letting rtp_1 be sending with correct ssrc. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1)); + EXPECT_CALL(rtp_1, TimeToSendPacket( + kSsrc1, sequence_number, timestamp, retransmission, + Field(&PacedPacketInfo::probe_cluster_id, 1))) + .Times(1) + .WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc1, sequence_number, timestamp, retransmission, + PacedPacketInfo(1, kProbeMinProbes, kProbeMinBytes))); + + // Send on the second module by letting rtp_2 be sending, but not rtp_1. + ++sequence_number; + timestamp += 30; + retransmission = true; + const uint16_t kSsrc2 = 4567; + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, TimeToSendPacket( + kSsrc2, sequence_number, timestamp, retransmission, + Field(&PacedPacketInfo::probe_cluster_id, 2))) + .Times(1) + .WillOnce(Return(true)); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc2, sequence_number, timestamp, retransmission, + PacedPacketInfo(2, kProbeMinProbes, kProbeMinBytes))); + + // No module is sending, hence no packet should be sent. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc1, sequence_number, timestamp, retransmission, + PacedPacketInfo(1, kProbeMinProbes, kProbeMinBytes))); + + // Add a packet with incorrect ssrc and test it's dropped in the router. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, SSRC()).Times(1).WillOnce(Return(kSsrc1)); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_1, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc1 + kSsrc2, sequence_number, timestamp, retransmission, + PacedPacketInfo(1, kProbeMinProbes, kProbeMinBytes))); + + packet_router.RemoveSendRtpModule(&rtp_1); + + // rtp_1 has been removed, try sending a packet on that ssrc and make sure + // it is dropped as expected by not expecting any calls to rtp_1. + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, SSRC()).Times(1).WillOnce(Return(kSsrc2)); + EXPECT_CALL(rtp_2, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc1, sequence_number, timestamp, retransmission, + PacedPacketInfo(PacedPacketInfo::kNotAProbe, kProbeMinBytes, + kProbeMinBytes))); + + packet_router.RemoveSendRtpModule(&rtp_2); +} + +TEST(PacketRouterTest, TimeToSendPadding) { + PacketRouter packet_router; + + const uint16_t kSsrc1 = 1234; + const uint16_t kSsrc2 = 4567; + + NiceMock<MockRtpRtcp> rtp_1; + EXPECT_CALL(rtp_1, RtxSendStatus()).WillOnce(Return(kRtxOff)); + EXPECT_CALL(rtp_1, SSRC()).WillRepeatedly(Return(kSsrc1)); + NiceMock<MockRtpRtcp> rtp_2; + // rtp_2 will be prioritized for padding. + EXPECT_CALL(rtp_2, RtxSendStatus()).WillOnce(Return(kRtxRedundantPayloads)); + EXPECT_CALL(rtp_2, SSRC()).WillRepeatedly(Return(kSsrc2)); + packet_router.AddSendRtpModule(&rtp_1, false); + packet_router.AddSendRtpModule(&rtp_2, false); + + // Default configuration, sending padding on all modules sending media, + // ordered by priority (based on rtx mode). + const size_t requested_padding_bytes = 1000; + const size_t sent_padding_bytes = 890; + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, HasBweExtensions()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, + TimeToSendPadding(requested_padding_bytes, + Field(&PacedPacketInfo::probe_cluster_id, 111))) + .Times(1) + .WillOnce(Return(sent_padding_bytes)); + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, HasBweExtensions()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, + TimeToSendPadding(requested_padding_bytes - sent_padding_bytes, + Field(&PacedPacketInfo::probe_cluster_id, 111))) + .Times(1) + .WillOnce(Return(requested_padding_bytes - sent_padding_bytes)); + EXPECT_EQ(requested_padding_bytes, + packet_router.TimeToSendPadding( + requested_padding_bytes, + PacedPacketInfo(111, kProbeMinBytes, kProbeMinBytes))); + + // Let only the lower priority module be sending and verify the padding + // request is routed there. + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, TimeToSendPadding(requested_padding_bytes, _)).Times(0); + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, HasBweExtensions()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, TimeToSendPadding(_, _)) + .Times(1) + .WillOnce(Return(sent_padding_bytes)); + EXPECT_EQ(sent_padding_bytes, + packet_router.TimeToSendPadding( + requested_padding_bytes, + PacedPacketInfo(PacedPacketInfo::kNotAProbe, kProbeMinBytes, + kProbeMinBytes))); + + // No sending module at all. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes, _)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_2, TimeToSendPadding(_, _)).Times(0); + EXPECT_EQ(0u, + packet_router.TimeToSendPadding( + requested_padding_bytes, + PacedPacketInfo(PacedPacketInfo::kNotAProbe, kProbeMinBytes, + kProbeMinBytes))); + + // Only one module has BWE extensions. + EXPECT_CALL(rtp_1, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_1, HasBweExtensions()).Times(1).WillOnce(Return(false)); + EXPECT_CALL(rtp_1, TimeToSendPadding(requested_padding_bytes, _)).Times(0); + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, HasBweExtensions()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPadding(requested_padding_bytes, _)) + .Times(1) + .WillOnce(Return(sent_padding_bytes)); + EXPECT_EQ(sent_padding_bytes, + packet_router.TimeToSendPadding( + requested_padding_bytes, + PacedPacketInfo(PacedPacketInfo::kNotAProbe, kProbeMinBytes, + kProbeMinBytes))); + + packet_router.RemoveSendRtpModule(&rtp_1); + + // rtp_1 has been removed, try sending padding and make sure rtp_1 isn't asked + // to send by not expecting any calls. Instead verify rtp_2 is called. + EXPECT_CALL(rtp_2, SendingMedia()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, HasBweExtensions()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(rtp_2, TimeToSendPadding(requested_padding_bytes, _)).Times(1); + EXPECT_EQ(0u, + packet_router.TimeToSendPadding( + requested_padding_bytes, + PacedPacketInfo(PacedPacketInfo::kNotAProbe, kProbeMinBytes, + kProbeMinBytes))); + + packet_router.RemoveSendRtpModule(&rtp_2); +} + +TEST(PacketRouterTest, SenderOnlyFunctionsRespectSendingMedia) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> rtp; + packet_router.AddSendRtpModule(&rtp, false); + static const uint16_t kSsrc = 1234; + EXPECT_CALL(rtp, SSRC()).WillRepeatedly(Return(kSsrc)); + EXPECT_CALL(rtp, SendingMedia()).WillRepeatedly(Return(false)); + + // Verify that TimeToSendPacket does not end up in a receiver. + EXPECT_CALL(rtp, TimeToSendPacket(_, _, _, _, _)).Times(0); + EXPECT_TRUE(packet_router.TimeToSendPacket( + kSsrc, 1, 1, false, PacedPacketInfo(PacedPacketInfo::kNotAProbe, + kProbeMinBytes, kProbeMinBytes))); + // Verify that TimeToSendPadding does not end up in a receiver. + EXPECT_CALL(rtp, TimeToSendPadding(_, _)).Times(0); + EXPECT_EQ(0u, + packet_router.TimeToSendPadding( + 200, PacedPacketInfo(PacedPacketInfo::kNotAProbe, + kProbeMinBytes, kProbeMinBytes))); + + packet_router.RemoveSendRtpModule(&rtp); +} + +TEST(PacketRouterTest, AllocateSequenceNumbers) { + PacketRouter packet_router; + + const uint16_t kStartSeq = 0xFFF0; + const size_t kNumPackets = 32; + + packet_router.SetTransportWideSequenceNumber(kStartSeq - 1); + + for (size_t i = 0; i < kNumPackets; ++i) { + uint16_t seq = packet_router.AllocateSequenceNumber(); + uint32_t expected_unwrapped_seq = static_cast<uint32_t>(kStartSeq) + i; + EXPECT_EQ(static_cast<uint16_t>(expected_unwrapped_seq & 0xFFFF), seq); + } +} + +TEST(PacketRouterTest, SendTransportFeedback) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> rtp_1; + NiceMock<MockRtpRtcp> rtp_2; + + packet_router.AddSendRtpModule(&rtp_1, false); + packet_router.AddReceiveRtpModule(&rtp_2, false); + + rtcp::TransportFeedback feedback; + EXPECT_CALL(rtp_1, SendFeedbackPacket(_)).Times(1).WillOnce(Return(true)); + packet_router.SendTransportFeedback(&feedback); + packet_router.RemoveSendRtpModule(&rtp_1); + EXPECT_CALL(rtp_2, SendFeedbackPacket(_)).Times(1).WillOnce(Return(true)); + packet_router.SendTransportFeedback(&feedback); + packet_router.RemoveReceiveRtpModule(&rtp_2); +} + +#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) +TEST(PacketRouterTest, DoubleRegistrationOfSendModuleDisallowed) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = false; // Value irrelevant. + packet_router.AddSendRtpModule(&module, remb_candidate); + EXPECT_DEATH(packet_router.AddSendRtpModule(&module, remb_candidate), ""); + + // Test tear-down + packet_router.RemoveSendRtpModule(&module); +} + +TEST(PacketRouterTest, DoubleRegistrationOfReceiveModuleDisallowed) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = false; // Value irrelevant. + packet_router.AddReceiveRtpModule(&module, remb_candidate); + EXPECT_DEATH(packet_router.AddReceiveRtpModule(&module, remb_candidate), ""); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&module); +} + +TEST(PacketRouterTest, RemovalOfNeverAddedSendModuleDisallowed) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + EXPECT_DEATH(packet_router.RemoveSendRtpModule(&module), ""); +} + +TEST(PacketRouterTest, RemovalOfNeverAddedReceiveModuleDisallowed) { + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + EXPECT_DEATH(packet_router.RemoveReceiveRtpModule(&module), ""); +} +#endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) + +TEST(PacketRouterRembTest, LowerEstimateToSendRemb) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp; + PacketRouter packet_router; + + packet_router.AddSendRtpModule(&rtp, true); + + uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + EXPECT_CALL(rtp, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Lower the estimate with more than 3% to trigger a call to SetRemb right + // away. + bitrate_estimate = bitrate_estimate - 100; + EXPECT_CALL(rtp, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + packet_router.RemoveSendRtpModule(&rtp); +} + +TEST(PacketRouterRembTest, VerifyIncreasingAndDecreasing) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp; + PacketRouter packet_router; + packet_router.AddSendRtpModule(&rtp, true); + + uint32_t bitrate_estimate[] = {456, 789}; + std::vector<uint32_t> ssrcs = {1234, 5678}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate[0]); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + EXPECT_CALL(rtp, SetRemb(bitrate_estimate[0], ssrcs)).Times(1); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate[0]); + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate[1] + 100); + + // Lower the estimate to trigger a callback. + EXPECT_CALL(rtp, SetRemb(bitrate_estimate[1], ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate[1]); + + packet_router.RemoveSendRtpModule(&rtp); +} + +TEST(PacketRouterRembTest, NoRembForIncreasedBitrate) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp; + PacketRouter packet_router; + packet_router.AddSendRtpModule(&rtp, true); + + uint32_t bitrate_estimate = 456; + std::vector<uint32_t> ssrcs = {1234, 5678}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + EXPECT_CALL(rtp, SetRemb(bitrate_estimate, ssrcs)).Times(1); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Increased estimate shouldn't trigger a callback right away. + EXPECT_CALL(rtp, SetRemb(_, _)).Times(0); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate + 1); + + // Decreasing the estimate less than 3% shouldn't trigger a new callback. + EXPECT_CALL(rtp, SetRemb(_, _)).Times(0); + int lower_estimate = bitrate_estimate * 98 / 100; + packet_router.OnReceiveBitrateChanged(ssrcs, lower_estimate); + + packet_router.RemoveSendRtpModule(&rtp); +} + +TEST(PacketRouterRembTest, ChangeSendRtpModule) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp_send; + NiceMock<MockRtpRtcp> rtp_recv; + PacketRouter packet_router; + packet_router.AddSendRtpModule(&rtp_send, true); + packet_router.AddReceiveRtpModule(&rtp_recv, true); + + uint32_t bitrate_estimate = 456; + std::vector<uint32_t> ssrcs = {1234, 5678}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + EXPECT_CALL(rtp_send, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Decrease estimate to trigger a REMB. + bitrate_estimate = bitrate_estimate - 100; + EXPECT_CALL(rtp_send, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Remove the sending module -> should get remb on the second module. + packet_router.RemoveSendRtpModule(&rtp_send); + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + bitrate_estimate = bitrate_estimate - 100; + EXPECT_CALL(rtp_recv, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + packet_router.RemoveReceiveRtpModule(&rtp_recv); +} + +TEST(PacketRouterRembTest, OnlyOneRembForRepeatedOnReceiveBitrateChanged) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp; + PacketRouter packet_router; + packet_router.AddSendRtpModule(&rtp, true); + + uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + EXPECT_CALL(rtp, SetRemb(_, _)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Lower the estimate, should trigger a call to SetRemb right away. + bitrate_estimate = bitrate_estimate - 100; + EXPECT_CALL(rtp, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged again, this should not trigger a new callback. + EXPECT_CALL(rtp, SetRemb(_, _)).Times(0); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + packet_router.RemoveSendRtpModule(&rtp); +} + +TEST(PacketRouterRembTest, SetMaxDesiredReceiveBitrateLimitsSetRemb) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + const uint32_t cap_bitrate = 100000; + EXPECT_CALL(remb_sender, SetRemb(Le(cap_bitrate), _)).Times(AtLeast(1)); + EXPECT_CALL(remb_sender, SetRemb(Gt(cap_bitrate), _)).Times(0); + + const std::vector<uint32_t> ssrcs = {1234}; + packet_router.SetMaxDesiredReceiveBitrate(cap_bitrate); + packet_router.OnReceiveBitrateChanged(ssrcs, cap_bitrate + 5000); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, cap_bitrate - 5000); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +TEST(PacketRouterRembTest, + SetMaxDesiredReceiveBitrateTriggersRembWhenMoreRestrictive) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + const uint32_t measured_bitrate_bps = 150000; + const uint32_t cap_bitrate_bps = measured_bitrate_bps - 5000; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(remb_sender, SetRemb(measured_bitrate_bps, _)); + packet_router.OnReceiveBitrateChanged(ssrcs, measured_bitrate_bps); + + EXPECT_CALL(remb_sender, SetRemb(cap_bitrate_bps, _)); + packet_router.SetMaxDesiredReceiveBitrate(cap_bitrate_bps); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +TEST(PacketRouterRembTest, + SetMaxDesiredReceiveBitrateDoesNotTriggerRembWhenAsRestrictive) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + const uint32_t measured_bitrate_bps = 150000; + const uint32_t cap_bitrate_bps = measured_bitrate_bps; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(remb_sender, SetRemb(measured_bitrate_bps, _)); + packet_router.OnReceiveBitrateChanged(ssrcs, measured_bitrate_bps); + + EXPECT_CALL(remb_sender, SetRemb(_, _)).Times(0); + packet_router.SetMaxDesiredReceiveBitrate(cap_bitrate_bps); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +TEST(PacketRouterRembTest, + SetMaxDesiredReceiveBitrateDoesNotTriggerRembWhenLessRestrictive) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + const uint32_t measured_bitrate_bps = 150000; + const uint32_t cap_bitrate_bps = measured_bitrate_bps + 500; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(remb_sender, SetRemb(measured_bitrate_bps, _)); + packet_router.OnReceiveBitrateChanged(ssrcs, measured_bitrate_bps); + + EXPECT_CALL(remb_sender, SetRemb(_, _)).Times(0); + packet_router.SetMaxDesiredReceiveBitrate(cap_bitrate_bps); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +TEST(PacketRouterRembTest, + SetMaxDesiredReceiveBitrateTriggersRembWhenNoRecentMeasure) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + const uint32_t measured_bitrate_bps = 150000; + const uint32_t cap_bitrate_bps = measured_bitrate_bps + 5000; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(remb_sender, SetRemb(measured_bitrate_bps, _)); + packet_router.OnReceiveBitrateChanged(ssrcs, measured_bitrate_bps); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + + EXPECT_CALL(remb_sender, SetRemb(cap_bitrate_bps, _)); + packet_router.SetMaxDesiredReceiveBitrate(cap_bitrate_bps); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +TEST(PacketRouterRembTest, + SetMaxDesiredReceiveBitrateTriggersRembWhenNoMeasures) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + NiceMock<MockRtpRtcp> remb_sender; + constexpr bool remb_candidate = true; + packet_router.AddSendRtpModule(&remb_sender, remb_candidate); + + // Set cap. + EXPECT_CALL(remb_sender, SetRemb(100000, _)).Times(1); + packet_router.SetMaxDesiredReceiveBitrate(100000); + // Increase cap. + EXPECT_CALL(remb_sender, SetRemb(200000, _)).Times(1); + packet_router.SetMaxDesiredReceiveBitrate(200000); + // Decrease cap. + EXPECT_CALL(remb_sender, SetRemb(150000, _)).Times(1); + packet_router.SetMaxDesiredReceiveBitrate(150000); + + // Test tear-down. + packet_router.RemoveSendRtpModule(&remb_sender); +} + +// Only register receiving modules and make sure we fallback to trigger a REMB +// packet on this one. +TEST(PacketRouterRembTest, NoSendingRtpModule) { + rtc::ScopedFakeClock clock; + NiceMock<MockRtpRtcp> rtp; + PacketRouter packet_router; + + packet_router.AddReceiveRtpModule(&rtp, true); + + uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Call OnReceiveBitrateChanged twice to get a first estimate. + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + EXPECT_CALL(rtp, SetRemb(bitrate_estimate, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Lower the estimate to trigger a new packet REMB packet. + EXPECT_CALL(rtp, SetRemb(bitrate_estimate - 100, ssrcs)).Times(1); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate - 100); + + EXPECT_CALL(rtp, UnsetRemb()).Times(1); + packet_router.RemoveReceiveRtpModule(&rtp); +} + +TEST(PacketRouterRembTest, NonCandidateSendRtpModuleNotUsedForRemb) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = false; + + packet_router.AddSendRtpModule(&module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(module, SetRemb(_, _)).Times(0); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveSendRtpModule(&module); +} + +TEST(PacketRouterRembTest, CandidateSendRtpModuleUsedForRemb) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = true; + + packet_router.AddSendRtpModule(&module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(module, SetRemb(bitrate_estimate, ssrcs)).Times(1); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveSendRtpModule(&module); +} + +TEST(PacketRouterRembTest, NonCandidateReceiveRtpModuleNotUsedForRemb) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = false; + + packet_router.AddReceiveRtpModule(&module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(module, SetRemb(_, _)).Times(0); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&module); +} + +TEST(PacketRouterRembTest, CandidateReceiveRtpModuleUsedForRemb) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> module; + + constexpr bool remb_candidate = true; + + packet_router.AddReceiveRtpModule(&module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(module, SetRemb(bitrate_estimate, ssrcs)).Times(1); + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&module); +} + +TEST(PacketRouterRembTest, + SendCandidatePreferredOverReceiveCandidate_SendModuleAddedFirst) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> send_module; + NiceMock<MockRtpRtcp> receive_module; + + constexpr bool remb_candidate = true; + + // Send module added - activated. + packet_router.AddSendRtpModule(&send_module, remb_candidate); + + // Receive module added - the send module remains the active one. + packet_router.AddReceiveRtpModule(&receive_module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(send_module, SetRemb(bitrate_estimate, ssrcs)).Times(1); + EXPECT_CALL(receive_module, SetRemb(_, _)).Times(0); + + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&receive_module); + packet_router.RemoveSendRtpModule(&send_module); +} + +TEST(PacketRouterRembTest, + SendCandidatePreferredOverReceiveCandidate_ReceiveModuleAddedFirst) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> send_module; + NiceMock<MockRtpRtcp> receive_module; + + constexpr bool remb_candidate = true; + + // Receive module added - activated. + packet_router.AddReceiveRtpModule(&receive_module, remb_candidate); + + // Send module added - replaces receive module as active. + packet_router.AddSendRtpModule(&send_module, remb_candidate); + + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(send_module, SetRemb(bitrate_estimate, ssrcs)).Times(1); + EXPECT_CALL(receive_module, SetRemb(_, _)).Times(0); + + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&receive_module); + packet_router.RemoveSendRtpModule(&send_module); +} + +TEST(PacketRouterRembTest, ReceiveModuleTakesOverWhenLastSendModuleRemoved) { + rtc::ScopedFakeClock clock; + PacketRouter packet_router; + NiceMock<MockRtpRtcp> send_module; + NiceMock<MockRtpRtcp> receive_module; + + constexpr bool remb_candidate = true; + + // Send module active, receive module inactive. + packet_router.AddSendRtpModule(&send_module, remb_candidate); + packet_router.AddReceiveRtpModule(&receive_module, remb_candidate); + + // Send module removed - receive module becomes active. + packet_router.RemoveSendRtpModule(&send_module); + constexpr uint32_t bitrate_estimate = 456; + const std::vector<uint32_t> ssrcs = {1234}; + EXPECT_CALL(send_module, SetRemb(_, _)).Times(0); + EXPECT_CALL(receive_module, SetRemb(bitrate_estimate, ssrcs)).Times(1); + + clock.AdvanceTime(rtc::TimeDelta::FromMilliseconds(1000)); + packet_router.OnReceiveBitrateChanged(ssrcs, bitrate_estimate); + + // Test tear-down + packet_router.RemoveReceiveRtpModule(&receive_module); +} + +} // namespace webrtc |