From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../libwebrtc/net/dcsctp/tx/stream_scheduler.h | 224 +++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h (limited to 'third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h') diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h new file mode 100644 index 0000000000..ce836a5826 --- /dev/null +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.h @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ +#define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/algorithm/container.h" +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/sctp_packet.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/containers/flat_set.h" +#include "rtc_base/strong_alias.h" + +namespace dcsctp { + +// A parameterized stream scheduler. Currently, it implements the round robin +// scheduling algorithm using virtual finish time. It is to be used as a part of +// a send queue and will track all active streams (streams that have any data +// that can be sent). +// +// The stream scheduler works with the concept of associating active streams +// with a "virtual finish time", which is the time when a stream is allowed to +// produce data. Streams are ordered by their virtual finish time, and the +// "current virtual time" will advance to the next following virtual finish time +// whenever a chunk is to be produced. +// +// When message interleaving is enabled, the WFQ - Weighted Fair Queueing - +// scheduling algorithm will be used. And when it's not, round-robin scheduling +// will be used instead. +// +// In the round robin scheduling algorithm, a stream's virtual finish time will +// just increment by one (1) after having produced a chunk, which results in a +// round-robin scheduling. +// +// In WFQ scheduling algorithm, a stream's virtual finish time will be defined +// as the number of bytes in the next fragment to be sent, multiplied by the +// inverse of the stream's priority, meaning that a high priority - or a smaller +// fragment - results in a closer virtual finish time, compared to a stream with +// either a lower priority or a larger fragment to be sent. +class StreamScheduler { + private: + class VirtualTime : public webrtc::StrongAlias { + public: + constexpr explicit VirtualTime(const UnderlyingType& v) + : webrtc::StrongAlias(v) {} + + static constexpr VirtualTime Zero() { return VirtualTime(0); } + }; + class InverseWeight + : public webrtc::StrongAlias { + public: + constexpr explicit InverseWeight(StreamPriority priority) + : webrtc::StrongAlias( + 1.0 / std::max(static_cast(*priority), 0.000001)) {} + }; + + public: + class StreamProducer { + public: + virtual ~StreamProducer() = default; + + // Produces a fragment of data to send. The current wall time is specified + // as `now` and should be used to skip chunks with expired limited lifetime. + // The parameter `max_size` specifies the maximum amount of actual payload + // that may be returned. If these constraints prevents the stream from + // sending some data, `absl::nullopt` should be returned. + virtual absl::optional Produce(TimeMs now, + size_t max_size) = 0; + + // Returns the number of payload bytes that is scheduled to be sent in the + // next enqueued message, or zero if there are no enqueued messages or if + // the stream has been actively paused. + virtual size_t bytes_to_send_in_next_message() const = 0; + }; + + class Stream { + public: + StreamID stream_id() const { return stream_id_; } + + StreamPriority priority() const { return priority_; } + void SetPriority(StreamPriority priority); + + // Will activate the stream _if_ it has any data to send. That is, if the + // callback to `bytes_to_send_in_next_message` returns non-zero. If the + // callback returns zero, the stream will not be made active. + void MaybeMakeActive(); + + // Will remove the stream from the list of active streams, and will not try + // to produce data from it. To make it active again, call `MaybeMakeActive`. + void MakeInactive(); + + // Make the scheduler move to another message, or another stream. This is + // used to abort the scheduler from continuing producing fragments for the + // current message in case it's deleted. + void ForceReschedule() { parent_.ForceReschedule(); } + + private: + friend class StreamScheduler; + + Stream(StreamScheduler* parent, + StreamProducer* producer, + StreamID stream_id, + StreamPriority priority) + : parent_(*parent), + producer_(*producer), + stream_id_(stream_id), + priority_(priority), + inverse_weight_(priority) {} + + // Produces a message from this stream. This will only be called on streams + // that have data. + absl::optional Produce(TimeMs now, size_t max_size); + + void MakeActive(size_t bytes_to_send_next); + void ForceMarkInactive(); + + VirtualTime current_time() const { return current_virtual_time_; } + VirtualTime next_finish_time() const { return next_finish_time_; } + size_t bytes_to_send_in_next_message() const { + return producer_.bytes_to_send_in_next_message(); + } + + VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const; + + StreamScheduler& parent_; + StreamProducer& producer_; + const StreamID stream_id_; + StreamPriority priority_; + InverseWeight inverse_weight_; + // This outgoing stream's "current" virtual_time. + VirtualTime current_virtual_time_ = VirtualTime::Zero(); + VirtualTime next_finish_time_ = VirtualTime::Zero(); + }; + + // The `mtu` parameter represents the maximum SCTP packet size, which should + // be the same as `DcSctpOptions::mtu`. + StreamScheduler(absl::string_view log_prefix, size_t mtu) + : log_prefix_(log_prefix), + max_payload_bytes_(mtu - SctpPacket::kHeaderSize - + IDataChunk::kHeaderSize) {} + + std::unique_ptr CreateStream(StreamProducer* producer, + StreamID stream_id, + StreamPriority priority) { + return absl::WrapUnique(new Stream(this, producer, stream_id, priority)); + } + + void EnableMessageInterleaving(bool enabled) { + enable_message_interleaving_ = enabled; + } + + // Makes the scheduler stop producing message from the current stream and + // re-evaluates which stream to produce from. + void ForceReschedule() { currently_sending_a_message_ = false; } + + // Produces a fragment of data to send. The current wall time is specified as + // `now` and will be used to skip chunks with expired limited lifetime. The + // parameter `max_size` specifies the maximum amount of actual payload that + // may be returned. If no data can be produced, `absl::nullopt` is returned. + absl::optional Produce(TimeMs now, size_t max_size); + + std::set ActiveStreamsForTesting() const; + + private: + struct ActiveStreamComparator { + // Ordered by virtual finish time (primary), stream-id (secondary). + bool operator()(Stream* a, Stream* b) const { + VirtualTime a_vft = a->next_finish_time(); + VirtualTime b_vft = b->next_finish_time(); + if (a_vft == b_vft) { + return a->stream_id() < b->stream_id(); + } + return a_vft < b_vft; + } + }; + + bool IsConsistent() const; + + const absl::string_view log_prefix_; + const size_t max_payload_bytes_; + + // The current virtual time, as defined in the WFQ algorithm. + VirtualTime virtual_time_ = VirtualTime::Zero(); + + // The current stream to send chunks from. + Stream* current_stream_ = nullptr; + + bool enable_message_interleaving_ = false; + + // Indicates if the streams is currently sending a message, and should then + // - if message interleaving is not enabled - continue sending from this + // stream until that message has been sent in full. + bool currently_sending_a_message_ = false; + + // The currently active streams, ordered by virtual finish time. + webrtc::flat_set active_streams_; +}; + +} // namespace dcsctp + +#endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_ -- cgit v1.2.3