/* * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #ifndef MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_ #define MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_ #include #include #include #include #include #include #include "absl/container/inlined_vector.h" #include "api/units/data_size.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" namespace webrtc { // Describes how long time a packet may stay in the queue before being dropped. struct PacketQueueTTL { TimeDelta audio_retransmission = TimeDelta::PlusInfinity(); TimeDelta video_retransmission = TimeDelta::PlusInfinity(); TimeDelta video = TimeDelta::PlusInfinity(); }; class PrioritizedPacketQueue { public: explicit PrioritizedPacketQueue( Timestamp creation_time, bool prioritize_audio_retransmission = false, PacketQueueTTL packet_queue_ttl = PacketQueueTTL()); PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete; PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete; // Add a packet to the queue. The enqueue time is used for queue time stats // and to report the leading packet enqueue time per packet type. void Push(Timestamp enqueue_time, std::unique_ptr packet); // Remove the next packet from the queue. Packets a prioritized first // according to packet type, in the following order: // - audio, retransmissions, video / fec, padding // For each packet type, we use one FIFO-queue per SSRC and emit from // those queues in a round-robin fashion. std::unique_ptr Pop(); // Number of packets in the queue. int SizeInPackets() const; // Sum of all payload bytes in the queue, where the payload is calculated // as `packet->payload_size() + packet->padding_size()`. DataSize SizeInPayloadBytes() const; // Convenience method for `SizeInPackets() == 0`. bool Empty() const; // Total packets in the queue per media type (RtpPacketMediaType values are // used as lookup index). const std::array& SizeInPacketsPerRtpPacketMediaType() const; // The enqueue time of the next packet this queue will return via the Pop() // method, for the given packet type. If queue has no packets, of that type, // returns Timestamp::MinusInfinity(). Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const; Timestamp LeadingPacketEnqueueTimeForRetransmission() const; // Enqueue time of the oldest packet in the queue, // Timestamp::MinusInfinity() if queue is empty. Timestamp OldestEnqueueTime() const; // Average queue time for the packets currently in the queue. // The queuing time is calculated from Push() to the last UpdateQueueTime() // call - with any time spent in a paused state subtracted. // Returns TimeDelta::Zero() for an empty queue. TimeDelta AverageQueueTime() const; // Called during packet processing or when pause stats changes. Since the // AverageQueueTime() method does not look at the wall time, this method // needs to be called before querying queue time. void UpdateAverageQueueTime(Timestamp now); // Set the pause state, while `paused` is true queuing time is not counted. void SetPauseState(bool paused, Timestamp now); // Remove any packets matching the given SSRC. void RemovePacketsForSsrc(uint32_t ssrc); // Checks if the queue for the given SSRC has original (retransmissions not // counted) video packets containing keyframe data. bool HasKeyframePackets(uint32_t ssrc) const; private: static constexpr int kNumPriorityLevels = 5; class QueuedPacket { public: DataSize PacketSize() const; std::unique_ptr packet; Timestamp enqueue_time; std::list::iterator enqueue_time_iterator; }; // Class containing packets for an RTP stream. // For each priority level, packets are simply stored in a fifo queue. class StreamQueue { public: explicit StreamQueue(Timestamp creation_time); StreamQueue(StreamQueue&&) = default; StreamQueue& operator=(StreamQueue&&) = default; StreamQueue(const StreamQueue&) = delete; StreamQueue& operator=(const StreamQueue&) = delete; // Enqueue packet at the given priority level. Returns true if the packet // count for that priority level went from zero to non-zero. bool EnqueuePacket(QueuedPacket packet, int priority_level); QueuedPacket DequeuePacket(int priority_level); bool HasPacketsAtPrio(int priority_level) const; bool IsEmpty() const; Timestamp LeadingPacketEnqueueTime(int priority_level) const; Timestamp LastEnqueueTime() const; bool has_keyframe_packets() const { return num_keyframe_packets_ > 0; } std::array, kNumPriorityLevels> DequeueAll(); private: std::deque packets_[kNumPriorityLevels]; Timestamp last_enqueue_time_; int num_keyframe_packets_; }; // Remove the packet from the internal state, e.g. queue time / size etc. void DequeuePacketInternal(QueuedPacket& packet); // Check if the queue pointed to by `top_active_prio_level_` is empty and // if so move it to the lowest non-empty index. void MaybeUpdateTopPrioLevel(); void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now); static absl::InlinedVector ToTtlPerPrio( PacketQueueTTL); const bool prioritize_audio_retransmission_; const absl::InlinedVector time_to_live_per_prio_; // Cumulative sum, over all packets, of time spent in the queue. TimeDelta queue_time_sum_; // Cumulative sum of time the queue has spent in a paused state. TimeDelta pause_time_sum_; // Total number of packets stored in this queue. int size_packets_; // Total number of packets stored in this queue per RtpPacketMediaType. std::array size_packets_per_media_type_; // Sum of payload sizes for all packts stored in this queue. DataSize size_payload_; // The last time queue/pause time sums were updated. Timestamp last_update_time_; bool paused_; // Last time `streams_` was culled for inactive streams. Timestamp last_culling_time_; // Map from SSRC to packet queues for the associated RTP stream. std::unordered_map> streams_; // For each priority level, a queue of StreamQueues which have at least one // packet pending for that prio level. std::deque streams_by_prio_[kNumPriorityLevels]; // The first index into `stream_by_prio_` that is non-empty. int top_active_prio_level_; // Ordered list of enqueue times. Additions are always increasing and added to // the end. QueuedPacket instances have a iterators into this list for fast // removal. std::list enqueue_times_; }; } // namespace webrtc #endif // MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_