summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/rtc_base/operations_chain.h
blob: 0e8c0681bae1e3c164a8dc4dc63a7e02010ac262 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/*
 *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#ifndef RTC_BASE_OPERATIONS_CHAIN_H_
#define RTC_BASE_OPERATIONS_CHAIN_H_

#include <functional>
#include <memory>
#include <queue>
#include <set>
#include <type_traits>
#include <utility>

#include "absl/types/optional.h"
#include "api/ref_counted_base.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "rtc_base/checks.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/ref_counted_object.h"
#include "rtc_base/system/no_unique_address.h"

namespace rtc {

namespace rtc_operations_chain_internal {

// Abstract base class for operations on the OperationsChain. Run() must be
// invoked exactly once during the Operation's lifespan.
class Operation {
 public:
  virtual ~Operation() {}

  virtual void Run() = 0;
};

// FunctorT is the same as in OperationsChain::ChainOperation(). `callback_` is
// passed on to the `functor_` and is used to inform the OperationsChain that
// the operation completed. The functor is responsible for invoking the
// callback when the operation has completed.
template <typename FunctorT>
class OperationWithFunctor final : public Operation {
 public:
  OperationWithFunctor(FunctorT&& functor, std::function<void()> callback)
      : functor_(std::forward<FunctorT>(functor)),
        callback_(std::move(callback)) {}

  ~OperationWithFunctor() override {
#if RTC_DCHECK_IS_ON
    RTC_DCHECK(has_run_);
#endif  // RTC_DCHECK_IS_ON
  }

  void Run() override {
#if RTC_DCHECK_IS_ON
    RTC_DCHECK(!has_run_);
    has_run_ = true;
#endif  // RTC_DCHECK_IS_ON
    // The functor being executed may invoke the callback synchronously,
    // marking the operation as complete. As such, `this` OperationWithFunctor
    // object may get deleted here, including destroying `functor_`. To
    // protect the functor from self-destruction while running, it is moved to
    // a local variable.
    auto functor = std::move(functor_);
    functor(std::move(callback_));
    // `this` may now be deleted; don't touch any member variables.
  }

 private:
  typename std::remove_reference<FunctorT>::type functor_;
  std::function<void()> callback_;
#if RTC_DCHECK_IS_ON
  bool has_run_ = false;
#endif  // RTC_DCHECK_IS_ON
};

}  // namespace rtc_operations_chain_internal

// An implementation of an operations chain. An operations chain is used to
// ensure that asynchronous tasks are executed in-order with at most one task
// running at a time. The notion of an operation chain is defined in
// https://w3c.github.io/webrtc-pc/#dfn-operations-chain, though unlike this
// implementation, the referenced definition is coupled with a peer connection.
//
// An operation is an asynchronous task. The operation starts when its functor
// is invoked, and completes when the callback that is passed to functor is
// invoked by the operation. The operation must start and complete on the same
// sequence that the operation was "chained" on. As such, the OperationsChain
// operates in a "single-threaded" fashion, but the asynchronous operations may
// use any number of threads to achieve "in parallel" behavior.
//
// When an operation is chained onto the OperationsChain, it is enqueued to be
// executed. Operations are executed in FIFO order, where the next operation
// does not start until the previous operation has completed. OperationsChain
// guarantees that:
// - If the operations chain is empty when an operation is chained, the
//   operation starts immediately, inside ChainOperation().
// - If the operations chain is not empty when an operation is chained, the
//   operation starts upon the previous operation completing, inside the
//   callback.
//
// An operation is contractually obligated to invoke the completion callback
// exactly once. Cancelling a chained operation is not supported by the
// OperationsChain; an operation that wants to be cancellable is responsible for
// aborting its own steps. The callback must still be invoked.
//
// The OperationsChain is kept-alive through reference counting if there are
// operations pending. This, together with the contract, guarantees that all
// operations that are chained get executed.
class OperationsChain final : public RefCountedNonVirtual<OperationsChain> {
 public:
  static scoped_refptr<OperationsChain> Create();
  ~OperationsChain();

  OperationsChain(const OperationsChain&) = delete;
  OperationsChain& operator=(const OperationsChain&) = delete;

  void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback);
  bool IsEmpty() const;

  // Chains an operation. Chained operations are executed in FIFO order. The
  // operation starts when `functor` is executed by the OperationsChain and is
  // contractually obligated to invoke the callback passed to it when the
  // operation is complete. Operations must start and complete on the same
  // sequence that this method was invoked on.
  //
  // If the OperationsChain is empty, the operation starts immediately.
  // Otherwise it starts upon the previous operation completing.
  //
  // Requirements of FunctorT:
  // - FunctorT is movable.
  // - FunctorT implements "T operator()(std::function<void()> callback)" or
  //   "T operator()(std::function<void()> callback) const" for some T (if T is
  //   not void, the return value is discarded in the invoking sequence). The
  //   operator starts the operation; when the operation is complete, "callback"
  //   MUST be invoked, and it MUST be so on the sequence that ChainOperation()
  //   was invoked on.
  //
  // Lambda expressions are valid functors.
  template <typename FunctorT>
  void ChainOperation(FunctorT&& functor) {
    RTC_DCHECK_RUN_ON(&sequence_checker_);
    chained_operations_.push(
        std::make_unique<
            rtc_operations_chain_internal::OperationWithFunctor<FunctorT>>(
            std::forward<FunctorT>(functor), CreateOperationsChainCallback()));
    // If this is the only operation in the chain we execute it immediately.
    // Otherwise the callback will get invoked when the pending operation
    // completes which will trigger the next operation to execute.
    if (chained_operations_.size() == 1) {
      chained_operations_.front()->Run();
    }
  }

 private:
  friend class CallbackHandle;

  // The callback that is passed to an operation's functor (that is used to
  // inform the OperationsChain that the operation has completed) is of type
  // std::function<void()>, which is a copyable type. To allow the callback to
  // be copyable, it is backed up by this reference counted handle. See
  // CreateOperationsChainCallback().
  class CallbackHandle final : public RefCountedNonVirtual<CallbackHandle> {
   public:
    explicit CallbackHandle(scoped_refptr<OperationsChain> operations_chain);
    ~CallbackHandle();

    CallbackHandle(const CallbackHandle&) = delete;
    CallbackHandle& operator=(const CallbackHandle&) = delete;

    void OnOperationComplete();

   private:
    scoped_refptr<OperationsChain> operations_chain_;
#if RTC_DCHECK_IS_ON
    bool has_run_ = false;
#endif  // RTC_DCHECK_IS_ON
  };

  OperationsChain();

  std::function<void()> CreateOperationsChainCallback();
  void OnOperationComplete();

  RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
  // FIFO-list of operations that are chained. An operation that is executing
  // remains on this list until it has completed by invoking the callback passed
  // to it.
  std::queue<std::unique_ptr<rtc_operations_chain_internal::Operation>>
      chained_operations_ RTC_GUARDED_BY(sequence_checker_);
  absl::optional<std::function<void()>> on_chain_empty_callback_
      RTC_GUARDED_BY(sequence_checker_);
};

}  // namespace rtc

#endif  // RTC_BASE_OPERATIONS_CHAIN_H_