summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc')
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc312
1 files changed, 312 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
new file mode 100644
index 0000000000..f72c5cb8c1
--- /dev/null
+++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/reassembly_queue.h"
+
+#include <stddef.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/common/str_join.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
+#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+namespace {
+std::unique_ptr<ReassemblyStreams> CreateStreams(
+ absl::string_view log_prefix,
+ ReassemblyStreams::OnAssembledMessage on_assembled_message,
+ bool use_message_interleaving) {
+ if (use_message_interleaving) {
+ return std::make_unique<InterleavedReassemblyStreams>(
+ log_prefix, std::move(on_assembled_message));
+ }
+ return std::make_unique<TraditionalReassemblyStreams>(
+ log_prefix, std::move(on_assembled_message));
+}
+} // namespace
+
+ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
+ TSN peer_initial_tsn,
+ size_t max_size_bytes,
+ bool use_message_interleaving)
+ : log_prefix_(std::string(log_prefix) + "reasm: "),
+ max_size_bytes_(max_size_bytes),
+ watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
+ last_assembled_tsn_watermark_(
+ tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
+ last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
+ streams_(CreateStreams(
+ log_prefix_,
+ [this](rtc::ArrayView<const UnwrappedTSN> tsns,
+ DcSctpMessage message) {
+ AddReassembledMessage(tsns, std::move(message));
+ },
+ use_message_interleaving)) {}
+
+void ReassemblyQueue::Add(TSN tsn, Data data) {
+ RTC_DCHECK(IsConsistent());
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
+ << ", stream=" << *data.stream_id << ":"
+ << *data.message_id << ":" << *data.fsn << ", type="
+ << (data.is_beginning && data.is_end ? "complete"
+ : data.is_beginning ? "first"
+ : data.is_end ? "last"
+ : "middle");
+
+ UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
+
+ if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
+ delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Chunk has already been delivered - skipping";
+ return;
+ }
+
+ // If a stream reset has been received with a "sender's last assigned tsn" in
+ // the future, the socket is in "deferred reset processing" mode and must
+ // buffer chunks until it's exited.
+ if (deferred_reset_streams_.has_value() &&
+ unwrapped_tsn >
+ tsn_unwrapper_.Unwrap(
+ deferred_reset_streams_->req.sender_last_assigned_tsn())) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_ << "Deferring chunk with tsn=" << *tsn
+ << " until cum_ack_tsn="
+ << *deferred_reset_streams_->req.sender_last_assigned_tsn();
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "In this mode, any data arriving with a TSN larger than the
+ // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
+ // locally and held until the cumulative acknowledgment point reaches the
+ // Sender's Last Assigned TSN."
+ queued_bytes_ += data.size();
+ deferred_reset_streams_->deferred_chunks.emplace_back(
+ std::make_pair(tsn, std::move(data)));
+ } else {
+ queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
+ }
+
+ // https://tools.ietf.org/html/rfc4960#section-6.9
+ // "Note: If the data receiver runs out of buffer space while still
+ // waiting for more fragments to complete the reassembly of the message, it
+ // should dispatch part of its inbound message through a partial delivery
+ // API (see Section 10), freeing some of its receive buffer space so that
+ // the rest of the message may be received."
+
+ // TODO(boivie): Support EOR flag and partial delivery?
+ RTC_DCHECK(IsConsistent());
+}
+
+ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
+ const OutgoingSSNResetRequestParameter& req,
+ TSN cum_tsn_ack) {
+ RTC_DCHECK(IsConsistent());
+ if (deferred_reset_streams_.has_value()) {
+ // In deferred mode already.
+ return ReconfigurationResponseParameter::Result::kInProgress;
+ } else if (req.request_sequence_number() <=
+ last_completed_reset_req_seq_nbr_) {
+ // Already performed at some time previously.
+ return ReconfigurationResponseParameter::Result::kSuccessPerformed;
+ }
+
+ UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
+ UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
+
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "If the Sender's Last Assigned TSN is greater than the
+ // cumulative acknowledgment point, then the endpoint MUST enter "deferred
+ // reset processing"."
+ if (sla_tsn > unwrapped_cum_tsn_ack) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "Entering deferred reset processing mode until cum_tsn_ack="
+ << *req.sender_last_assigned_tsn();
+ deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
+ return ReconfigurationResponseParameter::Result::kInProgress;
+ }
+
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "... streams MUST be reset to 0 as the next expected SSN."
+ streams_->ResetStreams(req.stream_ids());
+ last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
+ RTC_DCHECK(IsConsistent());
+ return ReconfigurationResponseParameter::Result::kSuccessPerformed;
+}
+
+bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
+ RTC_DCHECK(IsConsistent());
+ if (deferred_reset_streams_.has_value()) {
+ UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
+ UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
+ deferred_reset_streams_->req.sender_last_assigned_tsn());
+ if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Leaving deferred reset processing with tsn="
+ << *cum_ack_tsn << ", feeding back "
+ << deferred_reset_streams_->deferred_chunks.size()
+ << " chunks";
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "... streams MUST be reset to 0 as the next expected SSN."
+ streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
+ std::vector<std::pair<TSN, Data>> deferred_chunks =
+ std::move(deferred_reset_streams_->deferred_chunks);
+ // The response will not be sent now, but as a reply to the retried
+ // request, which will come as "in progress" has been sent prior.
+ last_completed_reset_req_seq_nbr_ =
+ deferred_reset_streams_->req.request_sequence_number();
+ deferred_reset_streams_ = absl::nullopt;
+
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "Any queued TSNs (queued at step E2) MUST now be released and processed
+ // normally."
+ for (auto& [tsn, data] : deferred_chunks) {
+ queued_bytes_ -= data.size();
+ Add(tsn, std::move(data));
+ }
+
+ RTC_DCHECK(IsConsistent());
+ return true;
+ } else {
+ RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
+ << *cum_ack_tsn;
+ }
+ }
+
+ return false;
+}
+
+std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
+ std::vector<DcSctpMessage> ret;
+ reassembled_messages_.swap(ret);
+ return ret;
+}
+
+void ReassemblyQueue::AddReassembledMessage(
+ rtc::ArrayView<const UnwrappedTSN> tsns,
+ DcSctpMessage message) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
+ << StrJoin(tsns, ",",
+ [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
+ sb << *tsn.Wrap();
+ })
+ << "], message; stream_id=" << *message.stream_id()
+ << ", ppid=" << *message.ppid()
+ << ", payload=" << message.payload().size() << " bytes";
+
+ for (const UnwrappedTSN tsn : tsns) {
+ if (tsn <= last_assembled_tsn_watermark_) {
+ // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
+ // invalid SSNs, allowing ordered messages to stay in the queue that
+ // should've been discarded.
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "Message is built from fragments already seen - skipping";
+ return;
+ } else if (tsn == last_assembled_tsn_watermark_.next_value()) {
+ // Update watermark, or insert into delivered_tsns_
+ last_assembled_tsn_watermark_.Increment();
+ } else {
+ delivered_tsns_.insert(tsn);
+ }
+ }
+
+ // With new TSNs in delivered_tsns, gaps might be filled.
+ MaybeMoveLastAssembledWatermarkFurther();
+
+ reassembled_messages_.emplace_back(std::move(message));
+}
+
+void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
+ // `delivered_tsns_` contain TSNS when there is a gap between ranges of
+ // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
+ // that list, because if so, it can be moved.
+ while (!delivered_tsns_.empty() &&
+ *delivered_tsns_.begin() ==
+ last_assembled_tsn_watermark_.next_value()) {
+ last_assembled_tsn_watermark_.Increment();
+ delivered_tsns_.erase(delivered_tsns_.begin());
+ }
+}
+
+void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
+ RTC_DCHECK(IsConsistent());
+ UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
+
+ last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
+ delivered_tsns_.erase(delivered_tsns_.begin(),
+ delivered_tsns_.upper_bound(tsn));
+
+ MaybeMoveLastAssembledWatermarkFurther();
+
+ queued_bytes_ -=
+ streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
+ RTC_DCHECK(IsConsistent());
+}
+
+bool ReassemblyQueue::IsConsistent() const {
+ // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
+ // adjacent.
+ if (!delivered_tsns_.empty() &&
+ last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
+ return false;
+ }
+
+ // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
+ // enforced in this class. This comparison will still trigger if queued_bytes_
+ // became "negative".
+ return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
+}
+
+HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
+ HandoverReadinessStatus status = streams_->GetHandoverReadiness();
+ if (!delivered_tsns_.empty()) {
+ status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
+ }
+ if (deferred_reset_streams_.has_value()) {
+ status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
+ }
+ return status;
+}
+
+void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
+ state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
+ state.rx.last_completed_deferred_reset_req_sn =
+ last_completed_reset_req_seq_nbr_.value();
+ streams_->AddHandoverState(state);
+}
+
+void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
+ // Validate that the component is in pristine state.
+ RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
+
+ last_assembled_tsn_watermark_ =
+ tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
+ last_completed_reset_req_seq_nbr_ =
+ ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
+ streams_->RestoreFromState(state);
+}
+} // namespace dcsctp