/* * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "modules/rtp_rtcp/source/rtp_sender_egress.h" #include #include #include #include #include "absl/strings/match.h" #include "api/units/timestamp.h" #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h" #include "rtc_base/logging.h" namespace webrtc { namespace { constexpr uint32_t kTimestampTicksPerMs = 90; constexpr TimeDelta kBitrateStatisticsWindow = TimeDelta::Seconds(1); constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13; constexpr TimeDelta kUpdateInterval = kBitrateStatisticsWindow; } // namespace RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender( TaskQueueBase& worker_queue, RtpSenderEgress* sender, PacketSequencer* sequencer) : worker_queue_(worker_queue), transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) { RTC_DCHECK(sequencer); } RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() { RTC_DCHECK_RUN_ON(&worker_queue_); } void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets( std::vector> packets) { if (!worker_queue_.IsCurrent()) { worker_queue_.PostTask(SafeTask( task_safety_.flag(), [this, packets = std::move(packets)]() mutable { EnqueuePackets(std::move(packets)); })); return; } RTC_DCHECK_RUN_ON(&worker_queue_); for (auto& packet : packets) { PrepareForSend(packet.get()); sender_->SendPacket(std::move(packet), PacedPacketInfo()); } auto fec_packets = sender_->FetchFecPackets(); if (!fec_packets.empty()) { EnqueuePackets(std::move(fec_packets)); } } void RtpSenderEgress::NonPacedPacketSender::PrepareForSend( RtpPacketToSend* packet) { RTC_DCHECK_RUN_ON(&worker_queue_); // Assign sequence numbers, but not for flexfec which is already running on // an internally maintained sequence number series. if (packet->Ssrc() != sender_->FlexFecSsrc()) { sequencer_->Sequence(*packet); } if (!packet->SetExtension( ++transport_sequence_number_)) { --transport_sequence_number_; } packet->ReserveExtension(); packet->ReserveExtension(); } RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config, RtpPacketHistory* packet_history) : enable_send_packet_batching_(config.enable_send_packet_batching), worker_queue_(TaskQueueBase::Current()), ssrc_(config.local_media_ssrc), rtx_ssrc_(config.rtx_send_ssrc), flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc() : absl::nullopt), populate_network2_timestamp_(config.populate_network2_timestamp), clock_(config.clock), packet_history_(packet_history), transport_(config.outgoing_transport), event_log_(config.event_log), is_audio_(config.audio), need_rtp_packet_infos_(config.need_rtp_packet_infos), fec_generator_(config.fec_generator), transport_feedback_observer_(config.transport_feedback_callback), send_packet_observer_(config.send_packet_observer), rtp_stats_callback_(config.rtp_stats_callback), bitrate_callback_(config.send_bitrate_observer), media_has_been_sent_(false), force_part_of_allocation_(false), timestamp_offset_(0), send_rates_(kNumMediaTypes, BitrateTracker(kBitrateStatisticsWindow)), rtp_sequence_number_map_(need_rtp_packet_infos_ ? std::make_unique( kRtpSequenceNumberMapMaxEntries) : nullptr) { RTC_DCHECK(worker_queue_); if (bitrate_callback_) { update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_, kUpdateInterval, [this]() { PeriodicUpdate(); return kUpdateInterval; }); } } RtpSenderEgress::~RtpSenderEgress() { RTC_DCHECK_RUN_ON(worker_queue_); update_task_.Stop(); } void RtpSenderEgress::SendPacket(std::unique_ptr packet, const PacedPacketInfo& pacing_info) { RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(packet); if (packet->Ssrc() == ssrc_ && packet->packet_type() != RtpPacketMediaType::kRetransmission) { if (last_sent_seq_.has_value()) { RTC_DCHECK_EQ(static_cast(*last_sent_seq_ + 1), packet->SequenceNumber()); } last_sent_seq_ = packet->SequenceNumber(); } else if (packet->Ssrc() == rtx_ssrc_) { if (last_sent_rtx_seq_.has_value()) { RTC_DCHECK_EQ(static_cast(*last_sent_rtx_seq_ + 1), packet->SequenceNumber()); } last_sent_rtx_seq_ = packet->SequenceNumber(); } RTC_DCHECK(packet->packet_type().has_value()); RTC_DCHECK(HasCorrectSsrc(*packet)); if (packet->packet_type() == RtpPacketMediaType::kRetransmission) { RTC_DCHECK(packet->retransmitted_sequence_number().has_value()); } const Timestamp now = clock_->CurrentTime(); #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE BweTestLoggingPlot(now, packet->Ssrc()); #endif if (need_rtp_packet_infos_ && packet->packet_type() == RtpPacketToSend::Type::kVideo) { // Last packet of a frame, add it to sequence number info map. const uint32_t timestamp = packet->Timestamp() - timestamp_offset_; rtp_sequence_number_map_->InsertPacket( packet->SequenceNumber(), RtpSequenceNumberMap::Info( timestamp, packet->is_first_packet_of_frame(), packet->Marker())); } if (fec_generator_ && packet->fec_protect_packet()) { // This packet should be protected by FEC, add it to packet generator. RTC_DCHECK(fec_generator_); RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo); absl::optional> new_fec_params; new_fec_params.swap(pending_fec_params_); if (new_fec_params) { fec_generator_->SetProtectionParameters(new_fec_params->first, new_fec_params->second); } if (packet->is_red()) { RtpPacketToSend unpacked_packet(*packet); const rtc::CopyOnWriteBuffer buffer = packet->Buffer(); // Grab media payload type from RED header. const size_t headers_size = packet->headers_size(); unpacked_packet.SetPayloadType(buffer[headers_size]); // Copy the media payload into the unpacked buffer. uint8_t* payload_buffer = unpacked_packet.SetPayloadSize(packet->payload_size() - 1); std::copy(&packet->payload()[0] + 1, &packet->payload()[0] + packet->payload_size(), payload_buffer); fec_generator_->AddPacketAndGenerateFec(unpacked_packet); } else { // If not RED encapsulated - we can just insert packet directly. fec_generator_->AddPacketAndGenerateFec(*packet); } } // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after // the pacer, these modifications of the header below are happening after the // FEC protection packets are calculated. This will corrupt recovered packets // at the same place. It's not an issue for extensions, which are present in // all the packets (their content just may be incorrect on recovered packets). // In case of VideoTimingExtension, since it's present not in every packet, // data after rtp header may be corrupted if these packets are protected by // the FEC. if (packet->HasExtension() && packet->capture_time() > Timestamp::Zero()) { TimeDelta diff = now - packet->capture_time(); packet->SetExtension(kTimestampTicksPerMs * diff.ms()); } if (packet->HasExtension()) { packet->SetExtension(AbsoluteSendTime::To24Bits(now)); } if (packet->HasExtension()) { if (populate_network2_timestamp_) { packet->set_network2_time(now); } else { packet->set_pacer_exit_time(now); } } auto compound_packet = Packet{std::move(packet), pacing_info, now}; if (enable_send_packet_batching_ && !is_audio_) { packets_to_send_.push_back(std::move(compound_packet)); } else { CompleteSendPacket(compound_packet, false); } } void RtpSenderEgress::OnBatchComplete() { RTC_DCHECK_RUN_ON(worker_queue_); for (auto& packet : packets_to_send_) { CompleteSendPacket(packet, &packet == &packets_to_send_.back()); } packets_to_send_.clear(); } void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet, bool last_in_batch) { RTC_DCHECK_RUN_ON(worker_queue_); auto& [packet, pacing_info, now] = compound_packet; RTC_CHECK(packet); const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio || packet->packet_type() == RtpPacketMediaType::kVideo; PacketOptions options; options.included_in_allocation = force_part_of_allocation_; // Downstream code actually uses this flag to distinguish between media and // everything else. options.is_retransmit = !is_media; absl::optional packet_id = packet->GetExtension(); if (packet_id.has_value()) { options.packet_id = *packet_id; options.included_in_feedback = true; options.included_in_allocation = true; AddPacketToTransportFeedback(*packet_id, *packet, pacing_info); } options.additional_data = packet->additional_data(); if (packet->packet_type() != RtpPacketMediaType::kPadding && packet->packet_type() != RtpPacketMediaType::kRetransmission && send_packet_observer_ != nullptr && packet->capture_time().IsFinite()) { send_packet_observer_->OnSendPacket(packet_id, packet->capture_time(), packet->Ssrc()); } options.batchable = enable_send_packet_batching_ && !is_audio_; options.last_packet_in_batch = last_in_batch; const bool send_success = SendPacketToNetwork(*packet, options, pacing_info); // Put packet in retransmission history or update pending status even if // actual sending fails. if (is_media && packet->allow_retransmission()) { packet_history_->PutRtpPacket(std::make_unique(*packet), now); } else if (packet->retransmitted_sequence_number()) { packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number()); } if (send_success) { // `media_has_been_sent_` is used by RTPSender to figure out if it can send // padding in the absence of transport-cc or abs-send-time. // In those cases media must be sent first to set a reference timestamp. media_has_been_sent_ = true; // TODO(sprang): Add support for FEC protecting all header extensions, add // media packet to generator here instead. RTC_DCHECK(packet->packet_type().has_value()); RtpPacketMediaType packet_type = *packet->packet_type(); RtpPacketCounter counter(*packet); UpdateRtpStats(now, packet->Ssrc(), packet_type, std::move(counter), packet->size()); } } RtpSendRates RtpSenderEgress::GetSendRates(Timestamp now) const { RTC_DCHECK_RUN_ON(worker_queue_); RtpSendRates current_rates; for (size_t i = 0; i < kNumMediaTypes; ++i) { RtpPacketMediaType type = static_cast(i); current_rates[type] = send_rates_[i].Rate(now).value_or(DataRate::Zero()); } return current_rates; } void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats, StreamDataCounters* rtx_stats) const { RTC_DCHECK_RUN_ON(worker_queue_); *rtp_stats = rtp_stats_; *rtx_stats = rtx_rtp_stats_; } void RtpSenderEgress::ForceIncludeSendPacketsInAllocation( bool part_of_allocation) { RTC_DCHECK_RUN_ON(worker_queue_); force_part_of_allocation_ = part_of_allocation; } bool RtpSenderEgress::MediaHasBeenSent() const { RTC_DCHECK_RUN_ON(worker_queue_); return media_has_been_sent_; } void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) { RTC_DCHECK_RUN_ON(worker_queue_); media_has_been_sent_ = media_sent; } void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) { RTC_DCHECK_RUN_ON(worker_queue_); timestamp_offset_ = timestamp; } std::vector RtpSenderEgress::GetSentRtpPacketInfos( rtc::ArrayView sequence_numbers) const { RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(!sequence_numbers.empty()); if (!need_rtp_packet_infos_) { return std::vector(); } std::vector results; results.reserve(sequence_numbers.size()); for (uint16_t sequence_number : sequence_numbers) { const auto& info = rtp_sequence_number_map_->Get(sequence_number); if (!info) { // The empty vector will be returned. We can delay the clearing // of the vector until after we exit the critical section. return std::vector(); } results.push_back(*info); } return results; } void RtpSenderEgress::SetFecProtectionParameters( const FecProtectionParams& delta_params, const FecProtectionParams& key_params) { RTC_DCHECK_RUN_ON(worker_queue_); pending_fec_params_.emplace(delta_params, key_params); } std::vector> RtpSenderEgress::FetchFecPackets() { RTC_DCHECK_RUN_ON(worker_queue_); if (fec_generator_) { return fec_generator_->GetFecPackets(); } return {}; } void RtpSenderEgress::OnAbortedRetransmissions( rtc::ArrayView sequence_numbers) { RTC_DCHECK_RUN_ON(worker_queue_); // Mark aborted retransmissions as sent, rather than leaving them in // a 'pending' state - otherwise they can not be requested again and // will not be cleared until the history has reached its max size. for (uint16_t seq_no : sequence_numbers) { packet_history_->MarkPacketAsSent(seq_no); } } bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const { switch (*packet.packet_type()) { case RtpPacketMediaType::kAudio: case RtpPacketMediaType::kVideo: return packet.Ssrc() == ssrc_; case RtpPacketMediaType::kRetransmission: case RtpPacketMediaType::kPadding: // Both padding and retransmission must be on either the media or the // RTX stream. return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_; case RtpPacketMediaType::kForwardErrorCorrection: // FlexFEC is on separate SSRC, ULPFEC uses media SSRC. return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_; } return false; } void RtpSenderEgress::AddPacketToTransportFeedback( uint16_t packet_id, const RtpPacketToSend& packet, const PacedPacketInfo& pacing_info) { if (transport_feedback_observer_) { RtpPacketSendInfo packet_info; packet_info.transport_sequence_number = packet_id; packet_info.rtp_timestamp = packet.Timestamp(); packet_info.length = packet.size(); packet_info.pacing_info = pacing_info; packet_info.packet_type = packet.packet_type(); switch (*packet_info.packet_type) { case RtpPacketMediaType::kAudio: case RtpPacketMediaType::kVideo: packet_info.media_ssrc = ssrc_; packet_info.rtp_sequence_number = packet.SequenceNumber(); break; case RtpPacketMediaType::kRetransmission: // For retransmissions, we're want to remove the original media packet // if the retransmit arrives - so populate that in the packet info. packet_info.media_ssrc = ssrc_; packet_info.rtp_sequence_number = *packet.retransmitted_sequence_number(); break; case RtpPacketMediaType::kPadding: case RtpPacketMediaType::kForwardErrorCorrection: // We're not interested in feedback about these packets being received // or lost. break; } transport_feedback_observer_->OnAddPacket(packet_info); } } bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet, const PacketOptions& options, const PacedPacketInfo& pacing_info) { RTC_DCHECK_RUN_ON(worker_queue_); int bytes_sent = -1; if (transport_) { bytes_sent = transport_->SendRtp(packet, options) ? static_cast(packet.size()) : -1; if (event_log_ && bytes_sent > 0) { event_log_->Log(std::make_unique( packet, pacing_info.probe_cluster_id)); } } if (bytes_sent <= 0) { RTC_LOG(LS_WARNING) << "Transport failed to send packet."; return false; } return true; } void RtpSenderEgress::UpdateRtpStats(Timestamp now, uint32_t packet_ssrc, RtpPacketMediaType packet_type, RtpPacketCounter counter, size_t packet_size) { RTC_DCHECK_RUN_ON(worker_queue_); // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the // worker thread. RtpSendRates send_rates; StreamDataCounters* counters = packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_; counters->MaybeSetFirstPacketTime(now); if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) { counters->fec.Add(counter); } else if (packet_type == RtpPacketMediaType::kRetransmission) { counters->retransmitted.Add(counter); } counters->transmitted.Add(counter); send_rates_[static_cast(packet_type)].Update(packet_size, now); if (bitrate_callback_) { send_rates = GetSendRates(now); } if (rtp_stats_callback_) { rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc); } // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point // to the same object, so these callbacks could be consolidated into one. if (bitrate_callback_) { bitrate_callback_->Notify( send_rates.Sum().bps(), send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); } } void RtpSenderEgress::PeriodicUpdate() { RTC_DCHECK_RUN_ON(worker_queue_); RTC_DCHECK(bitrate_callback_); RtpSendRates send_rates = GetSendRates(clock_->CurrentTime()); bitrate_callback_->Notify( send_rates.Sum().bps(), send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_); } #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE void RtpSenderEgress::BweTestLoggingPlot(Timestamp now, uint32_t packet_ssrc) { RTC_DCHECK_RUN_ON(worker_queue_); const auto rates = GetSendRates(now); if (is_audio_) { BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now.ms(), rates.Sum().kbps(), packet_ssrc); BWE_TEST_LOGGING_PLOT_WITH_SSRC( 1, "AudioNackBitrate_kbps", now.ms(), rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc); } else { BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now.ms(), rates.Sum().kbps(), packet_ssrc); BWE_TEST_LOGGING_PLOT_WITH_SSRC( 1, "VideoNackBitrate_kbps", now.ms(), rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc); } } #endif // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE } // namespace webrtc