/* * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h" #include #include #include "absl/algorithm/container.h" #include "absl/memory/memory.h" #include "absl/types/optional.h" #include "api/video/video_bitrate_allocation.h" #include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtcp_packet.h" #include "modules/rtp_rtcp/source/rtcp_packet/bye.h" #include "modules/rtp_rtcp/source/rtcp_packet/common_header.h" #include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h" #include "modules/rtp_rtcp/source/rtcp_packet/fir.h" #include "modules/rtp_rtcp/source/rtcp_packet/nack.h" #include "modules/rtp_rtcp/source/rtcp_packet/pli.h" #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h" #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h" #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h" #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "modules/rtp_rtcp/source/time_util.h" #include "rtc_base/checks.h" #include "rtc_base/containers/flat_map.h" #include "rtc_base/logging.h" #include "rtc_base/numerics/divide_round.h" #include "rtc_base/task_utils/repeating_task.h" #include "rtc_base/time_utils.h" #include "system_wrappers/include/clock.h" namespace webrtc { namespace { struct SenderReportTimes { Timestamp local_received_time; NtpTime remote_sent_time; }; std::function)> GetRtcpTransport( const RtcpTransceiverConfig& config) { if (config.rtcp_transport != nullptr) { return config.rtcp_transport; } bool first = true; std::string log_prefix = config.debug_id; return [first, log_prefix](rtc::ArrayView packet) mutable { if (first) { RTC_LOG(LS_ERROR) << log_prefix << "Sending RTCP packets is disabled."; first = false; } }; } } // namespace struct RtcpTransceiverImpl::RemoteSenderState { uint8_t fir_sequence_number = 0; absl::optional last_received_sender_report; std::vector observers; }; struct RtcpTransceiverImpl::LocalSenderState { uint32_t ssrc; size_t last_num_sent_bytes = 0; ReportBlockData report_block; // Sequence number of the last FIR message per sender SSRC. flat_map last_fir; RtpStreamRtcpHandler* handler = nullptr; }; // Helper to put several RTCP packets into lower layer datagram composing // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2. // TODO(bugs.webrtc.org/8239): When in compound mode and packets are so many // that several compound RTCP packets need to be generated, ensure each packet // is compound. class RtcpTransceiverImpl::PacketSender { public: PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback, size_t max_packet_size) : callback_(callback), max_packet_size_(max_packet_size) { RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE); } ~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; } // Appends a packet to pending compound packet. // Sends rtcp compound packet if buffer was already full and resets buffer. void AppendPacket(const rtcp::RtcpPacket& packet) { packet.Create(buffer_, &index_, max_packet_size_, callback_); } // Sends pending rtcp compound packet. void Send() { if (index_ > 0) { callback_(rtc::ArrayView(buffer_, index_)); index_ = 0; } } bool IsEmpty() const { return index_ == 0; } private: const rtcp::RtcpPacket::PacketReadyCallback callback_; const size_t max_packet_size_; size_t index_ = 0; uint8_t buffer_[IP_PACKET_SIZE]; }; RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config) : config_(config), rtcp_transport_(GetRtcpTransport(config_)), ready_to_send_(config.initial_ready_to_send) { RTC_CHECK(config_.Validate()); if (ready_to_send_ && config_.schedule_periodic_compound_packets) { SchedulePeriodicCompoundPackets(config_.initial_report_delay); } } RtcpTransceiverImpl::~RtcpTransceiverImpl() = default; void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver( uint32_t remote_ssrc, MediaReceiverRtcpObserver* observer) { if (config_.receive_statistics == nullptr && remote_senders_.empty()) { RTC_LOG(LS_WARNING) << config_.debug_id << "receive statistic is not set. RTCP report blocks " "will not be generated."; } auto& stored = remote_senders_[remote_ssrc].observers; RTC_DCHECK(!absl::c_linear_search(stored, observer)); stored.push_back(observer); } void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver( uint32_t remote_ssrc, MediaReceiverRtcpObserver* observer) { auto remote_sender_it = remote_senders_.find(remote_ssrc); if (remote_sender_it == remote_senders_.end()) return; auto& stored = remote_sender_it->second.observers; auto it = absl::c_find(stored, observer); if (it == stored.end()) return; stored.erase(it); } bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc, RtpStreamRtcpHandler* handler) { RTC_DCHECK(handler != nullptr); LocalSenderState state; state.ssrc = local_ssrc; state.handler = handler; local_senders_.push_back(state); auto it = std::prev(local_senders_.end()); auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it); if (!inserted) { local_senders_.pop_back(); return false; } return true; } bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) { auto index_it = local_senders_by_ssrc_.find(local_ssrc); if (index_it == local_senders_by_ssrc_.end()) { return false; } local_senders_.erase(index_it->second); local_senders_by_ssrc_.erase(index_it); return true; } void RtcpTransceiverImpl::SetReadyToSend(bool ready) { if (config_.schedule_periodic_compound_packets) { if (ready_to_send_ && !ready) periodic_task_handle_.Stop(); if (!ready_to_send_ && ready) // Restart periodic sending. SchedulePeriodicCompoundPackets(config_.report_period / 2); } ready_to_send_ = ready; } void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView packet, Timestamp now) { // Report blocks may be spread across multiple sender and receiver reports. std::vector report_blocks; while (!packet.empty()) { rtcp::CommonHeader rtcp_block; if (!rtcp_block.Parse(packet.data(), packet.size())) break; HandleReceivedPacket(rtcp_block, now, report_blocks); packet = packet.subview(rtcp_block.packet_size()); } if (!report_blocks.empty()) { ProcessReportBlocks(now, report_blocks); } } void RtcpTransceiverImpl::SendCompoundPacket() { if (!ready_to_send_) return; SendPeriodicCompoundPacket(); ReschedulePeriodicCompoundPackets(); } void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps, std::vector ssrcs) { RTC_DCHECK_GE(bitrate_bps, 0); bool send_now = config_.send_remb_on_change && (!remb_.has_value() || bitrate_bps != remb_->bitrate_bps()); remb_.emplace(); remb_->SetSsrcs(std::move(ssrcs)); remb_->SetBitrateBps(bitrate_bps); remb_->SetSenderSsrc(config_.feedback_ssrc); // TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb // immideately on large bitrate change when there is one RtcpTransceiver per // rtp transport. if (send_now) { absl::optional remb; remb.swap(remb_); SendImmediateFeedback(*remb); remb.swap(remb_); } } void RtcpTransceiverImpl::UnsetRemb() { remb_.reset(); } void RtcpTransceiverImpl::SendNack(uint32_t ssrc, std::vector sequence_numbers) { RTC_DCHECK(!sequence_numbers.empty()); if (!ready_to_send_) return; rtcp::Nack nack; nack.SetSenderSsrc(config_.feedback_ssrc); nack.SetMediaSsrc(ssrc); nack.SetPacketIds(std::move(sequence_numbers)); SendImmediateFeedback(nack); } void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) { if (!ready_to_send_) return; rtcp::Pli pli; pli.SetSenderSsrc(config_.feedback_ssrc); pli.SetMediaSsrc(ssrc); SendImmediateFeedback(pli); } void RtcpTransceiverImpl::SendFullIntraRequest( rtc::ArrayView ssrcs, bool new_request) { RTC_DCHECK(!ssrcs.empty()); if (!ready_to_send_) return; rtcp::Fir fir; fir.SetSenderSsrc(config_.feedback_ssrc); for (uint32_t media_ssrc : ssrcs) { uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number; if (new_request) command_seq_num += 1; fir.AddRequestTo(media_ssrc, command_seq_num); } SendImmediateFeedback(fir); } void RtcpTransceiverImpl::HandleReceivedPacket( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now, std::vector& report_blocks) { switch (rtcp_packet_header.type()) { case rtcp::Bye::kPacketType: HandleBye(rtcp_packet_header); break; case rtcp::SenderReport::kPacketType: HandleSenderReport(rtcp_packet_header, now, report_blocks); break; case rtcp::ReceiverReport::kPacketType: HandleReceiverReport(rtcp_packet_header, now, report_blocks); break; case rtcp::ExtendedReports::kPacketType: HandleExtendedReports(rtcp_packet_header, now); break; case rtcp::Psfb::kPacketType: HandlePayloadSpecificFeedback(rtcp_packet_header, now); break; case rtcp::Rtpfb::kPacketType: HandleRtpFeedback(rtcp_packet_header, now); break; } } void RtcpTransceiverImpl::HandleBye( const rtcp::CommonHeader& rtcp_packet_header) { rtcp::Bye bye; if (!bye.Parse(rtcp_packet_header)) return; auto remote_sender_it = remote_senders_.find(bye.sender_ssrc()); if (remote_sender_it == remote_senders_.end()) return; for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers) observer->OnBye(bye.sender_ssrc()); } void RtcpTransceiverImpl::HandleSenderReport( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now, std::vector& report_blocks) { rtcp::SenderReport sender_report; if (!sender_report.Parse(rtcp_packet_header)) return; RemoteSenderState& remote_sender = remote_senders_[sender_report.sender_ssrc()]; remote_sender.last_received_sender_report = {{now, sender_report.ntp()}}; HandleReportBlocks(sender_report.sender_ssrc(), now, sender_report.report_blocks(), report_blocks); for (MediaReceiverRtcpObserver* observer : remote_sender.observers) { observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(), sender_report.rtp_timestamp()); } } void RtcpTransceiverImpl::HandleReceiverReport( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now, std::vector& report_blocks) { rtcp::ReceiverReport receiver_report; if (!receiver_report.Parse(rtcp_packet_header)) { return; } HandleReportBlocks(receiver_report.sender_ssrc(), now, receiver_report.report_blocks(), report_blocks); } void RtcpTransceiverImpl::HandleReportBlocks( uint32_t sender_ssrc, Timestamp now, rtc::ArrayView rtcp_report_blocks, std::vector& report_blocks) { if (rtcp_report_blocks.empty()) { return; } NtpTime now_ntp = config_.clock->ConvertTimestampToNtpTime(now); uint32_t receive_time_ntp = CompactNtp(now_ntp); Timestamp now_utc = Timestamp::Millis(now_ntp.ToMs() - rtc::kNtpJan1970Millisecs); for (const rtcp::ReportBlock& block : rtcp_report_blocks) { absl::optional rtt; if (block.last_sr() != 0) { rtt = CompactNtpRttToTimeDelta( receive_time_ntp - block.delay_since_last_sr() - block.last_sr()); } auto sender_it = local_senders_by_ssrc_.find(block.source_ssrc()); if (sender_it != local_senders_by_ssrc_.end()) { LocalSenderState& state = *sender_it->second; state.report_block.SetReportBlock(sender_ssrc, block, now_utc); if (rtt.has_value()) { state.report_block.AddRoundTripTimeSample(*rtt); } state.handler->OnReport(state.report_block); report_blocks.push_back(state.report_block); } else { // No registered sender for this report block, still report it to the // network link. ReportBlockData report_block; report_block.SetReportBlock(sender_ssrc, block, now_utc); if (rtt.has_value()) { report_block.AddRoundTripTimeSample(*rtt); } report_blocks.push_back(report_block); } } } void RtcpTransceiverImpl::HandlePayloadSpecificFeedback( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now) { switch (rtcp_packet_header.fmt()) { case rtcp::Fir::kFeedbackMessageType: HandleFir(rtcp_packet_header); break; case rtcp::Pli::kFeedbackMessageType: HandlePli(rtcp_packet_header); break; case rtcp::Psfb::kAfbMessageType: HandleRemb(rtcp_packet_header, now); break; } } void RtcpTransceiverImpl::HandleFir( const rtcp::CommonHeader& rtcp_packet_header) { rtcp::Fir fir; if (local_senders_.empty() || !fir.Parse(rtcp_packet_header)) { return; } for (const rtcp::Fir::Request& r : fir.requests()) { auto it = local_senders_by_ssrc_.find(r.ssrc); if (it == local_senders_by_ssrc_.end()) { continue; } auto [fir_it, is_new] = it->second->last_fir.emplace(fir.sender_ssrc(), r.seq_nr); if (is_new || fir_it->second != r.seq_nr) { it->second->handler->OnFir(fir.sender_ssrc()); fir_it->second = r.seq_nr; } } } void RtcpTransceiverImpl::HandlePli( const rtcp::CommonHeader& rtcp_packet_header) { rtcp::Pli pli; if (local_senders_.empty() || !pli.Parse(rtcp_packet_header)) { return; } auto it = local_senders_by_ssrc_.find(pli.media_ssrc()); if (it != local_senders_by_ssrc_.end()) { it->second->handler->OnPli(pli.sender_ssrc()); } } void RtcpTransceiverImpl::HandleRemb( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now) { rtcp::Remb remb; if (config_.network_link_observer == nullptr || !remb.Parse(rtcp_packet_header)) { return; } config_.network_link_observer->OnReceiverEstimatedMaxBitrate( now, DataRate::BitsPerSec(remb.bitrate_bps())); } void RtcpTransceiverImpl::HandleRtpFeedback( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now) { switch (rtcp_packet_header.fmt()) { case rtcp::Nack::kFeedbackMessageType: HandleNack(rtcp_packet_header); break; case rtcp::TransportFeedback::kFeedbackMessageType: HandleTransportFeedback(rtcp_packet_header, now); break; } } void RtcpTransceiverImpl::HandleNack( const rtcp::CommonHeader& rtcp_packet_header) { rtcp::Nack nack; if (local_senders_.empty() || !nack.Parse(rtcp_packet_header)) { return; } auto it = local_senders_by_ssrc_.find(nack.media_ssrc()); if (it != local_senders_by_ssrc_.end()) { it->second->handler->OnNack(nack.sender_ssrc(), nack.packet_ids()); } } void RtcpTransceiverImpl::HandleTransportFeedback( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now) { RTC_DCHECK_EQ(rtcp_packet_header.fmt(), rtcp::TransportFeedback::kFeedbackMessageType); if (config_.network_link_observer == nullptr) { return; } rtcp::TransportFeedback feedback; if (feedback.Parse(rtcp_packet_header)) { config_.network_link_observer->OnTransportFeedback(now, feedback); } } void RtcpTransceiverImpl::HandleExtendedReports( const rtcp::CommonHeader& rtcp_packet_header, Timestamp now) { rtcp::ExtendedReports extended_reports; if (!extended_reports.Parse(rtcp_packet_header)) return; if (config_.reply_to_non_sender_rtt_measurement && extended_reports.rrtr()) { RrtrTimes& rrtr = received_rrtrs_[extended_reports.sender_ssrc()]; rrtr.received_remote_mid_ntp_time = CompactNtp(extended_reports.rrtr()->ntp()); rrtr.local_receive_mid_ntp_time = CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); } if (extended_reports.dlrr()) HandleDlrr(extended_reports.dlrr(), now); if (extended_reports.target_bitrate()) HandleTargetBitrate(*extended_reports.target_bitrate(), extended_reports.sender_ssrc()); } void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, Timestamp now) { if (!config_.non_sender_rtt_measurement || config_.network_link_observer == nullptr) { return; } // Delay and last_rr are transferred using 32bit compact ntp resolution. // Convert packet arrival time to same format through 64bit ntp format. uint32_t receive_time_ntp = CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) { if (rti.ssrc != config_.feedback_ssrc) continue; uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr; TimeDelta rtt = CompactNtpRttToTimeDelta(rtt_ntp); config_.network_link_observer->OnRttUpdate(now, rtt); } } void RtcpTransceiverImpl::ProcessReportBlocks( Timestamp now, rtc::ArrayView report_blocks) { RTC_DCHECK(!report_blocks.empty()); if (config_.network_link_observer == nullptr) { return; } // Round trip time calculated from different report blocks suppose to be about // the same, as those blocks should be generated by the same remote sender. // To avoid too many callbacks, this code accumulate multiple rtts into one. TimeDelta rtt_sum = TimeDelta::Zero(); size_t num_rtts = 0; for (const ReportBlockData& report_block : report_blocks) { if (report_block.has_rtt()) { rtt_sum += report_block.last_rtt(); ++num_rtts; } } if (num_rtts > 0) { config_.network_link_observer->OnRttUpdate(now, rtt_sum / num_rtts); } config_.network_link_observer->OnReport(now, report_blocks); } void RtcpTransceiverImpl::HandleTargetBitrate( const rtcp::TargetBitrate& target_bitrate, uint32_t remote_ssrc) { auto remote_sender_it = remote_senders_.find(remote_ssrc); if (remote_sender_it == remote_senders_.end() || remote_sender_it->second.observers.empty()) return; // Convert rtcp::TargetBitrate to VideoBitrateAllocation. VideoBitrateAllocation bitrate_allocation; for (const rtcp::TargetBitrate::BitrateItem& item : target_bitrate.GetTargetBitrates()) { if (item.spatial_layer >= kMaxSpatialLayers || item.temporal_layer >= kMaxTemporalStreams) { RTC_DLOG(LS_WARNING) << config_.debug_id << "Invalid incoming TargetBitrate with spatial layer " << item.spatial_layer << ", temporal layer " << item.temporal_layer; continue; } bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer, item.target_bitrate_kbps * 1000); } for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers) observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation); } void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() { if (!config_.schedule_periodic_compound_packets) return; periodic_task_handle_.Stop(); RTC_DCHECK(ready_to_send_); SchedulePeriodicCompoundPackets(config_.report_period); } void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(TimeDelta delay) { periodic_task_handle_ = RepeatingTaskHandle::DelayedStart( config_.task_queue, delay, [this] { RTC_DCHECK(config_.schedule_periodic_compound_packets); RTC_DCHECK(ready_to_send_); SendPeriodicCompoundPacket(); return config_.report_period; }, TaskQueueBase::DelayPrecision::kLow, config_.clock); } std::vector RtcpTransceiverImpl::FillReports( Timestamp now, ReservedBytes reserved, PacketSender& rtcp_sender) { // Sender/receiver reports should be first in the RTCP packet. RTC_DCHECK(rtcp_sender.IsEmpty()); size_t available_bytes = config_.max_packet_size; if (reserved.per_packet > available_bytes) { // Because reserved.per_packet is unsigned, substracting would underflow and // will not produce desired result. available_bytes = 0; } else { available_bytes -= reserved.per_packet; } const size_t sender_report_size_bytes = 28 + reserved.per_sender; const size_t full_sender_report_size_bytes = sender_report_size_bytes + rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength; size_t max_full_sender_reports = available_bytes / full_sender_report_size_bytes; size_t max_report_blocks = max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks; size_t available_bytes_for_last_sender_report = available_bytes - max_full_sender_reports * full_sender_report_size_bytes; if (available_bytes_for_last_sender_report >= sender_report_size_bytes) { max_report_blocks += (available_bytes_for_last_sender_report - sender_report_size_bytes) / rtcp::ReportBlock::kLength; } std::vector report_blocks = CreateReportBlocks(now, max_report_blocks); // Previous calculation of max number of sender report made space for max // number of report blocks per sender report, but if number of report blocks // is low, more sender reports may fit in. size_t max_sender_reports = (available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) / sender_report_size_bytes; auto last_handled_sender_it = local_senders_.end(); auto report_block_it = report_blocks.begin(); std::vector sender_ssrcs; for (auto it = local_senders_.begin(); it != local_senders_.end() && sender_ssrcs.size() < max_sender_reports; ++it) { LocalSenderState& rtp_sender = *it; RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats(); if (stats.num_sent_bytes() < rtp_sender.last_num_sent_bytes) { RTC_LOG(LS_ERROR) << "Inconsistent SR for SSRC " << rtp_sender.ssrc << ". Number of total sent bytes decreased."; rtp_sender.last_num_sent_bytes = 0; } if (stats.num_sent_bytes() == rtp_sender.last_num_sent_bytes) { // Skip because no RTP packet was send for this SSRC since last report. continue; } rtp_sender.last_num_sent_bytes = stats.num_sent_bytes(); last_handled_sender_it = it; rtcp::SenderReport sender_report; sender_report.SetSenderSsrc(rtp_sender.ssrc); sender_report.SetPacketCount(stats.num_sent_packets()); sender_report.SetOctetCount(stats.num_sent_bytes()); sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); RTC_DCHECK_GE(now, stats.last_capture_time()); sender_report.SetRtpTimestamp( stats.last_rtp_timestamp() + ((now - stats.last_capture_time()) * stats.last_clock_rate()) .seconds()); if (report_block_it != report_blocks.end()) { size_t num_blocks = std::min(rtcp::SenderReport::kMaxNumberOfReportBlocks, report_blocks.end() - report_block_it); std::vector sub_blocks(report_block_it, report_block_it + num_blocks); sender_report.SetReportBlocks(std::move(sub_blocks)); report_block_it += num_blocks; } rtcp_sender.AppendPacket(sender_report); sender_ssrcs.push_back(rtp_sender.ssrc); } if (last_handled_sender_it != local_senders_.end()) { // Rotate `local_senders_` so that the 1st unhandled sender become first in // the list, and thus will be first to generate rtcp sender report for on // the next call to `FillReports`. local_senders_.splice(local_senders_.end(), local_senders_, local_senders_.begin(), std::next(last_handled_sender_it)); } // Calculcate number of receiver reports to attach remaining report blocks to. size_t num_receiver_reports = DivideRoundUp(report_blocks.end() - report_block_it, rtcp::ReceiverReport::kMaxNumberOfReportBlocks); // In compound mode each RTCP packet has to start with a sender or receiver // report. if (config_.rtcp_mode == RtcpMode::kCompound && sender_ssrcs.empty() && num_receiver_reports == 0) { num_receiver_reports = 1; } uint32_t sender_ssrc = sender_ssrcs.empty() ? config_.feedback_ssrc : sender_ssrcs.front(); for (size_t i = 0; i < num_receiver_reports; ++i) { rtcp::ReceiverReport receiver_report; receiver_report.SetSenderSsrc(sender_ssrc); size_t num_blocks = std::min(rtcp::ReceiverReport::kMaxNumberOfReportBlocks, report_blocks.end() - report_block_it); std::vector sub_blocks(report_block_it, report_block_it + num_blocks); receiver_report.SetReportBlocks(std::move(sub_blocks)); report_block_it += num_blocks; rtcp_sender.AppendPacket(receiver_report); } // All report blocks should be attached at this point. RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0); return sender_ssrcs; } void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now, size_t reserved_bytes, PacketSender& sender) { RTC_DCHECK(sender.IsEmpty()); ReservedBytes reserved = {.per_packet = reserved_bytes}; absl::optional sdes; if (!config_.cname.empty()) { sdes.emplace(); bool added = sdes->AddCName(config_.feedback_ssrc, config_.cname); RTC_DCHECK(added) << "Failed to add CNAME " << config_.cname << " to RTCP SDES packet."; reserved.per_packet += sdes->BlockLength(); } if (remb_.has_value()) { reserved.per_packet += remb_->BlockLength(); } absl::optional xr_with_dlrr; if (!received_rrtrs_.empty()) { RTC_DCHECK(config_.reply_to_non_sender_rtt_measurement); xr_with_dlrr.emplace(); uint32_t now_ntp = CompactNtp(config_.clock->ConvertTimestampToNtpTime(now)); for (const auto& [ssrc, rrtr_info] : received_rrtrs_) { rtcp::ReceiveTimeInfo reply; reply.ssrc = ssrc; reply.last_rr = rrtr_info.received_remote_mid_ntp_time; reply.delay_since_last_rr = now_ntp - rrtr_info.local_receive_mid_ntp_time; xr_with_dlrr->AddDlrrItem(reply); } if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs) { reserved.per_sender += xr_with_dlrr->BlockLength(); } else { reserved.per_packet += xr_with_dlrr->BlockLength(); } } if (config_.non_sender_rtt_measurement) { // It looks like bytes for ExtendedReport header are reserved twice, but in // practice the same RtcpTransceiver won't both produce RRTR (i.e. it is a // receiver-only) and reply to RRTR (i.e. remote participant is a receiver // only). If that happen, then `reserved_bytes` would be slightly larger // than it should, which is not an issue. // 4 bytes for common RTCP header + 4 bytes for the ExtenedReports header. reserved.per_packet += (4 + 4 + rtcp::Rrtr::kLength); } std::vector sender_ssrcs = FillReports(now, reserved, sender); bool has_sender_report = !sender_ssrcs.empty(); uint32_t sender_ssrc = has_sender_report ? sender_ssrcs.front() : config_.feedback_ssrc; if (sdes.has_value() && !sender.IsEmpty()) { sender.AppendPacket(*sdes); } if (remb_.has_value()) { remb_->SetSenderSsrc(sender_ssrc); sender.AppendPacket(*remb_); } if (!has_sender_report && config_.non_sender_rtt_measurement) { rtcp::ExtendedReports xr_with_rrtr; xr_with_rrtr.SetSenderSsrc(config_.feedback_ssrc); rtcp::Rrtr rrtr; rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now)); xr_with_rrtr.SetRrtr(rrtr); sender.AppendPacket(xr_with_rrtr); } if (xr_with_dlrr.has_value()) { rtc::ArrayView ssrcs(&sender_ssrc, 1); if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs && !sender_ssrcs.empty()) { ssrcs = sender_ssrcs; } RTC_DCHECK(!ssrcs.empty()); for (uint32_t ssrc : ssrcs) { xr_with_dlrr->SetSenderSsrc(ssrc); sender.AppendPacket(*xr_with_dlrr); } } } void RtcpTransceiverImpl::SendPeriodicCompoundPacket() { Timestamp now = config_.clock->CurrentTime(); PacketSender sender(rtcp_transport_, config_.max_packet_size); CreateCompoundPacket(now, /*reserved_bytes=*/0, sender); sender.Send(); } void RtcpTransceiverImpl::SendCombinedRtcpPacket( std::vector> rtcp_packets) { PacketSender sender(rtcp_transport_, config_.max_packet_size); for (auto& rtcp_packet : rtcp_packets) { rtcp_packet->SetSenderSsrc(config_.feedback_ssrc); sender.AppendPacket(*rtcp_packet); } sender.Send(); } void RtcpTransceiverImpl::SendImmediateFeedback( const rtcp::RtcpPacket& rtcp_packet) { PacketSender sender(rtcp_transport_, config_.max_packet_size); // Compound mode requires every sent rtcp packet to be compound, i.e. start // with a sender or receiver report. if (config_.rtcp_mode == RtcpMode::kCompound) { Timestamp now = config_.clock->CurrentTime(); CreateCompoundPacket(now, /*reserved_bytes=*/rtcp_packet.BlockLength(), sender); } sender.AppendPacket(rtcp_packet); sender.Send(); // If compound packet was sent, delay (reschedule) the periodic one. if (config_.rtcp_mode == RtcpMode::kCompound) ReschedulePeriodicCompoundPackets(); } std::vector RtcpTransceiverImpl::CreateReportBlocks( Timestamp now, size_t num_max_blocks) { if (!config_.receive_statistics) return {}; std::vector report_blocks = config_.receive_statistics->RtcpReportBlocks(num_max_blocks); uint32_t last_sr = 0; uint32_t last_delay = 0; for (rtcp::ReportBlock& report_block : report_blocks) { auto it = remote_senders_.find(report_block.source_ssrc()); if (it == remote_senders_.end() || !it->second.last_received_sender_report) { continue; } const SenderReportTimes& last_sender_report = *it->second.last_received_sender_report; last_sr = CompactNtp(last_sender_report.remote_sent_time); last_delay = SaturatedToCompactNtp(now - last_sender_report.local_received_time); report_block.SetLastSr(last_sr); report_block.SetDelayLastSr(last_delay); } return report_blocks; } } // namespace webrtc