summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/modules/pacing/prioritized_packet_queue.h
blob: 179ef104fe192a01b05464db1350d11c077034f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
 *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
#define MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_

#include <stddef.h>

#include <array>
#include <deque>
#include <list>
#include <memory>
#include <unordered_map>

#include "absl/container/inlined_vector.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"

namespace webrtc {

// Describes how long time a packet may stay in the queue before being dropped.
struct PacketQueueTTL {
  TimeDelta audio_retransmission = TimeDelta::PlusInfinity();
  TimeDelta video_retransmission = TimeDelta::PlusInfinity();
  TimeDelta video = TimeDelta::PlusInfinity();
};

class PrioritizedPacketQueue {
 public:
  explicit PrioritizedPacketQueue(
      Timestamp creation_time,
      bool prioritize_audio_retransmission = false,
      PacketQueueTTL packet_queue_ttl = PacketQueueTTL());
  PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete;
  PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete;

  // Add a packet to the queue. The enqueue time is used for queue time stats
  // and to report the leading packet enqueue time per packet type.
  void Push(Timestamp enqueue_time, std::unique_ptr<RtpPacketToSend> packet);

  // Remove the next packet from the queue. Packets a prioritized first
  // according to packet type, in the following order:
  // - audio, retransmissions, video / fec, padding
  // For each packet type, we use one FIFO-queue per SSRC and emit from
  // those queues in a round-robin fashion.
  std::unique_ptr<RtpPacketToSend> Pop();

  // Number of packets in the queue.
  int SizeInPackets() const;

  // Sum of all payload bytes in the queue, where the payload is calculated
  // as `packet->payload_size() + packet->padding_size()`.
  DataSize SizeInPayloadBytes() const;

  // Convenience method for `SizeInPackets() == 0`.
  bool Empty() const;

  // Total packets in the queue per media type (RtpPacketMediaType values are
  // used as lookup index).
  const std::array<int, kNumMediaTypes>& SizeInPacketsPerRtpPacketMediaType()
      const;

  // The enqueue time of the next packet this queue will return via the Pop()
  // method, for the given packet type. If queue has no packets, of that type,
  // returns Timestamp::MinusInfinity().
  Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const;
  Timestamp LeadingPacketEnqueueTimeForRetransmission() const;

  // Enqueue time of the oldest packet in the queue,
  // Timestamp::MinusInfinity() if queue is empty.
  Timestamp OldestEnqueueTime() const;

  // Average queue time for the packets currently in the queue.
  // The queuing time is calculated from Push() to the last UpdateQueueTime()
  // call - with any time spent in a paused state subtracted.
  // Returns TimeDelta::Zero() for an empty queue.
  TimeDelta AverageQueueTime() const;

  // Called during packet processing or when pause stats changes. Since the
  // AverageQueueTime() method does not look at the wall time, this method
  // needs to be called before querying queue time.
  void UpdateAverageQueueTime(Timestamp now);

  // Set the pause state, while `paused` is true queuing time is not counted.
  void SetPauseState(bool paused, Timestamp now);

  // Remove any packets matching the given SSRC.
  void RemovePacketsForSsrc(uint32_t ssrc);

  // Checks if the queue for the given SSRC has original (retransmissions not
  // counted) video packets containing keyframe data.
  bool HasKeyframePackets(uint32_t ssrc) const;

 private:
  static constexpr int kNumPriorityLevels = 5;

  class QueuedPacket {
   public:
    DataSize PacketSize() const;

    std::unique_ptr<RtpPacketToSend> packet;
    Timestamp enqueue_time;
    std::list<Timestamp>::iterator enqueue_time_iterator;
  };

  // Class containing packets for an RTP stream.
  // For each priority level, packets are simply stored in a fifo queue.
  class StreamQueue {
   public:
    explicit StreamQueue(Timestamp creation_time);
    StreamQueue(StreamQueue&&) = default;
    StreamQueue& operator=(StreamQueue&&) = default;

    StreamQueue(const StreamQueue&) = delete;
    StreamQueue& operator=(const StreamQueue&) = delete;

    // Enqueue packet at the given priority level. Returns true if the packet
    // count for that priority level went from zero to non-zero.
    bool EnqueuePacket(QueuedPacket packet, int priority_level);

    QueuedPacket DequeuePacket(int priority_level);

    bool HasPacketsAtPrio(int priority_level) const;
    bool IsEmpty() const;
    Timestamp LeadingPacketEnqueueTime(int priority_level) const;
    Timestamp LastEnqueueTime() const;
    bool has_keyframe_packets() const { return num_keyframe_packets_ > 0; }

    std::array<std::deque<QueuedPacket>, kNumPriorityLevels> DequeueAll();

   private:
    std::deque<QueuedPacket> packets_[kNumPriorityLevels];
    Timestamp last_enqueue_time_;
    int num_keyframe_packets_;
  };

  // Remove the packet from the internal state, e.g. queue time / size etc.
  void DequeuePacketInternal(QueuedPacket& packet);

  // Check if the queue pointed to by `top_active_prio_level_` is empty and
  // if so move it to the lowest non-empty index.
  void MaybeUpdateTopPrioLevel();

  void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now);

  static absl::InlinedVector<TimeDelta, kNumPriorityLevels> ToTtlPerPrio(
      PacketQueueTTL);

  const bool prioritize_audio_retransmission_;
  const absl::InlinedVector<TimeDelta, kNumPriorityLevels>
      time_to_live_per_prio_;

  // Cumulative sum, over all packets, of time spent in the queue.
  TimeDelta queue_time_sum_;
  // Cumulative sum of time the queue has spent in a paused state.
  TimeDelta pause_time_sum_;
  // Total number of packets stored in this queue.
  int size_packets_;
  // Total number of packets stored in this queue per RtpPacketMediaType.
  std::array<int, kNumMediaTypes> size_packets_per_media_type_;
  // Sum of payload sizes for all packts stored in this queue.
  DataSize size_payload_;
  // The last time queue/pause time sums were updated.
  Timestamp last_update_time_;
  bool paused_;

  // Last time `streams_` was culled for inactive streams.
  Timestamp last_culling_time_;

  // Map from SSRC to packet queues for the associated RTP stream.
  std::unordered_map<uint32_t, std::unique_ptr<StreamQueue>> streams_;

  // For each priority level, a queue of StreamQueues which have at least one
  // packet pending for that prio level.
  std::deque<StreamQueue*> streams_by_prio_[kNumPriorityLevels];

  // The first index into `stream_by_prio_` that is non-empty.
  int top_active_prio_level_;

  // Ordered list of enqueue times. Additions are always increasing and added to
  // the end. QueuedPacket instances have a iterators into this list for fast
  // removal.
  std::list<Timestamp> enqueue_times_;
};

}  // namespace webrtc

#endif  // MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_