/* * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "net/dcsctp/rx/reassembly_queue.h" #include #include #include #include #include #include #include #include #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" #include "net/dcsctp/common/sequence_numbers.h" #include "net/dcsctp/common/str_join.h" #include "net/dcsctp/packet/chunk/forward_tsn_common.h" #include "net/dcsctp/packet/data.h" #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/types.h" #include "net/dcsctp/rx/interleaved_reassembly_streams.h" #include "net/dcsctp/rx/reassembly_streams.h" #include "net/dcsctp/rx/traditional_reassembly_streams.h" #include "rtc_base/logging.h" namespace dcsctp { namespace { std::unique_ptr CreateStreams( absl::string_view log_prefix, ReassemblyStreams::OnAssembledMessage on_assembled_message, bool use_message_interleaving) { if (use_message_interleaving) { return std::make_unique( log_prefix, std::move(on_assembled_message)); } return std::make_unique( log_prefix, std::move(on_assembled_message)); } } // namespace ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, TSN peer_initial_tsn, size_t max_size_bytes, bool use_message_interleaving) : log_prefix_(log_prefix), max_size_bytes_(max_size_bytes), watermark_bytes_(max_size_bytes * kHighWatermarkLimit), last_assembled_tsn_watermark_( tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))), last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)), streams_(CreateStreams( log_prefix_, [this](rtc::ArrayView tsns, DcSctpMessage message) { AddReassembledMessage(tsns, std::move(message)); }, use_message_interleaving)) {} void ReassemblyQueue::Add(TSN tsn, Data data) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn << ", stream=" << *data.stream_id << ":" << *data.mid << ":" << *data.fsn << ", type=" << (data.is_beginning && data.is_end ? "complete" : data.is_beginning ? "first" : data.is_end ? "last" : "middle"); UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn); // If a stream reset has been received with a "sender's last assigned tsn" in // the future, the socket is in "deferred reset processing" mode and must // buffer chunks until it's exited. if (deferred_reset_streams_.has_value() && unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn && deferred_reset_streams_->streams.contains(data.stream_id)) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Deferring chunk with tsn=" << *tsn << ", sid=" << *data.stream_id << " until tsn=" << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap(); // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "In this mode, any data arriving with a TSN larger than the // Sender's Last Assigned TSN for the affected stream(s) MUST be queued // locally and held until the cumulative acknowledgment point reaches the // Sender's Last Assigned TSN." queued_bytes_ += data.size(); deferred_reset_streams_->deferred_actions.push_back( [this, tsn, data = std::move(data)]() mutable { queued_bytes_ -= data.size(); Add(tsn, std::move(data)); }); } else { queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data)); } // https://tools.ietf.org/html/rfc4960#section-6.9 // "Note: If the data receiver runs out of buffer space while still // waiting for more fragments to complete the reassembly of the message, it // should dispatch part of its inbound message through a partial delivery // API (see Section 10), freeing some of its receive buffer space so that // the rest of the message may be received." // TODO(boivie): Support EOR flag and partial delivery? RTC_DCHECK(IsConsistent()); } void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset( rtc::ArrayView stream_ids) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: [" << StrJoin(stream_ids, ",", [](rtc::StringBuilder& sb, StreamID sid) { sb << *sid; }) << "]"; // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "... streams MUST be reset to 0 as the next expected SSN." streams_->ResetStreams(stream_ids); if (deferred_reset_streams_.has_value()) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Leaving deferred reset processing, feeding back " << deferred_reset_streams_->deferred_actions.size() << " actions"; // https://tools.ietf.org/html/rfc6525#section-5.2.2 // "Any queued TSNs (queued at step E2) MUST now be released and processed // normally." auto deferred_actions = std::move(deferred_reset_streams_->deferred_actions); deferred_reset_streams_ = absl::nullopt; for (auto& action : deferred_actions) { action(); } } RTC_DCHECK(IsConsistent()); } void ReassemblyQueue::EnterDeferredReset( TSN sender_last_assigned_tsn, rtc::ArrayView streams) { if (!deferred_reset_streams_.has_value()) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Entering deferred reset; sender_last_assigned_tsn=" << *sender_last_assigned_tsn; deferred_reset_streams_ = absl::make_optional( tsn_unwrapper_.Unwrap(sender_last_assigned_tsn), webrtc::flat_set(streams.begin(), streams.end())); } RTC_DCHECK(IsConsistent()); } std::vector ReassemblyQueue::FlushMessages() { std::vector ret; reassembled_messages_.swap(ret); return ret; } void ReassemblyQueue::AddReassembledMessage( rtc::ArrayView tsns, DcSctpMessage message) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=[" << StrJoin(tsns, ",", [](rtc::StringBuilder& sb, UnwrappedTSN tsn) { sb << *tsn.Wrap(); }) << "], message; stream_id=" << *message.stream_id() << ", ppid=" << *message.ppid() << ", payload=" << message.payload().size() << " bytes"; for (const UnwrappedTSN tsn : tsns) { if (tsn == last_assembled_tsn_watermark_.next_value()) { // Update watermark, or insert into delivered_tsns_ last_assembled_tsn_watermark_.Increment(); } else { delivered_tsns_.insert(tsn); } } // With new TSNs in delivered_tsns, gaps might be filled. MaybeMoveLastAssembledWatermarkFurther(); reassembled_messages_.emplace_back(std::move(message)); } void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { // `delivered_tsns_` contain TSNS when there is a gap between ranges of // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to // that list, because if so, it can be moved. while (!delivered_tsns_.empty() && *delivered_tsns_.begin() == last_assembled_tsn_watermark_.next_value()) { last_assembled_tsn_watermark_.Increment(); delivered_tsns_.erase(delivered_tsns_.begin()); } } void ReassemblyQueue::HandleForwardTsn( TSN new_cumulative_tsn, rtc::ArrayView skipped_streams) { UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn); if (deferred_reset_streams_.has_value() && tsn > deferred_reset_streams_->sender_last_assigned_tsn) { RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() << "- deferring."; deferred_reset_streams_->deferred_actions.emplace_back( [this, new_cumulative_tsn, streams = std::vector( skipped_streams.begin(), skipped_streams.end())] { HandleForwardTsn(new_cumulative_tsn, streams); }); RTC_DCHECK(IsConsistent()); return; } RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap() << " - performing."; last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); delivered_tsns_.erase(delivered_tsns_.begin(), delivered_tsns_.upper_bound(tsn)); MaybeMoveLastAssembledWatermarkFurther(); queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams); RTC_DCHECK(IsConsistent()); } bool ReassemblyQueue::IsConsistent() const { // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be // adjacent. if (!delivered_tsns_.empty() && last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) { return false; } // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively // enforced in this class. This comparison will still trigger if queued_bytes_ // became "negative". return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_); } HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const { HandoverReadinessStatus status = streams_->GetHandoverReadiness(); if (!delivered_tsns_.empty()) { status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap); } if (deferred_reset_streams_.has_value()) { status.Add(HandoverUnreadinessReason::kStreamResetDeferred); } return status; } void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) { state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value(); state.rx.last_completed_deferred_reset_req_sn = last_completed_reset_req_seq_nbr_.value(); streams_->AddHandoverState(state); } void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { // Validate that the component is in pristine state. RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0)); last_assembled_tsn_watermark_ = tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn)); last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn); streams_->RestoreFromState(state); } } // namespace dcsctp