diff options
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc')
-rw-r--r-- | third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc new file mode 100644 index 0000000000..f72c5cb8c1 --- /dev/null +++ b/third_party/libwebrtc/net/dcsctp/rx/reassembly_queue.cc @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/reassembly_queue.h" + +#include <stddef.h> + +#include <algorithm> +#include <cstdint> +#include <memory> +#include <set> +#include <string> +#include <utility> +#include <vector> + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/common/str_join.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h" +#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h" +#include "net/dcsctp/public/dcsctp_message.h" +#include "net/dcsctp/rx/interleaved_reassembly_streams.h" +#include "net/dcsctp/rx/reassembly_streams.h" +#include "net/dcsctp/rx/traditional_reassembly_streams.h" +#include "rtc_base/logging.h" + +namespace dcsctp { +namespace { +std::unique_ptr<ReassemblyStreams> CreateStreams( + absl::string_view log_prefix, + ReassemblyStreams::OnAssembledMessage on_assembled_message, + bool use_message_interleaving) { + if (use_message_interleaving) { + return std::make_unique<InterleavedReassemblyStreams>( + log_prefix, std::move(on_assembled_message)); + } + return std::make_unique<TraditionalReassemblyStreams>( + log_prefix, std::move(on_assembled_message)); +} +} // namespace + +ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix, + TSN peer_initial_tsn, + size_t max_size_bytes, + bool use_message_interleaving) + : log_prefix_(std::string(log_prefix) + "reasm: "), + max_size_bytes_(max_size_bytes), + watermark_bytes_(max_size_bytes * kHighWatermarkLimit), + last_assembled_tsn_watermark_( + tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))), + last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)), + streams_(CreateStreams( + log_prefix_, + [this](rtc::ArrayView<const UnwrappedTSN> tsns, + DcSctpMessage message) { + AddReassembledMessage(tsns, std::move(message)); + }, + use_message_interleaving)) {} + +void ReassemblyQueue::Add(TSN tsn, Data data) { + RTC_DCHECK(IsConsistent()); + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn + << ", stream=" << *data.stream_id << ":" + << *data.message_id << ":" << *data.fsn << ", type=" + << (data.is_beginning && data.is_end ? "complete" + : data.is_beginning ? "first" + : data.is_end ? "last" + : "middle"); + + UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn); + + if (unwrapped_tsn <= last_assembled_tsn_watermark_ || + delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Chunk has already been delivered - skipping"; + return; + } + + // If a stream reset has been received with a "sender's last assigned tsn" in + // the future, the socket is in "deferred reset processing" mode and must + // buffer chunks until it's exited. + if (deferred_reset_streams_.has_value() && + unwrapped_tsn > + tsn_unwrapper_.Unwrap( + deferred_reset_streams_->req.sender_last_assigned_tsn())) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ << "Deferring chunk with tsn=" << *tsn + << " until cum_ack_tsn=" + << *deferred_reset_streams_->req.sender_last_assigned_tsn(); + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "In this mode, any data arriving with a TSN larger than the + // Sender's Last Assigned TSN for the affected stream(s) MUST be queued + // locally and held until the cumulative acknowledgment point reaches the + // Sender's Last Assigned TSN." + queued_bytes_ += data.size(); + deferred_reset_streams_->deferred_chunks.emplace_back( + std::make_pair(tsn, std::move(data))); + } else { + queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data)); + } + + // https://tools.ietf.org/html/rfc4960#section-6.9 + // "Note: If the data receiver runs out of buffer space while still + // waiting for more fragments to complete the reassembly of the message, it + // should dispatch part of its inbound message through a partial delivery + // API (see Section 10), freeing some of its receive buffer space so that + // the rest of the message may be received." + + // TODO(boivie): Support EOR flag and partial delivery? + RTC_DCHECK(IsConsistent()); +} + +ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams( + const OutgoingSSNResetRequestParameter& req, + TSN cum_tsn_ack) { + RTC_DCHECK(IsConsistent()); + if (deferred_reset_streams_.has_value()) { + // In deferred mode already. + return ReconfigurationResponseParameter::Result::kInProgress; + } else if (req.request_sequence_number() <= + last_completed_reset_req_seq_nbr_) { + // Already performed at some time previously. + return ReconfigurationResponseParameter::Result::kSuccessPerformed; + } + + UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn()); + UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack); + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "If the Sender's Last Assigned TSN is greater than the + // cumulative acknowledgment point, then the endpoint MUST enter "deferred + // reset processing"." + if (sla_tsn > unwrapped_cum_tsn_ack) { + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "Entering deferred reset processing mode until cum_tsn_ack=" + << *req.sender_last_assigned_tsn(); + deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req); + return ReconfigurationResponseParameter::Result::kInProgress; + } + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "... streams MUST be reset to 0 as the next expected SSN." + streams_->ResetStreams(req.stream_ids()); + last_completed_reset_req_seq_nbr_ = req.request_sequence_number(); + RTC_DCHECK(IsConsistent()); + return ReconfigurationResponseParameter::Result::kSuccessPerformed; +} + +bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) { + RTC_DCHECK(IsConsistent()); + if (deferred_reset_streams_.has_value()) { + UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn); + UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap( + deferred_reset_streams_->req.sender_last_assigned_tsn()); + if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Leaving deferred reset processing with tsn=" + << *cum_ack_tsn << ", feeding back " + << deferred_reset_streams_->deferred_chunks.size() + << " chunks"; + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "... streams MUST be reset to 0 as the next expected SSN." + streams_->ResetStreams(deferred_reset_streams_->req.stream_ids()); + std::vector<std::pair<TSN, Data>> deferred_chunks = + std::move(deferred_reset_streams_->deferred_chunks); + // The response will not be sent now, but as a reply to the retried + // request, which will come as "in progress" has been sent prior. + last_completed_reset_req_seq_nbr_ = + deferred_reset_streams_->req.request_sequence_number(); + deferred_reset_streams_ = absl::nullopt; + + // https://tools.ietf.org/html/rfc6525#section-5.2.2 + // "Any queued TSNs (queued at step E2) MUST now be released and processed + // normally." + for (auto& [tsn, data] : deferred_chunks) { + queued_bytes_ -= data.size(); + Add(tsn, std::move(data)); + } + + RTC_DCHECK(IsConsistent()); + return true; + } else { + RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn=" + << *cum_ack_tsn; + } + } + + return false; +} + +std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() { + std::vector<DcSctpMessage> ret; + reassembled_messages_.swap(ret); + return ret; +} + +void ReassemblyQueue::AddReassembledMessage( + rtc::ArrayView<const UnwrappedTSN> tsns, + DcSctpMessage message) { + RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=[" + << StrJoin(tsns, ",", + [](rtc::StringBuilder& sb, UnwrappedTSN tsn) { + sb << *tsn.Wrap(); + }) + << "], message; stream_id=" << *message.stream_id() + << ", ppid=" << *message.ppid() + << ", payload=" << message.payload().size() << " bytes"; + + for (const UnwrappedTSN tsn : tsns) { + if (tsn <= last_assembled_tsn_watermark_) { + // This can be provoked by a misbehaving peer by sending FORWARD-TSN with + // invalid SSNs, allowing ordered messages to stay in the queue that + // should've been discarded. + RTC_DLOG(LS_VERBOSE) + << log_prefix_ + << "Message is built from fragments already seen - skipping"; + return; + } else if (tsn == last_assembled_tsn_watermark_.next_value()) { + // Update watermark, or insert into delivered_tsns_ + last_assembled_tsn_watermark_.Increment(); + } else { + delivered_tsns_.insert(tsn); + } + } + + // With new TSNs in delivered_tsns, gaps might be filled. + MaybeMoveLastAssembledWatermarkFurther(); + + reassembled_messages_.emplace_back(std::move(message)); +} + +void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() { + // `delivered_tsns_` contain TSNS when there is a gap between ranges of + // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to + // that list, because if so, it can be moved. + while (!delivered_tsns_.empty() && + *delivered_tsns_.begin() == + last_assembled_tsn_watermark_.next_value()) { + last_assembled_tsn_watermark_.Increment(); + delivered_tsns_.erase(delivered_tsns_.begin()); + } +} + +void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) { + RTC_DCHECK(IsConsistent()); + UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn()); + + last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn); + delivered_tsns_.erase(delivered_tsns_.begin(), + delivered_tsns_.upper_bound(tsn)); + + MaybeMoveLastAssembledWatermarkFurther(); + + queued_bytes_ -= + streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams()); + RTC_DCHECK(IsConsistent()); +} + +bool ReassemblyQueue::IsConsistent() const { + // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be + // adjacent. + if (!delivered_tsns_.empty() && + last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) { + return false; + } + + // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively + // enforced in this class. This comparison will still trigger if queued_bytes_ + // became "negative". + return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_); +} + +HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const { + HandoverReadinessStatus status = streams_->GetHandoverReadiness(); + if (!delivered_tsns_.empty()) { + status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap); + } + if (deferred_reset_streams_.has_value()) { + status.Add(HandoverUnreadinessReason::kStreamResetDeferred); + } + return status; +} + +void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) { + state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value(); + state.rx.last_completed_deferred_reset_req_sn = + last_completed_reset_req_seq_nbr_.value(); + streams_->AddHandoverState(state); +} + +void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) { + // Validate that the component is in pristine state. + RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0)); + + last_assembled_tsn_watermark_ = + tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn)); + last_completed_reset_req_seq_nbr_ = + ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn); + streams_->RestoreFromState(state); +} +} // namespace dcsctp |