summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc')
-rw-r--r--third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc272
1 files changed, 272 insertions, 0 deletions
diff --git a/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc b/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc
new file mode 100644
index 0000000000..8b316de676
--- /dev/null
+++ b/third_party/libwebrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc
@@ -0,0 +1,272 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
+
+#include <stddef.h>
+
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <numeric>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/types.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+
+InterleavedReassemblyStreams::InterleavedReassemblyStreams(
+ absl::string_view log_prefix,
+ OnAssembledMessage on_assembled_message)
+ : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
+
+size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
+ UnwrappedMID mid) {
+ std::map<UnwrappedMID, ChunkMap>::const_iterator it =
+ chunks_by_mid_.find(mid);
+ if (it == chunks_by_mid_.end()) {
+ RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+ << *mid.Wrap() << " - no chunks";
+ return 0;
+ }
+ const ChunkMap& chunks = it->second;
+ if (!chunks.begin()->second.second.is_beginning ||
+ !chunks.rbegin()->second.second.is_end) {
+ RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+ << *mid.Wrap() << "- missing beginning or end";
+ return 0;
+ }
+ int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first;
+ if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) {
+ RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+ << *mid.Wrap() << "- not all chunks exist (have "
+ << chunks.size() << ", expect " << (fsn_diff + 1)
+ << ")";
+ return 0;
+ }
+
+ size_t removed_bytes = AssembleMessage(chunks);
+ RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+ << *mid.Wrap() << " - succeeded and removed "
+ << removed_bytes;
+
+ chunks_by_mid_.erase(mid);
+ return removed_bytes;
+}
+
+size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
+ const ChunkMap& tsn_chunks) {
+ size_t count = tsn_chunks.size();
+ if (count == 1) {
+ // Fast path - zero-copy
+ const Data& data = tsn_chunks.begin()->second.second;
+ size_t payload_size = data.size();
+ UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
+ DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+ parent_.on_assembled_message_(tsns, std::move(message));
+ return payload_size;
+ }
+
+ // Slow path - will need to concatenate the payload.
+ std::vector<UnwrappedTSN> tsns;
+ tsns.reserve(count);
+
+ std::vector<uint8_t> payload;
+ size_t payload_size = absl::c_accumulate(
+ tsn_chunks, 0,
+ [](size_t v, const auto& p) { return v + p.second.second.size(); });
+ payload.reserve(payload_size);
+
+ for (auto& item : tsn_chunks) {
+ const UnwrappedTSN tsn = item.second.first;
+ const Data& data = item.second.second;
+ tsns.push_back(tsn);
+ payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+ }
+
+ const Data& data = tsn_chunks.begin()->second.second;
+
+ DcSctpMessage message(data.stream_id, data.ppid, std::move(payload));
+ parent_.on_assembled_message_(tsns, std::move(message));
+ return payload_size;
+}
+
+size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
+ UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
+
+ size_t removed_bytes = 0;
+ auto it = chunks_by_mid_.begin();
+ while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
+ removed_bytes += absl::c_accumulate(
+ it->second, 0,
+ [](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
+ it = chunks_by_mid_.erase(it);
+ }
+
+ if (!stream_id_.unordered) {
+ // For ordered streams, erasing a message might suddenly unblock that queue
+ // and allow it to deliver any following received messages.
+ if (unwrapped_mid >= next_mid_) {
+ next_mid_ = unwrapped_mid.next_value();
+ }
+
+ removed_bytes += TryToAssembleMessages();
+ }
+
+ return removed_bytes;
+}
+
+int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
+ RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
+ RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
+ int queued_bytes = data.size();
+ UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
+ FSN fsn = data.fsn;
+ auto [unused, inserted] =
+ chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
+ if (!inserted) {
+ return 0;
+ }
+
+ if (stream_id_.unordered) {
+ queued_bytes -= TryToAssembleMessage(mid);
+ } else {
+ if (mid == next_mid_) {
+ queued_bytes -= TryToAssembleMessages();
+ }
+ }
+
+ return queued_bytes;
+}
+
+size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
+ size_t removed_bytes = 0;
+
+ for (;;) {
+ size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
+ if (removed_bytes_this_iter == 0) {
+ break;
+ }
+
+ removed_bytes += removed_bytes_this_iter;
+ next_mid_.Increment();
+ }
+ return removed_bytes;
+}
+
+void InterleavedReassemblyStreams::Stream::AddHandoverState(
+ DcSctpSocketHandoverState& state) const {
+ if (stream_id_.unordered) {
+ DcSctpSocketHandoverState::UnorderedStream state_stream;
+ state_stream.id = stream_id_.stream_id.value();
+ state.rx.unordered_streams.push_back(std::move(state_stream));
+ } else {
+ DcSctpSocketHandoverState::OrderedStream state_stream;
+ state_stream.id = stream_id_.stream_id.value();
+ state_stream.next_ssn = next_mid_.Wrap().value();
+ state.rx.ordered_streams.push_back(std::move(state_stream));
+ }
+}
+
+InterleavedReassemblyStreams::Stream&
+InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
+ auto it = streams_.find(stream_id);
+ if (it == streams_.end()) {
+ it =
+ streams_
+ .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(stream_id, this))
+ .first;
+ }
+ return it->second;
+}
+
+int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
+ return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id))
+ .Add(tsn, std::move(data));
+}
+
+size_t InterleavedReassemblyStreams::HandleForwardTsn(
+ UnwrappedTSN new_cumulative_ack_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+ size_t removed_bytes = 0;
+ for (const auto& skipped : skipped_streams) {
+ removed_bytes +=
+ GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
+ .EraseTo(skipped.message_id);
+ }
+ return removed_bytes;
+}
+
+void InterleavedReassemblyStreams::ResetStreams(
+ rtc::ArrayView<const StreamID> stream_ids) {
+ if (stream_ids.empty()) {
+ for (auto& entry : streams_) {
+ entry.second.Reset();
+ }
+ } else {
+ for (StreamID stream_id : stream_ids) {
+ GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset();
+ GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset();
+ }
+ }
+}
+
+HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
+ const {
+ HandoverReadinessStatus status;
+ for (const auto& [stream_id, stream] : streams_) {
+ if (stream.has_unassembled_chunks()) {
+ status.Add(
+ stream_id.unordered
+ ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks
+ : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
+ break;
+ }
+ }
+ return status;
+}
+
+void InterleavedReassemblyStreams::AddHandoverState(
+ DcSctpSocketHandoverState& state) {
+ for (const auto& [unused, stream] : streams_) {
+ stream.AddHandoverState(state);
+ }
+}
+
+void InterleavedReassemblyStreams::RestoreFromState(
+ const DcSctpSocketHandoverState& state) {
+ // Validate that the component is in pristine state.
+ RTC_DCHECK(streams_.empty());
+
+ for (const DcSctpSocketHandoverState::OrderedStream& state :
+ state.rx.ordered_streams) {
+ FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
+ streams_.emplace(
+ std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
+ }
+ for (const DcSctpSocketHandoverState::UnorderedStream& state :
+ state.rx.unordered_streams) {
+ FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
+ streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(stream_id, this));
+ }
+}
+
+} // namespace dcsctp