summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/test/pc/e2e/analyzer/video/multi_reader_queue.h
blob: 39d26b42bcacfd65a10d42805117f67a3d738e42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
 *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_

#include <deque>
#include <memory>
#include <set>
#include <unordered_map>
#include <utility>

#include "absl/types/optional.h"
#include "rtc_base/checks.h"

namespace webrtc {

// Represents the queue which can be read by multiple readers. Each reader reads
// from its own queue head. When an element is added it becomes visible to all
// current readers. Once an element has been removed by every reader that had
// it in its queue, it is dropped from the underlying storage.
//
// Readers are identified by `size_t` ids. Passing an id of a reader that is
// not in the queue to any per-reader method is a fatal error (RTC_CHECK).
//
// The class performs no internal synchronization.
template <typename T>
class MultiReaderQueue {
 public:
  // Creates queue with exactly `readers_count` readers named from 0 to
  // `readers_count - 1`.
  explicit MultiReaderQueue(size_t readers_count) {
    for (size_t i = 0; i < readers_count; ++i) {
      heads_[i] = 0;
    }
  }
  // Creates queue with specified readers.
  explicit MultiReaderQueue(std::set<size_t> readers) {
    for (size_t reader : readers) {
      heads_[reader] = 0;
    }
  }

  // Adds a new `reader`, initializing its reading position (the reader's head)
  // equal to the one of `reader_to_copy`.
  // Crashes if `reader_to_copy` is unknown or if `reader` already exists.
  // Complexity O(MultiReaderQueue::size(reader_to_copy)).
  void AddReader(size_t reader, size_t reader_to_copy) {
    // Position of the copied head relative to the current front of `queue_`.
    size_t pos = GetHeadPositionOrDie(reader_to_copy);

    auto it = heads_.find(reader);
    RTC_CHECK(it == heads_.end())
        << "Reader " << reader << " is already in the queue";
    // Heads are stored as absolute positions (including already removed
    // elements), so convert the relative position back before storing it.
    heads_[reader] = pos + removed_elements_count_;
    // Every element that the new reader can still read gains one more owner.
    for (size_t i = pos; i < queue_.size(); ++i) {
      in_queues_[i]++;
    }
  }

  // Adds a new `reader`, initializing its reading position equal to first
  // element in the queue.
  // Crashes if `reader` already exists.
  // Complexity O(MultiReaderQueue::size()).
  void AddReader(size_t reader) {
    auto it = heads_.find(reader);
    RTC_CHECK(it == heads_.end())
        << "Reader " << reader << " is already in the queue";
    // `removed_elements_count_` is the absolute position of the current front.
    heads_[reader] = removed_elements_count_;
    for (size_t i = 0; i < queue_.size(); ++i) {
      in_queues_[i]++;
    }
  }

  // Removes specified `reader` from the queue. Elements that were kept only
  // for this reader are dropped from the storage.
  // Crashes if `reader` is unknown.
  // Complexity O(MultiReaderQueue::size(reader)).
  void RemoveReader(size_t reader) {
    size_t pos = GetHeadPositionOrDie(reader);
    // The reader no longer owns any of its unread elements.
    for (size_t i = pos; i < queue_.size(); ++i) {
      in_queues_[i]--;
    }
    // Drop the leading elements that are not in any reader's queue anymore.
    while (!in_queues_.empty() && in_queues_[0] == 0) {
      PopFront();
    }
    heads_.erase(reader);
  }

  // Add value to the end of the queue. Complexity O(1).
  void PushBack(T value) {
    // `value` is a by-value sink parameter: move it into the storage instead
    // of copying it a second time.
    queue_.push_back(std::move(value));
    // The new element is in the queue of every currently registered reader.
    in_queues_.push_back(heads_.size());
  }

  // Extract element from specified head. Returns `absl::nullopt` if there are
  // no more elements for `reader`. Crashes if `reader` is unknown.
  // Complexity O(1).
  absl::optional<T> PopFront(size_t reader) {
    size_t pos = GetHeadPositionOrDie(reader);
    if (pos >= queue_.size()) {
      return absl::nullopt;
    }

    absl::optional<T> out;
    if (in_queues_[pos] == 1) {
      // `reader` is the only one that still has this element in its queue and
      // the element is destroyed below, so it is safe to move the value out.
      out.emplace(std::move(queue_[pos]));
    } else {
      // Other readers still have to read the element: hand out a copy.
      out.emplace(queue_[pos]);
    }

    in_queues_[pos]--;
    heads_[reader]++;

    if (in_queues_[pos] == 0) {
      // An element that nobody needs anymore is expected to be at the front.
      RTC_DCHECK_EQ(pos, 0);
      PopFront();
    }
    return out;
  }

  // Returns a copy of the element at specified head or `absl::nullopt` if
  // there are no more elements for `reader`. Crashes if `reader` is unknown.
  // Complexity O(1).
  absl::optional<T> Front(size_t reader) const {
    size_t pos = GetHeadPositionOrDie(reader);
    if (pos >= queue_.size()) {
      return absl::nullopt;
    }
    return queue_[pos];
  }

  // Returns true if for specified head there are no more elements in the queue
  // or false otherwise. Crashes if `reader` is unknown. Complexity O(1).
  bool IsEmpty(size_t reader) const {
    size_t pos = GetHeadPositionOrDie(reader);
    return pos >= queue_.size();
  }

  // Returns size of the longest queue between all readers, which is the number
  // of elements currently kept in the underlying storage.
  // Complexity O(1).
  size_t size() const { return queue_.size(); }

  // Returns size of the specified queue. Crashes if `reader` is unknown.
  // Complexity O(1).
  size_t size(size_t reader) const {
    size_t pos = GetHeadPositionOrDie(reader);
    return queue_.size() - pos;
  }

  // Returns the number of readers in the queue. Complexity O(1).
  size_t readers_count() const { return heads_.size(); }

 private:
  // Returns the head of `reader` as an index inside `queue_` (it is equal to
  // `queue_.size()` when the reader has nothing left to read). Crashes if
  // `reader` is unknown.
  size_t GetHeadPositionOrDie(size_t reader) const {
    auto it = heads_.find(reader);
    RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
    return it->second - removed_elements_count_;
  }

  // Drops the front element from the storage. The element must not be in any
  // reader's queue.
  void PopFront() {
    RTC_DCHECK(!queue_.empty());
    RTC_DCHECK_EQ(in_queues_[0], 0);
    queue_.pop_front();
    in_queues_.pop_front();
    removed_elements_count_++;
  }

  // Number of the elements that were removed from the queue. It is used to
  // subtract from each head to compute the right index inside `queue_`.
  size_t removed_elements_count_ = 0;
  std::deque<T> queue_;
  // In how many queues the element at index `i` is. An element can be removed
  // from the front if and only if it is in 0 queues.
  std::deque<size_t> in_queues_;
  // Map from the reader to the head position in the queue. The position is
  // absolute: it also counts the elements already removed from `queue_`.
  std::unordered_map<size_t, size_t> heads_;
};

}  // namespace webrtc

#endif  // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_