summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc149
1 files changed, 127 insertions, 22 deletions
diff --git a/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc b/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc
index ea211ea683..2d0d829648 100644
--- a/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc
+++ b/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.cc
@@ -10,41 +10,70 @@
#include "modules/pacing/prioritized_packet_queue.h"
+#include <algorithm>
+#include <array>
#include <utility>
+#include "absl/container/inlined_vector.h"
+#include "absl/types/optional.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
namespace webrtc {
namespace {
constexpr int kAudioPrioLevel = 0;
-int GetPriorityForType(RtpPacketMediaType type) {
+int GetPriorityForType(
+ RtpPacketMediaType type,
+ absl::optional<RtpPacketToSend::OriginalType> original_type) {
// Lower number takes priority over higher.
switch (type) {
case RtpPacketMediaType::kAudio:
// Audio is always prioritized over other packet types.
return kAudioPrioLevel;
case RtpPacketMediaType::kRetransmission:
- // Send retransmissions before new media.
+ // Send retransmissions before new media. If original_type is set, audio
+ // retransmission is prioritized more than video retransmission.
+ if (original_type == RtpPacketToSend::OriginalType::kVideo) {
+ return kAudioPrioLevel + 2;
+ }
return kAudioPrioLevel + 1;
case RtpPacketMediaType::kVideo:
case RtpPacketMediaType::kForwardErrorCorrection:
// Video has "normal" priority, in the old speak.
// Send redundancy concurrently to video. If it is delayed it might have a
// lower chance of being useful.
- return kAudioPrioLevel + 2;
+ return kAudioPrioLevel + 3;
case RtpPacketMediaType::kPadding:
// Packets that are in themselves likely useless, only sent to keep the
// BWE high.
- return kAudioPrioLevel + 3;
+ return kAudioPrioLevel + 4;
}
RTC_CHECK_NOTREACHED();
}
} // namespace
+absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
+PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) {
+ absl::InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
+ ttl_per_prio(kNumPriorityLevels, TimeDelta::PlusInfinity());
+ ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
+ RtpPacketToSend::OriginalType::kAudio)] =
+ packet_queue_ttl.audio_retransmission;
+ ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
+ RtpPacketToSend::OriginalType::kVideo)] =
+ packet_queue_ttl.video_retransmission;
+ ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, absl::nullopt)] =
+ packet_queue_ttl.video;
+ return ttl_per_prio;
+}
+
DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
return DataSize::Bytes(packet->payload_size() + packet->padding_size());
}
@@ -109,8 +138,13 @@ PrioritizedPacketQueue::StreamQueue::DequeueAll() {
return packets_by_prio;
}
-PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
- : queue_time_sum_(TimeDelta::Zero()),
+PrioritizedPacketQueue::PrioritizedPacketQueue(
+ Timestamp creation_time,
+ bool prioritize_audio_retransmission,
+ PacketQueueTTL packet_queue_ttl)
+ : prioritize_audio_retransmission_(prioritize_audio_retransmission),
+ time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)),
+ queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
size_packets_(0),
size_packets_per_media_type_({}),
@@ -133,7 +167,11 @@ void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
RTC_DCHECK(packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet->packet_type().value();
- int prio_level = GetPriorityForType(packet_type);
+ int prio_level =
+ GetPriorityForType(packet_type, prioritize_audio_retransmission_
+ ? packet->original_packet_type()
+ : absl::nullopt);
+ PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time);
RTC_DCHECK_GE(prio_level, 0);
RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
QueuedPacket queued_packed = {.packet = std::move(packet),
@@ -214,7 +252,8 @@ PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
RtpPacketMediaType type) const {
- const int priority_level = GetPriorityForType(type);
+ RTC_DCHECK(type != RtpPacketMediaType::kRetransmission);
+ const int priority_level = GetPriorityForType(type, absl::nullopt);
if (streams_by_prio_[priority_level].empty()) {
return Timestamp::MinusInfinity();
}
@@ -222,6 +261,39 @@ Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
priority_level);
}
+Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission()
+ const {
+ if (!prioritize_audio_retransmission_) {
+ const int priority_level =
+ GetPriorityForType(RtpPacketMediaType::kRetransmission, absl::nullopt);
+ if (streams_by_prio_[priority_level].empty()) {
+ return Timestamp::PlusInfinity();
+ }
+ return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
+ priority_level);
+ }
+ const int audio_priority_level =
+ GetPriorityForType(RtpPacketMediaType::kRetransmission,
+ RtpPacketToSend::OriginalType::kAudio);
+ const int video_priority_level =
+ GetPriorityForType(RtpPacketMediaType::kRetransmission,
+ RtpPacketToSend::OriginalType::kVideo);
+
+ Timestamp next_audio =
+ streams_by_prio_[audio_priority_level].empty()
+ ? Timestamp::PlusInfinity()
+ : streams_by_prio_[audio_priority_level]
+ .front()
+ ->LeadingPacketEnqueueTime(audio_priority_level);
+ Timestamp next_video =
+ streams_by_prio_[video_priority_level].empty()
+ ? Timestamp::PlusInfinity()
+ : streams_by_prio_[video_priority_level]
+ .front()
+ ->LeadingPacketEnqueueTime(video_priority_level);
+ return std::min(next_audio, next_video);
+}
+
Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
return enqueue_times_.empty() ? Timestamp::MinusInfinity()
: enqueue_times_.front();
@@ -283,9 +355,6 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
// Update the global top prio level if neccessary.
RTC_DCHECK(streams_by_prio_[i].front() == &queue);
streams_by_prio_[i].pop_front();
- if (i == top_active_prio_level_) {
- MaybeUpdateTopPrioLevel();
- }
} else {
// More than stream had packets at this prio level, filter this one out.
std::deque<StreamQueue*> filtered_queue;
@@ -298,6 +367,7 @@ void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
}
}
}
+ MaybeUpdateTopPrioLevel();
}
bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const {
@@ -340,18 +410,53 @@ void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
}
void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
- if (streams_by_prio_[top_active_prio_level_].empty()) {
- // No stream queues have packets at this prio level, find top priority
- // that is not empty.
- if (size_packets_ == 0) {
- top_active_prio_level_ = -1;
+ if (top_active_prio_level_ != -1 &&
+ !streams_by_prio_[top_active_prio_level_].empty()) {
+ return;
+ }
+ // No stream queues have packets at top_active_prio_level_, find top priority
+ // that is not empty.
+ for (int i = 0; i < kNumPriorityLevels; ++i) {
+ PurgeOldPacketsAtPriorityLevel(i, last_update_time_);
+ if (!streams_by_prio_[i].empty()) {
+ top_active_prio_level_ = i;
+ break;
+ }
+ }
+ if (size_packets_ == 0) {
+ // There are no packets left to send. Last packet may have been purged. Prio
+ // will change when a new packet is pushed.
+ top_active_prio_level_ = -1;
+ }
+}
+
+void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level,
+ Timestamp now) {
+ RTC_DCHECK(prio_level >= 0 && prio_level < kNumPriorityLevels);
+ TimeDelta time_to_live = time_to_live_per_prio_[prio_level];
+ if (time_to_live.IsInfinite()) {
+ return;
+ }
+
+ std::deque<StreamQueue*>& queues = streams_by_prio_[prio_level];
+ auto iter = queues.begin();
+ while (iter != queues.end()) {
+ StreamQueue* queue_ptr = *iter;
+ while (queue_ptr->HasPacketsAtPrio(prio_level) &&
+ (now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) >
+ time_to_live) {
+ QueuedPacket packet = queue_ptr->DequeuePacket(prio_level);
+ RTC_LOG(LS_INFO) << "Dropping old packet on SSRC: "
+ << packet.packet->Ssrc()
+ << " seq:" << packet.packet->SequenceNumber()
+ << " time in queue:" << (now - packet.enqueue_time).ms()
+ << " ms";
+ DequeuePacketInternal(packet);
+ }
+ if (!queue_ptr->HasPacketsAtPrio(prio_level)) {
+ iter = queues.erase(iter);
} else {
- for (int i = 0; i < kNumPriorityLevels; ++i) {
- if (!streams_by_prio_[i].empty()) {
- top_active_prio_level_ = i;
- break;
- }
- }
+ ++iter;
}
}
}