summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc')
-rw-r--r--third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc309
1 files changed, 309 insertions, 0 deletions
diff --git a/third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc b/third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc
new file mode 100644
index 0000000000..afa36ea88d
--- /dev/null
+++ b/third_party/libwebrtc/modules/pacing/task_queue_paced_sender.cc
@@ -0,0 +1,309 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/task_queue_paced_sender.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "absl/cleanup/cleanup.h"
+#include "api/task_queue/pending_task_safety_flag.h"
+#include "api/transport/network_types.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/experiments/field_trial_parser.h"
+#include "rtc_base/experiments/field_trial_units.h"
+#include "rtc_base/trace_event.h"
+
+namespace webrtc {
+
+namespace {
+
+constexpr const char* kBurstyPacerFieldTrial = "WebRTC-BurstyPacer";
+
+} // namespace
+
+const int TaskQueuePacedSender::kNoPacketHoldback = -1;
+
+TaskQueuePacedSender::BurstyPacerFlags::BurstyPacerFlags(
+ const FieldTrialsView& field_trials)
+ : burst("burst") {
+ ParseFieldTrial({&burst}, field_trials.Lookup(kBurstyPacerFieldTrial));
+}
+
+TaskQueuePacedSender::TaskQueuePacedSender(
+ Clock* clock,
+ PacingController::PacketSender* packet_sender,
+ const FieldTrialsView& field_trials,
+ TimeDelta max_hold_back_window,
+ int max_hold_back_window_in_packets,
+ absl::optional<TimeDelta> burst_interval)
+ : clock_(clock),
+ bursty_pacer_flags_(field_trials),
+ max_hold_back_window_(max_hold_back_window),
+ max_hold_back_window_in_packets_(max_hold_back_window_in_packets),
+ pacing_controller_(clock, packet_sender, field_trials),
+ next_process_time_(Timestamp::MinusInfinity()),
+ is_started_(false),
+ is_shutdown_(false),
+ packet_size_(/*alpha=*/0.95),
+ include_overhead_(false),
+ task_queue_(TaskQueueBase::Current()) {
+ RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
+ // There are multiple field trials that can affect burst. If multiple bursts
+ // are specified we pick the largest of the values.
+ absl::optional<TimeDelta> burst = bursty_pacer_flags_.burst.GetOptional();
+ // If not overriden by an experiment, the burst is specified by the
+ // `burst_interval` argument.
+ if (!burst.has_value()) {
+ burst = burst_interval;
+ }
+ if (burst.has_value()) {
+ pacing_controller_.SetSendBurstInterval(burst.value());
+ }
+}
+
+TaskQueuePacedSender::~TaskQueuePacedSender() {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ is_shutdown_ = true;
+}
+
+void TaskQueuePacedSender::EnsureStarted() {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ is_started_ = true;
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::CreateProbeClusters(
+ std::vector<ProbeClusterConfig> probe_cluster_configs) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.CreateProbeClusters(probe_cluster_configs);
+ MaybeScheduleProcessPackets();
+}
+
+void TaskQueuePacedSender::Pause() {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.Pause();
+}
+
+void TaskQueuePacedSender::Resume() {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.Resume();
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::SetCongested(bool congested) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.SetCongested(congested);
+ MaybeScheduleProcessPackets();
+}
+
+void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
+ DataRate padding_rate) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+ MaybeScheduleProcessPackets();
+}
+
+void TaskQueuePacedSender::EnqueuePackets(
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
+ task_queue_->PostTask(
+ SafeTask(safety_.flag(), [this, packets = std::move(packets)]() mutable {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
+ "TaskQueuePacedSender::EnqueuePackets");
+ for (auto& packet : packets) {
+ TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
+ "TaskQueuePacedSender::EnqueuePackets::Loop",
+ "sequence_number", packet->SequenceNumber(),
+ "rtp_timestamp", packet->Timestamp());
+
+ size_t packet_size = packet->payload_size() + packet->padding_size();
+ if (include_overhead_) {
+ packet_size += packet->headers_size();
+ }
+ packet_size_.Apply(1, packet_size);
+ RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
+ pacing_controller_.EnqueuePacket(std::move(packet));
+ }
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ }));
+}
+
+void TaskQueuePacedSender::RemovePacketsForSsrc(uint32_t ssrc) {
+ task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.RemovePacketsForSsrc(ssrc);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ }));
+}
+
+void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.SetAccountForAudioPackets(account_for_audio);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::SetIncludeOverhead() {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ include_overhead_ = true;
+ pacing_controller_.SetIncludeOverhead();
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.SetTransportOverhead(overhead_per_packet);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ pacing_controller_.SetQueueTimeLimit(limit);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
+ return GetStats().expected_queue_time;
+}
+
+DataSize TaskQueuePacedSender::QueueSizeData() const {
+ return GetStats().queue_size;
+}
+
+absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
+ return GetStats().first_sent_packet_time;
+}
+
+TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
+ Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
+ if (oldest_packet.IsInfinite()) {
+ return TimeDelta::Zero();
+ }
+
+ // (webrtc:9716): The clock is not always monotonic.
+ Timestamp current = clock_->CurrentTime();
+ if (current < oldest_packet) {
+ return TimeDelta::Zero();
+ }
+
+ return current - oldest_packet;
+}
+
+void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ current_stats_ = stats;
+}
+
+// RTC_RUN_ON(task_queue_)
+void TaskQueuePacedSender::MaybeScheduleProcessPackets() {
+ if (!processing_packets_)
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
+void TaskQueuePacedSender::MaybeProcessPackets(
+ Timestamp scheduled_process_time) {
+ RTC_DCHECK_RUN_ON(task_queue_);
+
+ TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
+ "TaskQueuePacedSender::MaybeProcessPackets");
+
+ if (is_shutdown_ || !is_started_) {
+ return;
+ }
+
+ // Protects against re-entry from transport feedback calling into the task
+ // queue pacer.
+ RTC_DCHECK(!processing_packets_);
+ processing_packets_ = true;
+ absl::Cleanup cleanup = [this] {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ processing_packets_ = false;
+ };
+
+ Timestamp next_send_time = pacing_controller_.NextSendTime();
+ RTC_DCHECK(next_send_time.IsFinite());
+ const Timestamp now = clock_->CurrentTime();
+ TimeDelta early_execute_margin =
+ pacing_controller_.IsProbing()
+ ? PacingController::kMaxEarlyProbeProcessing
+ : TimeDelta::Zero();
+
+ // Process packets and update stats.
+ while (next_send_time <= now + early_execute_margin) {
+ pacing_controller_.ProcessPackets();
+ next_send_time = pacing_controller_.NextSendTime();
+ RTC_DCHECK(next_send_time.IsFinite());
+
+ // Probing state could change. Get margin after process packets.
+ early_execute_margin = pacing_controller_.IsProbing()
+ ? PacingController::kMaxEarlyProbeProcessing
+ : TimeDelta::Zero();
+ }
+ UpdateStats();
+
+ // Ignore retired scheduled task, otherwise reset `next_process_time_`.
+ if (scheduled_process_time.IsFinite()) {
+ if (scheduled_process_time != next_process_time_) {
+ return;
+ }
+ next_process_time_ = Timestamp::MinusInfinity();
+ }
+
+ // Do not hold back in probing.
+ TimeDelta hold_back_window = TimeDelta::Zero();
+ if (!pacing_controller_.IsProbing()) {
+ hold_back_window = max_hold_back_window_;
+ DataRate pacing_rate = pacing_controller_.pacing_rate();
+ if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
+ !pacing_rate.IsZero() &&
+ packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
+ TimeDelta avg_packet_send_time =
+ DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
+ hold_back_window =
+ std::min(hold_back_window,
+ avg_packet_send_time * max_hold_back_window_in_packets_);
+ }
+ }
+
+ // Calculate next process time.
+ TimeDelta time_to_next_process =
+ std::max(hold_back_window, next_send_time - now - early_execute_margin);
+ next_send_time = now + time_to_next_process;
+
+ // If no in flight task or in flight task is later than `next_send_time`,
+ // schedule a new one. Previous in flight task will be retired.
+ if (next_process_time_.IsMinusInfinity() ||
+ next_process_time_ > next_send_time) {
+ // Prefer low precision if allowed and not probing.
+ task_queue_->PostDelayedHighPrecisionTask(
+ SafeTask(
+ safety_.flag(),
+ [this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
+ time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
+ next_process_time_ = next_send_time;
+ }
+}
+
+void TaskQueuePacedSender::UpdateStats() {
+ Stats new_stats;
+ new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
+ new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
+ new_stats.oldest_packet_enqueue_time =
+ pacing_controller_.OldestPacketEnqueueTime();
+ new_stats.queue_size = pacing_controller_.QueueSizeData();
+ OnStatsUpdated(new_stats);
+}
+
+TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ return current_stats_;
+}
+
+} // namespace webrtc