diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 17:32:43 +0000 |
commit | 6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch) | |
tree | a68f146d7fa01f0134297619fbe7e33db084e0aa /third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc | |
parent | Initial commit. (diff) | |
download | thunderbird-6bf0a5cb5034a7e684dcc3500e841785237ce2dd.tar.xz thunderbird-6bf0a5cb5034a7e684dcc3500e841785237ce2dd.zip |
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc')
-rw-r--r-- | third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc b/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc new file mode 100644 index 0000000000..8b316de676 --- /dev/null +++ b/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc @@ -0,0 +1,272 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "net/dcsctp/rx/interleaved_reassembly_streams.h" + +#include <stddef.h> + +#include <cstdint> +#include <functional> +#include <iterator> +#include <map> +#include <numeric> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "absl/algorithm/container.h" +#include "api/array_view.h" +#include "net/dcsctp/common/sequence_numbers.h" +#include "net/dcsctp/packet/chunk/forward_tsn_common.h" +#include "net/dcsctp/packet/data.h" +#include "net/dcsctp/public/types.h" +#include "rtc_base/logging.h" + +namespace dcsctp { + +InterleavedReassemblyStreams::InterleavedReassemblyStreams( + absl::string_view log_prefix, + OnAssembledMessage on_assembled_message) + : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {} + +size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage( + UnwrappedMID mid) { + std::map<UnwrappedMID, ChunkMap>::const_iterator it = + chunks_by_mid_.find(mid); + if (it == chunks_by_mid_.end()) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << " - no chunks"; + return 0; + } + const ChunkMap& chunks = it->second; + if (!chunks.begin()->second.second.is_beginning || + !chunks.rbegin()->second.second.is_end) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << "- missing beginning or end"; + return 0; + } + int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first; + if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) { + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << "- not all chunks exist (have " + << chunks.size() << ", expect " << (fsn_diff + 1) + << ")"; + return 0; + } + + size_t removed_bytes = AssembleMessage(chunks); + RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " + << *mid.Wrap() << " - succeeded and removed " + << removed_bytes; + + chunks_by_mid_.erase(mid); + return removed_bytes; +} + +size_t InterleavedReassemblyStreams::Stream::AssembleMessage( + const ChunkMap& tsn_chunks) { + size_t count = tsn_chunks.size(); + if (count == 1) { + // Fast path - zero-copy + const Data& data = tsn_chunks.begin()->second.second; + size_t payload_size = data.size(); + UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first}; + DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); + parent_.on_assembled_message_(tsns, std::move(message)); + return payload_size; + } + + // Slow path - will need to concatenate the payload. + std::vector<UnwrappedTSN> tsns; + tsns.reserve(count); + + std::vector<uint8_t> payload; + size_t payload_size = absl::c_accumulate( + tsn_chunks, 0, + [](size_t v, const auto& p) { return v + p.second.second.size(); }); + payload.reserve(payload_size); + + for (auto& item : tsn_chunks) { + const UnwrappedTSN tsn = item.second.first; + const Data& data = item.second.second; + tsns.push_back(tsn); + payload.insert(payload.end(), data.payload.begin(), data.payload.end()); + } + + const Data& data = tsn_chunks.begin()->second.second; + + DcSctpMessage message(data.stream_id, data.ppid, std::move(payload)); + parent_.on_assembled_message_(tsns, std::move(message)); + return payload_size; +} + +size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) { + UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id); + + size_t removed_bytes = 0; + auto it = chunks_by_mid_.begin(); + while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) { + removed_bytes += absl::c_accumulate( + it->second, 0, + [](size_t r2, const auto& q) { return r2 + q.second.second.size(); }); + it = chunks_by_mid_.erase(it); + } + + if (!stream_id_.unordered) { + // For ordered streams, erasing a message might suddenly unblock that queue + // and allow it to deliver any following received messages. + if (unwrapped_mid >= next_mid_) { + next_mid_ = unwrapped_mid.next_value(); + } + + removed_bytes += TryToAssembleMessages(); + } + + return removed_bytes; +} + +int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) { + RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered); + RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id); + int queued_bytes = data.size(); + UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id); + FSN fsn = data.fsn; + auto [unused, inserted] = + chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data))); + if (!inserted) { + return 0; + } + + if (stream_id_.unordered) { + queued_bytes -= TryToAssembleMessage(mid); + } else { + if (mid == next_mid_) { + queued_bytes -= TryToAssembleMessages(); + } + } + + return queued_bytes; +} + +size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() { + size_t removed_bytes = 0; + + for (;;) { + size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_); + if (removed_bytes_this_iter == 0) { + break; + } + + removed_bytes += removed_bytes_this_iter; + next_mid_.Increment(); + } + return removed_bytes; +} + +void InterleavedReassemblyStreams::Stream::AddHandoverState( + DcSctpSocketHandoverState& state) const { + if (stream_id_.unordered) { + DcSctpSocketHandoverState::UnorderedStream state_stream; + state_stream.id = stream_id_.stream_id.value(); + state.rx.unordered_streams.push_back(std::move(state_stream)); + } else { + DcSctpSocketHandoverState::OrderedStream state_stream; + state_stream.id = stream_id_.stream_id.value(); + state_stream.next_ssn = next_mid_.Wrap().value(); + state.rx.ordered_streams.push_back(std::move(state_stream)); + } +} + +InterleavedReassemblyStreams::Stream& +InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) { + auto it = streams_.find(stream_id); + if (it == streams_.end()) { + it = + streams_ + .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this)) + .first; + } + return it->second; +} + +int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) { + return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id)) + .Add(tsn, std::move(data)); +} + +size_t InterleavedReassemblyStreams::HandleForwardTsn( + UnwrappedTSN new_cumulative_ack_tsn, + rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) { + size_t removed_bytes = 0; + for (const auto& skipped : skipped_streams) { + removed_bytes += + GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id)) + .EraseTo(skipped.message_id); + } + return removed_bytes; +} + +void InterleavedReassemblyStreams::ResetStreams( + rtc::ArrayView<const StreamID> stream_ids) { + if (stream_ids.empty()) { + for (auto& entry : streams_) { + entry.second.Reset(); + } + } else { + for (StreamID stream_id : stream_ids) { + GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset(); + GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset(); + } + } +} + +HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness() + const { + HandoverReadinessStatus status; + for (const auto& [stream_id, stream] : streams_) { + if (stream.has_unassembled_chunks()) { + status.Add( + stream_id.unordered + ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks + : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks); + break; + } + } + return status; +} + +void InterleavedReassemblyStreams::AddHandoverState( + DcSctpSocketHandoverState& state) { + for (const auto& [unused, stream] : streams_) { + stream.AddHandoverState(state); + } +} + +void InterleavedReassemblyStreams::RestoreFromState( + const DcSctpSocketHandoverState& state) { + // Validate that the component is in pristine state. + RTC_DCHECK(streams_.empty()); + + for (const DcSctpSocketHandoverState::OrderedStream& state : + state.rx.ordered_streams) { + FullStreamId stream_id(IsUnordered(false), StreamID(state.id)); + streams_.emplace( + std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this, MID(state.next_ssn))); + } + for (const DcSctpSocketHandoverState::UnorderedStream& state : + state.rx.unordered_streams) { + FullStreamId stream_id(IsUnordered(true), StreamID(state.id)); + streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), + std::forward_as_tuple(stream_id, this)); + } +} + +} // namespace dcsctp |