summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h')
-rw-r--r--third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h282
1 files changed, 282 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
new file mode 100644
index 0000000000..e9b8cd2081
--- /dev/null
+++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h
@@ -0,0 +1,282 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
+#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
+
+#include <cstdint>
+#include <deque>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/dcsctp_socket.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/tx/send_queue.h"
+#include "net/dcsctp/tx/stream_scheduler.h"
+
+namespace dcsctp {
+
+// The Round Robin SendQueue holds all messages that the client wants to send,
+// but that haven't yet been split into chunks and fully sent on the wire.
+//
+// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
+// it will cycle to send messages from different streams. It will send all
+// fragments from one message before continuing with a different message on
+// possibly a different stream, until support for message interleaving has been
+// implemented.
+//
+// As messages can be (requested to be) sent before the connection is properly
+// established, this send queue is always present - even for closed connections.
+//
+// The send queue may trigger callbacks:
+// * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow`
+// These will be triggered as defined in their documentation.
+// * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd`
+// These will be triggered when messages have been expired, abandoned or
+// discarded from the send queue. If a message is fully produced, meaning
+// that the last fragment has been produced, the responsibility to send
+// lifecycle events is then transferred to the retransmission queue, which
+// is the one asking to produce the message.
+class RRSendQueue : public SendQueue {
+ public:
+ RRSendQueue(absl::string_view log_prefix,
+ DcSctpSocketCallbacks* callbacks,
+ size_t buffer_size,
+ size_t mtu,
+ StreamPriority default_priority,
+ size_t total_buffered_amount_low_threshold);
+
+ // Indicates if the buffer is full. Note that it's up to the caller to ensure
+ // that the buffer is not full prior to adding new items to it.
+ bool IsFull() const;
+ // Indicates if the buffer is empty.
+ bool IsEmpty() const;
+
+ // Adds the message to be sent using the `send_options` provided. The current
+ // time should be in `now`. Note that it's the responsibility of the caller to
+ // ensure that the buffer is not full (by calling `IsFull`) before adding
+ // messages to it.
+ void Add(TimeMs now,
+ DcSctpMessage message,
+ const SendOptions& send_options = {});
+
+ // Implementation of `SendQueue`.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
+ bool Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) override;
+ void PrepareResetStream(StreamID streams) override;
+ bool HasStreamsReadyToBeReset() const override;
+ std::vector<StreamID> GetStreamsReadyToBeReset() override;
+ void CommitResetStreams() override;
+ void RollbackResetStreams() override;
+ void Reset() override;
+ size_t buffered_amount(StreamID stream_id) const override;
+ size_t total_buffered_amount() const override {
+ return total_buffered_amount_.value();
+ }
+ size_t buffered_amount_low_threshold(StreamID stream_id) const override;
+ void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
+ void EnableMessageInterleaving(bool enabled) override {
+ scheduler_.EnableMessageInterleaving(enabled);
+ }
+
+ void SetStreamPriority(StreamID stream_id, StreamPriority priority);
+ StreamPriority GetStreamPriority(StreamID stream_id) const;
+ HandoverReadinessStatus GetHandoverReadiness() const;
+ void AddHandoverState(DcSctpSocketHandoverState& state);
+ void RestoreFromState(const DcSctpSocketHandoverState& state);
+
+ private:
+ struct MessageAttributes {
+ IsUnordered unordered;
+ MaxRetransmits max_retransmissions;
+ TimeMs expires_at;
+ LifecycleId lifecycle_id;
+ };
+
+ // Represents a value and a "low threshold" that when the value reaches or
+ // goes under the "low threshold", will trigger `on_threshold_reached`
+ // callback.
+ class ThresholdWatcher {
+ public:
+ explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
+ : on_threshold_reached_(std::move(on_threshold_reached)) {}
+ // Increases the value.
+ void Increase(size_t bytes) { value_ += bytes; }
+ // Decreases the value and triggers `on_threshold_reached` if it's at or
+ // below `low_threshold()`.
+ void Decrease(size_t bytes);
+
+ size_t value() const { return value_; }
+ size_t low_threshold() const { return low_threshold_; }
+ void SetLowThreshold(size_t low_threshold);
+
+ private:
+ const std::function<void()> on_threshold_reached_;
+ size_t value_ = 0;
+ size_t low_threshold_ = 0;
+ };
+
+ // Per-stream information.
+ class OutgoingStream : public StreamScheduler::StreamProducer {
+ public:
+ OutgoingStream(
+ RRSendQueue* parent,
+ StreamScheduler* scheduler,
+ StreamID stream_id,
+ StreamPriority priority,
+ std::function<void()> on_buffered_amount_low,
+ const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
+ : parent_(*parent),
+ scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
+ next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
+ next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
+ next_ssn_(SSN(state ? state->next_ssn : 0)),
+ buffered_amount_(std::move(on_buffered_amount_low)) {}
+
+ StreamID stream_id() const { return scheduler_stream_->stream_id(); }
+
+ // Enqueues a message to this stream.
+ void Add(DcSctpMessage message, MessageAttributes attributes);
+
+ // Implementing `StreamScheduler::StreamProducer`.
+ absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+ size_t max_size) override;
+ size_t bytes_to_send_in_next_message() const override;
+
+ const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
+ ThresholdWatcher& buffered_amount() { return buffered_amount_; }
+
+ // Discards a partially sent message, see `SendQueue::Discard`.
+ bool Discard(IsUnordered unordered, MID message_id);
+
+ // Pauses this stream, which is used before resetting it.
+ void Pause();
+
+ // Resumes a paused stream.
+ void Resume();
+
+ bool IsReadyToBeReset() const {
+ return pause_state_ == PauseState::kPaused;
+ }
+
+ bool IsResetting() const { return pause_state_ == PauseState::kResetting; }
+
+ void SetAsResetting() {
+ RTC_DCHECK(pause_state_ == PauseState::kPaused);
+ pause_state_ = PauseState::kResetting;
+ }
+
+ // Resets this stream, meaning MIDs and SSNs are set to zero.
+ void Reset();
+
+ // Indicates if this stream has a partially sent message in it.
+ bool has_partially_sent_message() const;
+
+ StreamPriority priority() const { return scheduler_stream_->priority(); }
+ void SetPriority(StreamPriority priority) {
+ scheduler_stream_->SetPriority(priority);
+ }
+
+ void AddHandoverState(
+ DcSctpSocketHandoverState::OutgoingStream& state) const;
+
+ private:
+ // Streams are paused before they can be reset. To reset a stream, the
+ // socket sends an outgoing stream reset command with the TSN of the last
+ // fragment of the last message, so that receivers and senders can agree on
+ // when it stopped. And if the send queue is in the middle of sending a
+ // message, and without fragments not yet sent and without TSNs allocated to
+ // them, it will keep sending data until that message has ended.
+ enum class PauseState {
+ // The stream is not paused, and not scheduled to be reset.
+ kNotPaused,
+ // The stream has requested to be reset/paused but is still producing
+ // fragments of a message that hasn't ended yet. When it does, it will
+ // transition to the `kPaused` state.
+ kPending,
+ // The stream is fully paused and can be reset.
+ kPaused,
+ // The stream has been added to an outgoing stream reset request and a
+ // response from the peer hasn't been received yet.
+ kResetting,
+ };
+
+ // An enqueued message and metadata.
+ struct Item {
+ explicit Item(DcSctpMessage msg, MessageAttributes attributes)
+ : message(std::move(msg)),
+ attributes(std::move(attributes)),
+ remaining_offset(0),
+ remaining_size(message.payload().size()) {}
+ DcSctpMessage message;
+ MessageAttributes attributes;
+ // The remaining payload (offset and size) to be sent, when it has been
+ // fragmented.
+ size_t remaining_offset;
+ size_t remaining_size;
+ // If set, an allocated Message ID and SSN. Will be allocated when the
+ // first fragment is sent.
+ absl::optional<MID> message_id = absl::nullopt;
+ absl::optional<SSN> ssn = absl::nullopt;
+ // The current Fragment Sequence Number, incremented for each fragment.
+ FSN current_fsn = FSN(0);
+ };
+
+ bool IsConsistent() const;
+ void HandleMessageExpired(OutgoingStream::Item& item);
+
+ RRSendQueue& parent_;
+
+ const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
+
+ PauseState pause_state_ = PauseState::kNotPaused;
+ // MIDs are different for unordered and ordered messages sent on a stream.
+ MID next_unordered_mid_;
+ MID next_ordered_mid_;
+
+ SSN next_ssn_;
+ // Enqueued messages, and metadata.
+ std::deque<Item> items_;
+
+ // The current amount of buffered data.
+ ThresholdWatcher buffered_amount_;
+ };
+
+ bool IsConsistent() const;
+ OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
+ absl::optional<DataToSend> Produce(
+ std::map<StreamID, OutgoingStream>::iterator it,
+ TimeMs now,
+ size_t max_size);
+
+ const std::string log_prefix_;
+ DcSctpSocketCallbacks& callbacks_;
+ const size_t buffer_size_;
+ const StreamPriority default_priority_;
+ StreamScheduler scheduler_;
+
+ // The total amount of buffer data, for all streams.
+ ThresholdWatcher total_buffered_amount_;
+
+ // All streams, and messages added to those.
+ std::map<StreamID, OutgoingStream> streams_;
+};
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_