diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
commit | 6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc | |
parent | Initial commit. (diff) | |
download | thunderbird-upstream.tar.xz thunderbird-upstream.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc')
-rw-r--r-- | third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc new file mode 100644 index 0000000000..d1560a75e4 --- /dev/null +++ b/third_party/libwebrtc/net/dcsctp/tx/stream_scheduler.cc @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/tx/stream_scheduler.h" + +#include <algorithm> + +#include "absl/algorithm/container.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/public/dcsctp_socket.h" +#include "net/dcsctp/public/types.h" +#include "net/dcsctp/tx/send_queue.h" +#include "rtc_base/checks.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +void StreamScheduler::Stream::SetPriority(StreamPriority priority) { + priority_ = priority; + inverse_weight_ = InverseWeight(priority); +} + +absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( + TimeMs now, + size_t max_size) { + // For non-interleaved streams, avoid rescheduling while still sending a + // message as it needs to be sent in full. For interleaved messaging, + // reschedule for every I-DATA chunk sent. + bool rescheduling = + enable_message_interleaving_ || !currently_sending_a_message_; + + RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling + << ", active=" + << StrJoin(active_streams_, ", ", + [&](rtc::StringBuilder& sb, const auto& p) { + sb << *p->stream_id() << "@" + << *p->next_finish_time(); + }); + + RTC_DCHECK(rescheduling || current_stream_ != nullptr); + + absl::optional<SendQueue::DataToSend> data; + while (!data.has_value() && !active_streams_.empty()) { + if (rescheduling) { + auto it = active_streams_.begin(); + current_stream_ = *it; + RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream " + << *current_stream_->stream_id(); + + active_streams_.erase(it); + current_stream_->ForceMarkInactive(); + } else { + RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: " + << *current_stream_->stream_id(); + RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) { + return p == current_stream_; + })); + } + + data = current_stream_->Produce(now, max_size); + } + + if (!data.has_value()) { + RTC_DLOG(LS_VERBOSE) + << "There is no stream with data; Can't produce any data."; + RTC_DCHECK(IsConsistent()); + + return absl::nullopt; + } + + RTC_DCHECK(data->data.stream_id == current_stream_->stream_id()); + + RTC_DLOG(LS_VERBOSE) << "Producing DATA, type=" + << (data->data.is_unordered ? "unordered" : "ordered") + << "::" + << (*data->data.is_beginning && *data->data.is_end + ? "complete" + : *data->data.is_beginning ? "first" + : *data->data.is_end ? "last" + : "middle") + << ", stream_id=" << *current_stream_->stream_id() + << ", ppid=" << *data->data.ppid + << ", length=" << data->data.payload.size(); + + currently_sending_a_message_ = !*data->data.is_end; + virtual_time_ = current_stream_->current_time(); + + // One side-effect of rescheduling is that the new stream will not be present + // in `active_streams`. + size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message(); + if (rescheduling && bytes_to_send_next > 0) { + current_stream_->MakeActive(bytes_to_send_next); + } else if (!rescheduling && bytes_to_send_next == 0) { + current_stream_->MakeInactive(); + } + + RTC_DCHECK(IsConsistent()); + return data; +} + +StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( + size_t bytes_to_send_next) const { + if (parent_.enable_message_interleaving_) { + // Perform weighted fair queuing scheduling. + return VirtualTime(*current_virtual_time_ + + bytes_to_send_next * *inverse_weight_); + } + + // Perform round-robin scheduling by letting the stream have its next virtual + // finish time in the future. It doesn't matter how far into the future, just + // any positive number so that any other stream that has the same virtual + // finish time as this stream gets to produce their data before revisiting + // this stream. + return VirtualTime(*current_virtual_time_ + 1); +} + +absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce( + TimeMs now, + size_t max_size) { + absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size); + + if (data.has_value()) { + VirtualTime new_current = CalculateFinishTime(data->data.payload.size()); + RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_ + << " -> " << *new_current; + current_virtual_time_ = new_current; + } + + return data; +} + +bool StreamScheduler::IsConsistent() const { + for (Stream* stream : active_streams_) { + if (stream->next_finish_time_ == VirtualTime::Zero()) { + RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id() + << " is active, but has no next-finish-time"; + return false; + } + } + return true; +} + +void StreamScheduler::Stream::MaybeMakeActive() { + RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")"; + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + size_t bytes_to_send_next = bytes_to_send_in_next_message(); + if (bytes_to_send_next == 0) { + return; + } + + MakeActive(bytes_to_send_next); +} + +void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) { + current_virtual_time_ = parent_.virtual_time_; + RTC_DCHECK_GT(bytes_to_send_next, 0); + VirtualTime next_finish_time = CalculateFinishTime( + std::min(bytes_to_send_next, parent_.max_payload_bytes_)); + RTC_DCHECK_GT(*next_finish_time, 0); + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() + << " active, expiring at " << *next_finish_time; + RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); + next_finish_time_ = next_finish_time; + RTC_DCHECK(!absl::c_any_of(parent_.active_streams_, + [this](const auto* p) { return p == this; })); + parent_.active_streams_.emplace(this); +} + +void StreamScheduler::Stream::ForceMarkInactive() { + RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive"; + RTC_DCHECK(next_finish_time_ != VirtualTime::Zero()); + next_finish_time_ = VirtualTime::Zero(); +} + +void StreamScheduler::Stream::MakeInactive() { + ForceMarkInactive(); + webrtc::EraseIf(parent_.active_streams_, + [&](const auto* s) { return s == this; }); +} + +std::set<StreamID> StreamScheduler::ActiveStreamsForTesting() const { + std::set<StreamID> stream_ids; + for (const auto& stream : active_streams_) { + stream_ids.insert(stream->stream_id()); + } + return stream_ids; +} + +} // namespace dcsctp |