diff options
Diffstat (limited to 'dom/media/webrtc/transportbridge')
-rw-r--r-- | dom/media/webrtc/transportbridge/MediaPipeline.cpp | 1682 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/MediaPipeline.h | 425 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp | 153 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/MediaPipelineFilter.h | 89 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/RtpLogger.cpp | 67 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/RtpLogger.h | 28 | ||||
-rw-r--r-- | dom/media/webrtc/transportbridge/moz.build | 27 |
7 files changed, 2471 insertions, 0 deletions
diff --git a/dom/media/webrtc/transportbridge/MediaPipeline.cpp b/dom/media/webrtc/transportbridge/MediaPipeline.cpp new file mode 100644 index 0000000000..6cb0afc36e --- /dev/null +++ b/dom/media/webrtc/transportbridge/MediaPipeline.cpp @@ -0,0 +1,1682 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: ekr@rtfm.com + +#include "MediaPipeline.h" + +#include <inttypes.h> +#include <math.h> +#include <sstream> + +#include "AudioSegment.h" +#include "AudioConverter.h" +#include "DOMMediaStream.h" +#include "ImageContainer.h" +#include "ImageTypes.h" +#include "MediaEngine.h" +#include "MediaSegment.h" +#include "MediaTrackGraphImpl.h" +#include "MediaTrackListener.h" +#include "MediaStreamTrack.h" +#include "jsapi/RemoteTrackSource.h" +#include "RtpLogger.h" +#include "VideoFrameConverter.h" +#include "VideoSegment.h" +#include "VideoStreamTrack.h" +#include "VideoUtils.h" +#include "mozilla/Logging.h" +#include "mozilla/NullPrincipal.h" +#include "mozilla/PeerIdentity.h" +#include "mozilla/Preferences.h" +#include "mozilla/SharedThreadPool.h" +#include "mozilla/Sprintf.h" +#include "mozilla/StaticPrefs_media.h" +#include "mozilla/TaskQueue.h" +#include "mozilla/UniquePtr.h" +#include "mozilla/UniquePtrExtensions.h" +#include "mozilla/dom/RTCStatsReportBinding.h" +#include "mozilla/dom/Document.h" +#include "mozilla/gfx/Point.h" +#include "mozilla/gfx/Types.h" +#include "nsError.h" +#include "nsThreadUtils.h" +#include "transport/runnable_utils.h" +#include "jsapi/MediaTransportHandler.h" +#include "jsapi/PeerConnectionImpl.h" +#include "Tracing.h" +#include "libwebrtcglue/WebrtcImageBuffer.h" +#include "common_video/include/video_frame_buffer.h" +#include "modules/rtp_rtcp/include/rtp_rtcp.h" +#include "modules/rtp_rtcp/include/rtp_header_extension_map.h" +#include "modules/rtp_rtcp/source/rtp_packet_received.h" + +// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at +// 48KHz) +#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2) +static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <= + AUDIO_SAMPLE_BUFFER_MAX_BYTES, + "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough"); + +using namespace mozilla; +using namespace mozilla::dom; +using namespace mozilla::gfx; +using namespace mozilla::layers; + +mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline"); + +namespace mozilla { + +// An async inserter for audio data, to avoid running audio codec encoders +// on the MTG/input audio thread. Basically just bounces all the audio +// data to a single audio processing/input queue. We could if we wanted to +// use multiple threads and a TaskQueue. +class AudioProxyThread { + public: + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread) + + explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit) + : mConduit(std::move(aConduit)), + mTaskQueue(TaskQueue::Create( + GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")), + mAudioConverter(nullptr) { + MOZ_ASSERT(mConduit); + MOZ_COUNT_CTOR(AudioProxyThread); + } + + // This function is the identity if aInputRate is supported. + // Else, it returns a rate that is supported, that ensure no loss in audio + // quality: the sampling rate returned is always greater to the inputed + // sampling-rate, if they differ.. + uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) { + AudioSessionConduit* conduit = + static_cast<AudioSessionConduit*>(mConduit.get()); + if (conduit->IsSamplingFreqSupported(aInputRate)) { + return aInputRate; + } + if (aInputRate < 16000) { + return 16000; + } + if (aInputRate < 32000) { + return 32000; + } + if (aInputRate < 44100) { + return 44100; + } + return 48000; + } + + // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into + // something the conduit can work with (or send silence if the track is not + // enabled), and send the audio in 10ms chunks to the conduit. + void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk, + bool aEnabled) { + MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn()); + + // Convert to interleaved 16-bits integer audio, with a maximum of two + // channels (since the WebRTC.org code below makes the assumption that the + // input audio is either mono or stereo), with a sample-rate rate that is + // 16, 32, 44.1, or 48kHz. + uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2; + int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate); + + // We take advantage of the fact that the common case (microphone directly + // to PeerConnection, that is, a normal call), the samples are already + // 16-bits mono, so the representation in interleaved and planar is the + // same, and we can just use that. + if (aEnabled && outputChannels == 1 && + aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) { + const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0]; + PacketizeAndSend(samples, transmissionRate, outputChannels, + aChunk.mDuration); + return; + } + + uint32_t sampleCount = aChunk.mDuration * outputChannels; + if (mInterleavedAudio.Length() < sampleCount) { + mInterleavedAudio.SetLength(sampleCount); + } + + if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) { + PodZero(mInterleavedAudio.Elements(), sampleCount); + } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) { + DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration, + aChunk.mVolume, outputChannels, + mInterleavedAudio.Elements()); + } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) { + DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration, + aChunk.mVolume, outputChannels, + mInterleavedAudio.Elements()); + } + int16_t* inputAudio = mInterleavedAudio.Elements(); + size_t inputAudioFrameCount = aChunk.mDuration; + + AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate, + AudioConfig::FORMAT_S16); + AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels), + transmissionRate, AudioConfig::FORMAT_S16); + // Resample to an acceptable sample-rate for the sending side + if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig || + mAudioConverter->OutputConfig() != outputConfig) { + mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig); + } + + int16_t* processedAudio = nullptr; + size_t framesProcessed = + mAudioConverter->Process(inputAudio, inputAudioFrameCount); + + if (framesProcessed == 0) { + // In place conversion not possible, use a buffer. + framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio, + inputAudioFrameCount); + processedAudio = mOutputAudio.Data(); + } else { + processedAudio = inputAudio; + } + + PacketizeAndSend(processedAudio, transmissionRate, outputChannels, + framesProcessed); + } + + // This packetizes aAudioData in 10ms chunks and sends it. + // aAudioData is interleaved audio data at a rate and with a channel count + // that is appropriate to send with the conduit. + void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate, + uint32_t aChannels, uint32_t aFrameCount) { + MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate); + MOZ_ASSERT(aChannels == 1 || aChannels == 2); + MOZ_ASSERT(aAudioData); + + uint32_t audio_10ms = aRate / 100; + + if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms || + mPacketizer->mChannels != aChannels) { + // It's the right thing to drop the bit of audio still in the packetizer: + // we don't want to send to the conduit audio that has two different + // rates while telling it that it has a constante rate. + mPacketizer = + MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels); + mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels); + } + + mPacketizer->Input(aAudioData, aFrameCount); + + while (mPacketizer->PacketsAvailable()) { + mPacketizer->Output(mPacket.get()); + auto frame = std::make_unique<webrtc::AudioFrame>(); + // UpdateFrame makes a copy of the audio data. + frame->UpdateFrame(frame->timestamp_, mPacket.get(), + mPacketizer->mPacketSize, aRate, frame->speech_type_, + frame->vad_activity_, mPacketizer->mChannels); + mConduit->SendAudioFrame(std::move(frame)); + } + } + + void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk, + bool aEnabled) { + RefPtr<AudioProxyThread> self = this; + nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction( + "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() { + self->InternalProcessAudioChunk(aRate, aChunk, aEnabled); + })); + MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv)); + Unused << rv; + } + + protected: + virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); } + + const RefPtr<AudioSessionConduit> mConduit; + const RefPtr<TaskQueue> mTaskQueue; + // Only accessed on mTaskQueue + UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer; + // A buffer to hold a single packet of audio. + UniquePtr<int16_t[]> mPacket; + nsTArray<int16_t> mInterleavedAudio; + AlignedShortBuffer mOutputAudio; + UniquePtr<AudioConverter> mAudioConverter; +}; + +MediaPipeline::MediaPipeline(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + DirectionType aDirection, + RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<MediaSessionConduit> aConduit) + : mConduit(std::move(aConduit)), + mDirection(aDirection), + mCallThread(std::move(aCallThread)), + mStsThread(aStsThread), + mActive(false, "MediaPipeline::mActive"), + mLevel(0), + mTransportHandler(std::move(aTransportHandler)), + mRtpPacketsSent(0), + mRtcpPacketsSent(0), + mRtpPacketsReceived(0), + mRtcpPacketsReceived(0), + mRtpBytesSent(0), + mRtpBytesReceived(0), + mPc(aPc), + mFilter(), + mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()), + mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {} + +MediaPipeline::~MediaPipeline() { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("Destroying MediaPipeline: %s", mDescription.c_str())); +} + +void MediaPipeline::Start() { + MOZ_ASSERT(NS_IsMainThread()); + mActive = true; +} + +void MediaPipeline::Stop() { + MOZ_ASSERT(NS_IsMainThread()); + mActive = false; +} + +void MediaPipeline::Shutdown() { + MOZ_ASSERT(NS_IsMainThread()); + Stop(); + + RUN_ON_THREAD(mStsThread, + WrapRunnable(RefPtr<MediaPipeline>(this), + &MediaPipeline::DetachTransport_s), + NS_DISPATCH_NORMAL); +} + +void MediaPipeline::DetachTransport_s() { + ASSERT_ON_THREAD(mStsThread); + + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("%s in %s", mDescription.c_str(), __FUNCTION__)); + + disconnect_all(); + mRtpState = TransportLayer::TS_NONE; + mRtcpState = TransportLayer::TS_NONE; + mTransportId.clear(); + mConduit->SetTransportActive(false); + mRtpSendEventListener.DisconnectIfExists(); + mSenderRtcpSendEventListener.DisconnectIfExists(); + mReceiverRtcpSendEventListener.DisconnectIfExists(); +} + +void MediaPipeline::UpdateTransport_m( + const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) { + mStsThread->Dispatch(NS_NewRunnableFunction( + __func__, [aTransportId, filter = std::move(aFilter), + self = RefPtr<MediaPipeline>(this)]() mutable { + self->UpdateTransport_s(aTransportId, std::move(filter)); + })); +} + +void MediaPipeline::UpdateTransport_s( + const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) { + ASSERT_ON_THREAD(mStsThread); + if (!mSignalsConnected) { + mTransportHandler->SignalStateChange.connect( + this, &MediaPipeline::RtpStateChange); + mTransportHandler->SignalRtcpStateChange.connect( + this, &MediaPipeline::RtcpStateChange); + mTransportHandler->SignalEncryptedSending.connect( + this, &MediaPipeline::EncryptedPacketSending); + mTransportHandler->SignalPacketReceived.connect( + this, &MediaPipeline::PacketReceived); + mTransportHandler->SignalAlpnNegotiated.connect( + this, &MediaPipeline::AlpnNegotiated); + mSignalsConnected = true; + } + + if (aTransportId != mTransportId) { + mTransportId = aTransportId; + mRtpState = mTransportHandler->GetState(mTransportId, false); + mRtcpState = mTransportHandler->GetState(mTransportId, true); + CheckTransportStates(); + } + + if (mFilter) { + for (const auto& extension : mFilter->GetExtmap()) { + mRtpHeaderExtensionMap->Deregister(extension.uri); + } + } + if (mFilter && aFilter) { + // Use the new filter, but don't forget any remote SSRCs that we've learned + // by receiving traffic. + mFilter->Update(*aFilter); + } else { + mFilter = std::move(aFilter); + } + if (mFilter) { + for (const auto& extension : mFilter->GetExtmap()) { + mRtpHeaderExtensionMap->RegisterByUri(extension.id, extension.uri); + } + } +} + +void MediaPipeline::GetContributingSourceStats( + const nsString& aInboundRtpStreamId, + FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const { + ASSERT_ON_THREAD(mStsThread); + // Get the expiry from now + DOMHighResTimeStamp expiry = + RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow()); + for (auto info : mCsrcStats) { + if (!info.second.Expired(expiry)) { + RTCRTPContributingSourceStats stats; + info.second.GetWebidlInstance(stats, aInboundRtpStreamId); + if (!aArr.AppendElement(stats, fallible)) { + mozalloc_handle_oom(0); + } + } + } +} + +void MediaPipeline::RtpStateChange(const std::string& aTransportId, + TransportLayer::State aState) { + if (mTransportId != aTransportId) { + return; + } + mRtpState = aState; + CheckTransportStates(); +} + +void MediaPipeline::RtcpStateChange(const std::string& aTransportId, + TransportLayer::State aState) { + if (mTransportId != aTransportId) { + return; + } + mRtcpState = aState; + CheckTransportStates(); +} + +void MediaPipeline::CheckTransportStates() { + ASSERT_ON_THREAD(mStsThread); + + if (mRtpState == TransportLayer::TS_CLOSED || + mRtpState == TransportLayer::TS_ERROR || + mRtcpState == TransportLayer::TS_CLOSED || + mRtcpState == TransportLayer::TS_ERROR) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Warning, + ("RTP Transport failed for pipeline %p flow %s", this, + mDescription.c_str())); + + NS_WARNING( + "MediaPipeline Transport failed. This is not properly cleaned up yet"); + // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the + // connection was good and now it is bad. + // TODO(ekr@rtfm.com): Report up so that the PC knows we + // have experienced an error. + mConduit->SetTransportActive(false); + mRtpSendEventListener.DisconnectIfExists(); + mSenderRtcpSendEventListener.DisconnectIfExists(); + mReceiverRtcpSendEventListener.DisconnectIfExists(); + return; + } + + if (mRtpState == TransportLayer::TS_OPEN) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("RTP Transport ready for pipeline %p flow %s", this, + mDescription.c_str())); + } + + if (mRtcpState == TransportLayer::TS_OPEN) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("RTCP Transport ready for pipeline %p flow %s", this, + mDescription.c_str())); + } + + if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) { + if (mDirection == DirectionType::TRANSMIT) { + mConduit->ConnectSenderRtcpEvent(mSenderRtcpReceiveEvent); + mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect( + mStsThread, this, &MediaPipeline::SendPacket); + mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect( + mStsThread, this, &MediaPipeline::SendPacket); + } else { + mConduit->ConnectReceiverRtcpEvent(mReceiverRtcpReceiveEvent); + mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent); + mReceiverRtcpSendEventListener = + mConduit->ReceiverRtcpSendEvent().Connect(mStsThread, this, + &MediaPipeline::SendPacket); + } + mConduit->SetTransportActive(true); + TransportReady_s(); + } +} + +void MediaPipeline::SendPacket(MediaPacket&& aPacket) { + ASSERT_ON_THREAD(mStsThread); + + const bool isRtp = aPacket.type() == MediaPacket::RTP; + + if (isRtp && mRtpState != TransportLayer::TS_OPEN) { + return; + } + + if (!isRtp && mRtcpState != TransportLayer::TS_OPEN) { + return; + } + + aPacket.sdp_level() = Some(Level()); + + if (RtpLogger::IsPacketLoggingOn()) { + RtpLogger::LogPacket(aPacket, false, mDescription); + } + + if (isRtp) { + mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtp, true, + aPacket.data(), aPacket.len()); + IncrementRtpPacketsSent(aPacket); + } else { + mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtcp, true, + aPacket.data(), aPacket.len()); + IncrementRtcpPacketsSent(); + } + + MOZ_LOG( + gMediaPipelineLog, LogLevel::Debug, + ("%s sending %s packet", mDescription.c_str(), (isRtp ? "RTP" : "RTCP"))); + + mTransportHandler->SendPacket(mTransportId, std::move(aPacket)); +} + +void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) { + ASSERT_ON_THREAD(mStsThread); + ++mRtpPacketsSent; + mRtpBytesSent += aPacket.len(); + + if (!(mRtpPacketsSent % 100)) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)", + mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent)); + } +} + +void MediaPipeline::IncrementRtcpPacketsSent() { + ASSERT_ON_THREAD(mStsThread); + ++mRtcpPacketsSent; + if (!(mRtcpPacketsSent % 100)) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("RTCP sent packet count for %s Pipeline %p: %u", + mDescription.c_str(), this, mRtcpPacketsSent)); + } +} + +void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) { + ASSERT_ON_THREAD(mStsThread); + ++mRtpPacketsReceived; + mRtpBytesReceived += aBytes; + if (!(mRtpPacketsReceived % 100)) { + MOZ_LOG( + gMediaPipelineLog, LogLevel::Info, + ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)", + mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived)); + } +} + +void MediaPipeline::IncrementRtcpPacketsReceived() { + ASSERT_ON_THREAD(mStsThread); + ++mRtcpPacketsReceived; + if (!(mRtcpPacketsReceived % 100)) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("RTCP received packet count for %s Pipeline %p: %u", + mDescription.c_str(), this, mRtcpPacketsReceived)); + } +} + +void MediaPipeline::RtpPacketReceived(const MediaPacket& packet) { + ASSERT_ON_THREAD(mStsThread); + + if (mDirection == DirectionType::TRANSMIT) { + return; + } + + if (!packet.len()) { + return; + } + + webrtc::RTPHeader header; + rtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len()); + webrtc::RtpPacketReceived pktHeader(mRtpHeaderExtensionMap.get()); + if (!pktHeader.Parse(packet_buffer)) { + return; + } + pktHeader.GetHeader(&header); + + if (mFilter && !mFilter->Filter(header)) { + return; + } + + // Make sure to only get the time once, and only if we need it by + // using getTimestamp() for access + DOMHighResTimeStamp now = 0.0; + bool hasTime = false; + + // Remove expired RtpCSRCStats + if (!mCsrcStats.empty()) { + if (!hasTime) { + now = GetTimestampMaker().GetNow(); + hasTime = true; + } + auto expiry = RtpCSRCStats::GetExpiryFromTime(now); + for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) { + if (p->second.Expired(expiry)) { + p = mCsrcStats.erase(p); + continue; + } + p++; + } + } + + // Add new RtpCSRCStats + if (header.numCSRCs) { + for (auto i = 0; i < header.numCSRCs; i++) { + if (!hasTime) { + now = GetTimestampMaker().GetNow(); + hasTime = true; + } + auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]); + if (csrcInfo == mCsrcStats.end()) { + mCsrcStats.insert(std::make_pair( + header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now))); + } else { + csrcInfo->second.SetTimestamp(now); + } + } + } + + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("%s received RTP packet.", mDescription.c_str())); + IncrementRtpPacketsReceived(packet.len()); + OnRtpPacketReceived(); + + RtpLogger::LogPacket(packet, true, mDescription); + + // Might be nice to pass ownership of the buffer in this case, but it is a + // small optimization in a rare case. + mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false, + packet.encrypted_data(), packet.encrypted_len()); + + mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(), + packet.len()); + + mRtpReceiveEvent.Notify(packet.Clone(), header); +} + +void MediaPipeline::RtcpPacketReceived(const MediaPacket& packet) { + ASSERT_ON_THREAD(mStsThread); + + if (!packet.len()) { + return; + } + + // We do not filter RTCP. This is because a compound RTCP packet can contain + // any collection of RTCP packets, and webrtc.org already knows how to filter + // out what it is interested in, and what it is not. Maybe someday we should + // have a TransportLayer that breaks up compound RTCP so we can filter them + // individually, but I doubt that will matter much. + + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("%s received RTCP packet.", mDescription.c_str())); + IncrementRtcpPacketsReceived(); + + RtpLogger::LogPacket(packet, true, mDescription); + + // Might be nice to pass ownership of the buffer in this case, but it is a + // small optimization in a rare case. + mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false, + packet.encrypted_data(), packet.encrypted_len()); + + mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false, + packet.data(), packet.len()); + + if (StaticPrefs::media_webrtc_net_force_disable_rtcp_reception()) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("%s RTCP packet forced to be dropped", mDescription.c_str())); + return; + } + + if (mDirection == DirectionType::TRANSMIT) { + mSenderRtcpReceiveEvent.Notify(packet.Clone()); + } else { + mReceiverRtcpReceiveEvent.Notify(packet.Clone()); + } +} + +void MediaPipeline::PacketReceived(const std::string& aTransportId, + const MediaPacket& packet) { + ASSERT_ON_THREAD(mStsThread); + + if (mTransportId != aTransportId) { + return; + } + + MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN); + MOZ_ASSERT(mRtcpState == mRtpState); + + switch (packet.type()) { + case MediaPacket::RTP: + RtpPacketReceived(packet); + break; + case MediaPacket::RTCP: + RtcpPacketReceived(packet); + break; + default:; + } +} + +void MediaPipeline::AlpnNegotiated(const std::string& aAlpn, + bool aPrivacyRequested) { + ASSERT_ON_THREAD(mStsThread); + + if (aPrivacyRequested) { + MakePrincipalPrivate_s(); + } +} + +void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId, + const MediaPacket& aPacket) { + ASSERT_ON_THREAD(mStsThread); + + if (mTransportId == aTransportId) { + dom::mozPacketDumpType type; + if (aPacket.type() == MediaPacket::SRTP) { + type = dom::mozPacketDumpType::Srtp; + } else if (aPacket.type() == MediaPacket::SRTCP) { + type = dom::mozPacketDumpType::Srtcp; + } else if (aPacket.type() == MediaPacket::DTLS) { + // TODO(bug 1497936): Implement packet dump for DTLS + return; + } else { + MOZ_ASSERT(false); + return; + } + mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len()); + } +} + +class MediaPipelineTransmit::PipelineListener + : public DirectMediaTrackListener { + friend class MediaPipelineTransmit; + + public: + explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit) + : mConduit(std::move(aConduit)), + mActive(false), + mEnabled(false), + mDirectConnect(false) {} + + ~PipelineListener() { + if (mConverter) { + mConverter->Shutdown(); + } + } + + void SetActive(bool aActive) { + mActive = aActive; + if (mConverter) { + mConverter->SetActive(aActive); + } + } + void SetEnabled(bool aEnabled) { mEnabled = aEnabled; } + + // These are needed since nested classes don't have access to any particular + // instance of the parent + void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) { + mAudioProcessing = std::move(aProxy); + } + + void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) { + mConverter = std::move(aConverter); + } + + void OnVideoFrameConverted(webrtc::VideoFrame aVideoFrame) { + MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO); + static_cast<VideoSessionConduit*>(mConduit.get()) + ->SendVideoFrame(std::move(aVideoFrame)); + } + + // Implement MediaTrackListener + void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset, + const MediaSegment& aQueuedMedia) override; + void NotifyEnabledStateChanged(MediaTrackGraph* aGraph, + bool aEnabled) override; + + // Implement DirectMediaTrackListener + void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset, + const MediaSegment& aMedia) override; + void NotifyDirectListenerInstalled(InstallationResult aResult) override; + void NotifyDirectListenerUninstalled() override; + + private: + void NewData(const MediaSegment& aMedia, TrackRate aRate = 0); + + const RefPtr<MediaSessionConduit> mConduit; + RefPtr<AudioProxyThread> mAudioProcessing; + RefPtr<VideoFrameConverter> mConverter; + + // active is true if there is a transport to send on + mozilla::Atomic<bool> mActive; + // enabled is true if the media access control permits sending + // actual content; when false you get black/silence + mozilla::Atomic<bool> mEnabled; + + // Written and read on the MediaTrackGraph thread + bool mDirectConnect; +}; + +MediaPipelineTransmit::MediaPipelineTransmit( + const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread, + bool aIsVideo, RefPtr<MediaSessionConduit> aConduit) + : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT, + std::move(aCallThread), std::move(aStsThread), + std::move(aConduit)), + mWatchManager(this, AbstractThread::MainThread()), + mIsVideo(aIsVideo), + mListener(new PipelineListener(mConduit)), + mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"), + mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") { + if (!IsVideo()) { + mAudioProcessing = + MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit()); + mListener->SetAudioProxy(mAudioProcessing); + } else { // Video + mConverter = MakeAndAddRef<VideoFrameConverter>(GetTimestampMaker()); + mFrameListener = mConverter->VideoFrameConvertedEvent().Connect( + mConverter->mTaskQueue, + [listener = mListener](webrtc::VideoFrame aFrame) { + listener->OnVideoFrameConverted(std::move(aFrame)); + }); + mListener->SetVideoFrameConverter(mConverter); + } + + mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState); + mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState); + mWatchManager.Watch(mSendTrackOverride, + &MediaPipelineTransmit::UpdateSendState); + + mDescription = GenerateDescription(); +} + +MediaPipelineTransmit::~MediaPipelineTransmit() { + mFrameListener.DisconnectIfExists(); + + MOZ_ASSERT(!mTransmitting); + MOZ_ASSERT(!mDomTrack.Ref()); +} + +void MediaPipelineTransmit::Shutdown() { + MediaPipeline::Shutdown(); + MOZ_ASSERT(!mActive); + mWatchManager.Shutdown(); + if (mDomTrack.Ref()) { + mDomTrack.Ref()->RemovePrincipalChangeObserver(this); + mDomTrack = nullptr; + } + mUnsettingSendTrack = false; + UpdateSendState(); + MOZ_ASSERT(!mTransmitting); +} + +void MediaPipeline::SetDescription_s(const std::string& description) { + ASSERT_ON_THREAD(mStsThread); + mDescription = description; +} + +std::string MediaPipelineTransmit::GenerateDescription() const { + MOZ_ASSERT(NS_IsMainThread()); + + std::stringstream description; + description << mPc << "| "; + description << (mIsVideo ? "Transmit video[" : "Transmit audio["); + + if (mDomTrack.Ref()) { + nsString nsTrackId; + mDomTrack.Ref()->GetId(nsTrackId); + description << NS_ConvertUTF16toUTF8(nsTrackId).get(); + } else if (mSendTrackOverride.Ref()) { + description << "override " << mSendTrackOverride.Ref().get(); + } else { + description << "no track"; + } + + description << "]"; + + return description.str(); +} + +void MediaPipelineTransmit::UpdateSendState() { + MOZ_ASSERT(NS_IsMainThread()); + + // This runs because either mActive, mDomTrack or mSendTrackOverride changed, + // or because mSendTrack was unset async. Based on these inputs this method + // is responsible for hooking up mSendTrack to mListener in order to feed data + // to the conduit. + // + // If we are inactive, or if the send track does not match what we want to + // send (mDomTrack or mSendTrackOverride), we must stop feeding data to the + // conduit. NB that removing the listener from mSendTrack is async, and we + // must wait for it to resolve before adding mListener to another track. + // mUnsettingSendTrack gates us until the listener has been removed from + // mSendTrack. + // + // If we are active and the send track does match what we want to send, we + // make sure mListener is added to the send track. Either now, or if we're + // still waiting for another send track to be removed, during a future call to + // this method. + + if (mUnsettingSendTrack) { + // We must wait for the send track to be unset before we can set it again, + // to avoid races. Once unset this function is triggered again. + return; + } + + const bool wasTransmitting = mTransmitting; + + const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed(); + const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended(); + const bool haveLiveOverrideTrack = + mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed(); + const bool mustRemoveSendTrack = + haveLiveSendTrack && !mSendTrackOverride.Ref() && + (!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource); + + mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) && + !mustRemoveSendTrack; + + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, " + "sendTrack=%p (%s), domTrack=%p (%s), " + "sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d", + this, wasTransmitting, mActive.Ref(), mSendTrack.get(), + haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(), + haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(), + haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack, + mTransmitting)); + + if (!wasTransmitting && mTransmitting) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("Attaching pipeline %p to track %p conduit type=%s", this, + mDomTrack.Ref().get(), mIsVideo ? "video" : "audio")); + if (mDescriptionInvalidated) { + // Only update the description when we attach to a track, as detaching is + // always a longer async step than updating the description. Updating on + // detach would cause the wrong track id to be attributed in logs. + RUN_ON_THREAD(mStsThread, + WrapRunnable(RefPtr<MediaPipeline>(this), + &MediaPipelineTransmit::SetDescription_s, + GenerateDescription()), + NS_DISPATCH_NORMAL); + mDescriptionInvalidated = false; + } + if (mSendTrackOverride.Ref()) { + // Special path that allows unittests to avoid mDomTrack and the graph by + // manually calling SetSendTrack. + mSendTrack = mSendTrackOverride.Ref(); + } else { + mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack( + mDomTrack.Ref()->GetTrack()->mType); + mSendPortSource = mDomTrack.Ref()->GetTrack(); + mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get()); + } + if (mIsVideo) { + mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId); + } + mSendTrack->QueueSetAutoend(false); + if (mIsVideo) { + mSendTrack->AddDirectListener(mListener); + } + mSendTrack->AddListener(mListener); + } + + if (wasTransmitting && !mTransmitting) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("Detaching pipeline %p from track %p conduit type=%s", this, + mDomTrack.Ref().get(), mIsVideo ? "video" : "audio")); + mUnsettingSendTrack = true; + if (mIsVideo) { + mSendTrack->RemoveDirectListener(mListener); + } + mSendTrack->RemoveListener(mListener)->Then( + GetMainThreadSerialEventTarget(), __func__, + [this, self = RefPtr<MediaPipelineTransmit>(this)] { + mUnsettingSendTrack = false; + mSendTrack = nullptr; + if (!mWatchManager.IsShutdown()) { + mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState); + } + }); + if (!mSendTrackOverride.Ref()) { + // If an override is set it may be re-used. + mSendTrack->Destroy(); + mSendPort->Destroy(); + mSendPort = nullptr; + mSendPortSource = nullptr; + } + } +} + +bool MediaPipelineTransmit::Transmitting() const { + MOZ_ASSERT(NS_IsMainThread()); + + return mActive; +} + +bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; } + +void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) { + MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref()); + + PeerConnectionWrapper pcw(mPc); + if (pcw.impl()) { + Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc(); + if (doc) { + UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity()); + } else { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("Can't update sink principal; document gone")); + } + } +} + +void MediaPipelineTransmit::UpdateSinkIdentity( + nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) { + MOZ_ASSERT(NS_IsMainThread()); + + if (!mDomTrack.Ref()) { + // Nothing to do here + return; + } + + bool enableTrack = aPrincipal->Subsumes(mDomTrack.Ref()->GetPrincipal()); + if (!enableTrack) { + // first try didn't work, but there's a chance that this is still available + // if our track is bound to a peerIdentity, and the peer connection (our + // sink) is bound to the same identity, then we can enable the track. + const PeerIdentity* trackIdentity = mDomTrack.Ref()->GetPeerIdentity(); + if (aSinkIdentity && trackIdentity) { + enableTrack = (*aSinkIdentity == *trackIdentity); + } + } + + mListener->SetEnabled(enableTrack); +} + +void MediaPipelineTransmit::TransportReady_s() { + ASSERT_ON_THREAD(mStsThread); + // Call base ready function. + MediaPipeline::TransportReady_s(); + mListener->SetActive(true); +} + +nsresult MediaPipelineTransmit::SetTrack(RefPtr<MediaStreamTrack> aDomTrack) { + MOZ_ASSERT(NS_IsMainThread()); + if (mDomTrack.Ref()) { + mDomTrack.Ref()->RemovePrincipalChangeObserver(this); + } + + if (aDomTrack) { + nsString nsTrackId; + aDomTrack->GetId(nsTrackId); + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("Reattaching pipeline to track %p track %s conduit type: %s", + aDomTrack.get(), NS_ConvertUTF16toUTF8(nsTrackId).get(), + mIsVideo ? "video" : "audio")); + } + + mDescriptionInvalidated = true; + mDomTrack = std::move(aDomTrack); + if (mDomTrack.Ref()) { + mDomTrack.Ref()->AddPrincipalChangeObserver(this); + PrincipalChanged(mDomTrack.Ref()); + } + + return NS_OK; +} + +RefPtr<dom::MediaStreamTrack> MediaPipelineTransmit::GetTrack() const { + MOZ_ASSERT(NS_IsMainThread()); + return mDomTrack; +} + +void MediaPipelineTransmit::SetSendTrackOverride( + RefPtr<ProcessedMediaTrack> aSendTrack) { + MOZ_ASSERT(NS_IsMainThread()); + MOZ_RELEASE_ASSERT(!mSendTrack); + MOZ_RELEASE_ASSERT(!mSendPort); + MOZ_RELEASE_ASSERT(!mSendTrackOverride.Ref()); + mDescriptionInvalidated = true; + mSendTrackOverride = std::move(aSendTrack); +} + +// Called if we're attached with AddDirectListener() +void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData( + MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) { + MOZ_LOG( + gMediaPipelineLog, LogLevel::Debug, + ("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64 + ", duration=%" PRId64, + this, aOffset, aMedia.GetDuration())); + TRACE_COMMENT( + "MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData", "%s", + aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio"); + NewData(aMedia, aGraph->GraphRate()); +} + +void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges( + MediaTrackGraph* aGraph, TrackTime aOffset, + const MediaSegment& aQueuedMedia) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("MediaPipeline::NotifyQueuedChanges()")); + + if (aQueuedMedia.GetType() == MediaSegment::VIDEO) { + // We always get video from the direct listener. + return; + } + + TRACE("MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges (Audio)"); + + if (mDirectConnect) { + // ignore non-direct data if we're also getting direct data + return; + } + + size_t rate; + if (aGraph) { + rate = aGraph->GraphRate(); + } else { + // When running tests, graph may be null. In that case use a default. + rate = 16000; + } + NewData(aQueuedMedia, rate); +} + +void MediaPipelineTransmit::PipelineListener::NotifyEnabledStateChanged( + MediaTrackGraph* aGraph, bool aEnabled) { + if (mConduit->type() != MediaSessionConduit::VIDEO) { + return; + } + MOZ_ASSERT(mConverter); + mConverter->SetTrackEnabled(aEnabled); +} + +void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled( + InstallationResult aResult) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Info, + ("MediaPipeline::NotifyDirectListenerInstalled() listener=%p," + " result=%d", + this, static_cast<int32_t>(aResult))); + + mDirectConnect = InstallationResult::SUCCESS == aResult; +} + +void MediaPipelineTransmit::PipelineListener:: + NotifyDirectListenerUninstalled() { + MOZ_LOG( + gMediaPipelineLog, LogLevel::Info, + ("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p", this)); + + if (mConduit->type() == MediaSessionConduit::VIDEO) { + // Reset the converter's track-enabled state. If re-added to a new track + // later and that track is disabled, we will be signaled explicitly. + MOZ_ASSERT(mConverter); + mConverter->SetTrackEnabled(true); + } + + mDirectConnect = false; +} + +void MediaPipelineTransmit::PipelineListener::NewData( + const MediaSegment& aMedia, TrackRate aRate /* = 0 */) { + if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO + ? MediaSessionConduit::AUDIO + : MediaSessionConduit::VIDEO)) { + MOZ_ASSERT(false, + "The media type should always be correct since the " + "listener is locked to a specific track"); + return; + } + + // TODO(ekr@rtfm.com): For now assume that we have only one + // track type and it's destined for us + // See bug 784517 + if (aMedia.GetType() == MediaSegment::AUDIO) { + MOZ_RELEASE_ASSERT(aRate > 0); + + if (!mActive) { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("Discarding audio packets because transport not ready")); + return; + } + + const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia); + for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded(); + iter.Next()) { + mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled); + } + } else { + const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia); + + for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded(); + iter.Next()) { + mConverter->QueueVideoChunk(*iter, !mEnabled); + } + } +} + +class GenericReceiveListener : public MediaTrackListener { + public: + explicit GenericReceiveListener(const RefPtr<dom::MediaStreamTrack>& aTrack) + : mTrackSource(new nsMainThreadPtrHolder<RemoteTrackSource>( + "GenericReceiveListener::mTrackSource", + &static_cast<RemoteTrackSource&>(aTrack->GetSource()))), + mSource(mTrackSource->mStream), + mTrackingId(mTrackSource->mTrackingId), + mIsAudio(aTrack->AsAudioStreamTrack()), + mEnabled(false), + mMaybeTrackNeedsUnmute(true) { + MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread()); + MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack"); + } + + virtual ~GenericReceiveListener() = default; + + void SetEnabled(bool aEnabled) { + if (mEnabled == aEnabled) { + return; + } + mEnabled = aEnabled; + if (aEnabled) { + mMaybeTrackNeedsUnmute = true; + } + if (mIsAudio && !mSource->IsDestroyed()) { + mSource->SetPullingEnabled(mEnabled); + } + } + + void OnRtpReceived() { + if (mMaybeTrackNeedsUnmute) { + mMaybeTrackNeedsUnmute = false; + GetMainThreadEventTarget()->Dispatch( + NewRunnableMethod("GenericReceiveListener::OnRtpReceived_m", this, + &GenericReceiveListener::OnRtpReceived_m)); + } + } + + void OnRtpReceived_m() { + if (mEnabled) { + mTrackSource->SetMuted(false); + } + } + + void EndTrack() { + MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, + ("GenericReceiveListener ending track")); + + if (!mSource->IsDestroyed()) { + // This breaks the cycle with the SourceMediaTrack + mSource->RemoveListener(this); + mSource->End(); + mSource->Destroy(); + } + + GetMainThreadEventTarget()->Dispatch( + NewRunnableMethod("RemoteTrackSource::ForceEnded", mTrackSource.get(), + &RemoteTrackSource::ForceEnded)); + } + + protected: + const nsMainThreadPtrHandle<RemoteTrackSource> mTrackSource; + const RefPtr<SourceMediaTrack> mSource; + const TrackingId mTrackingId; + const bool mIsAudio; + // Main thread only. + bool mEnabled; + // Any thread. + Atomic<bool> mMaybeTrackNeedsUnmute; +}; + +MediaPipelineReceive::MediaPipelineReceive( + const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<MediaSessionConduit> aConduit) + : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE, + std::move(aCallThread), std::move(aStsThread), + std::move(aConduit)) {} + +MediaPipelineReceive::~MediaPipelineReceive() = default; + +class MediaPipelineReceiveAudio::PipelineListener + : public GenericReceiveListener { + public: + PipelineListener(const RefPtr<dom::MediaStreamTrack>& aTrack, + RefPtr<MediaSessionConduit> aConduit, + const PrincipalHandle& aPrincipalHandle) + : GenericReceiveListener(aTrack), + mConduit(std::move(aConduit)), + // AudioSession conduit only supports 16, 32, 44.1 and 48kHz + // This is an artificial limitation, it would however require more + // changes to support any rates. If the sampling rate is not-supported, + // we will use 48kHz instead. + mRate(static_cast<AudioSessionConduit*>(mConduit.get()) + ->IsSamplingFreqSupported(mSource->Graph()->GraphRate()) + ? mSource->Graph()->GraphRate() + : WEBRTC_MAX_SAMPLE_RATE), + mTaskQueue(TaskQueue::Create( + GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), + "AudioPipelineListener")), + mPlayedTicks(0), + mAudioFrame(std::make_unique<webrtc::AudioFrame>()), + mPrincipalHandle(aPrincipalHandle), + mForceSilence(false) {} + + void Init() { + mSource->SetAppendDataSourceRate(mRate); + mSource->AddListener(this); + } + + // Implement MediaTrackListener + void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData, + TrackTime aDesiredTime) override { + NotifyPullImpl(aDesiredTime); + } + + void MakePrincipalPrivate_s() { + mForceSilence = true; + + GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction( + "MediaPipelineReceiveAudio::PipelineListener::MakePrincipalPrivate_s", + [self = RefPtr<PipelineListener>(this), this] { + class Message : public ControlMessage { + public: + Message(RefPtr<PipelineListener> aListener, + const PrincipalHandle& aPrivatePrincipal) + : ControlMessage(nullptr), + mListener(std::move(aListener)), + mPrivatePrincipal(aPrivatePrincipal) {} + + void Run() override { + mListener->mPrincipalHandle = mPrivatePrincipal; + mListener->mForceSilence = false; + } + + const RefPtr<PipelineListener> mListener; + PrincipalHandle mPrivatePrincipal; + }; + + RefPtr<nsIPrincipal> privatePrincipal = + NullPrincipal::CreateWithInheritedAttributes( + mTrackSource->GetPrincipal()); + mTrackSource->SetPrincipal(privatePrincipal); + + if (mSource->IsDestroyed()) { + return; + } + + mSource->GraphImpl()->AppendMessage( + MakeUnique<Message>(this, MakePrincipalHandle(privatePrincipal))); + })); + } + + private: + ~PipelineListener() = default; + + void NotifyPullImpl(TrackTime aDesiredTime) { + TRACE_COMMENT("PiplineListener::NotifyPullImpl", "PipelineListener %p", + this); + uint32_t samplesPer10ms = mRate / 100; + + // mSource's rate is not necessarily the same as the graph rate, since there + // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and + // 48kHz are supported. The audio frames we get here is going to be + // resampled when inserted into the graph. aDesiredTime and mPlayedTicks are + // in the graph rate. + + while (mPlayedTicks < aDesiredTime) { + // This fetches 10ms of data, either mono or stereo + MediaConduitErrorCode err = + static_cast<AudioSessionConduit*>(mConduit.get()) + ->GetAudioFrame(mRate, mAudioFrame.get()); + + if (err != kMediaConduitNoError) { + // Insert silence on conduit/GIPS failure (extremely unlikely) + MOZ_LOG(gMediaPipelineLog, LogLevel::Error, + ("Audio conduit failed (%d) to return data @ %" PRId64 + " (desired %" PRId64 " -> %f)", + err, mPlayedTicks, aDesiredTime, + mSource->TrackTimeToSeconds(aDesiredTime))); + constexpr size_t mono = 1; + mAudioFrame->UpdateFrame( + mAudioFrame->timestamp_, nullptr, samplesPer10ms, mRate, + mAudioFrame->speech_type_, mAudioFrame->vad_activity_, + std::max(mono, mAudioFrame->num_channels())); + } + + MOZ_LOG( + gMediaPipelineLog, LogLevel::Debug, + ("Audio conduit returned buffer for %zu channels, %zu frames", + mAudioFrame->num_channels(), mAudioFrame->samples_per_channel())); + + AudioSegment segment; + if (mForceSilence || mAudioFrame->muted()) { + segment.AppendNullData(mAudioFrame->samples_per_channel()); + } else { + CheckedInt<size_t> bufferSize(sizeof(uint16_t)); + bufferSize *= mAudioFrame->samples_per_channel(); + bufferSize *= mAudioFrame->num_channels(); + RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize); + int16_t* samplesData = static_cast<int16_t*>(samples->Data()); + AutoTArray<int16_t*, 2> channels; + AutoTArray<const int16_t*, 2> outputChannels; + + channels.SetLength(mAudioFrame->num_channels()); + + size_t offset = 0; + for (size_t i = 0; i < mAudioFrame->num_channels(); i++) { + channels[i] = samplesData + offset; + offset += mAudioFrame->samples_per_channel(); + } + + DeinterleaveAndConvertBuffer( + mAudioFrame->data(), mAudioFrame->samples_per_channel(), + mAudioFrame->num_channels(), channels.Elements()); + + outputChannels.AppendElements(channels); + + segment.AppendFrames(samples.forget(), outputChannels, + mAudioFrame->samples_per_channel(), + mPrincipalHandle); + } + + // Handle track not actually added yet or removed/finished + if (TrackTime appended = mSource->AppendData(&segment)) { + mPlayedTicks += appended; + } else { + MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed")); + // we can't un-read the data, but that's ok since we don't want to + // buffer - but don't i-loop! + break; + } + } + } + + const RefPtr<MediaSessionConduit> mConduit; + // This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and + // tries to be the same as the graph rate. If the graph rate is higher than + // 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate, + // audio is resampled to the graph rate. + const TrackRate mRate; + const RefPtr<TaskQueue> mTaskQueue; + // Number of frames of data that has been added to the SourceMediaTrack in + // the graph's rate. Graph thread only. + TrackTicks mPlayedTicks; + // Allocation of an audio frame used as a scratch buffer when reading data out + // of libwebrtc for forwarding into the graph. Graph thread only. + std::unique_ptr<webrtc::AudioFrame> mAudioFrame; + // Principal handle used when appending data to the SourceMediaTrack. Graph + // thread only. + PrincipalHandle mPrincipalHandle; + // Set to true on the sts thread if privacy is requested when ALPN was + // negotiated. Set to false again when mPrincipalHandle is private. + Atomic<bool> mForceSilence; +}; + +MediaPipelineReceiveAudio::MediaPipelineReceiveAudio( + const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<AudioSessionConduit> aConduit, + const RefPtr<dom::MediaStreamTrack>& aTrack, + const PrincipalHandle& aPrincipalHandle) + : MediaPipelineReceive(aPc, std::move(aTransportHandler), + std::move(aCallThread), std::move(aStsThread), + std::move(aConduit)), + mListener(aTrack + ? new PipelineListener(aTrack, mConduit, aPrincipalHandle) + : nullptr), + mWatchManager(this, AbstractThread::MainThread()) { + mDescription = mPc + "| Receive audio"; + if (mListener) { + mListener->Init(); + } + mWatchManager.Watch(mActive, &MediaPipelineReceiveAudio::UpdateListener); +} + +void MediaPipelineReceiveAudio::Shutdown() { + MOZ_ASSERT(NS_IsMainThread()); + MediaPipeline::Shutdown(); + mWatchManager.Shutdown(); + if (mListener) { + mListener->EndTrack(); + } +} + +void MediaPipelineReceiveAudio::MakePrincipalPrivate_s() { + ASSERT_ON_THREAD(mStsThread); + if (mListener) { + mListener->MakePrincipalPrivate_s(); + } +} + +void MediaPipelineReceiveAudio::OnRtpPacketReceived() { + ASSERT_ON_THREAD(mStsThread); + if (mListener) { + mListener->OnRtpReceived(); + } +} + +void MediaPipelineReceiveAudio::UpdateListener() { + MOZ_ASSERT(NS_IsMainThread()); + if (mListener) { + mListener->SetEnabled(mActive.Ref()); + } +} + +class MediaPipelineReceiveVideo::PipelineListener + : public GenericReceiveListener { + public: + PipelineListener(const RefPtr<dom::MediaStreamTrack>& aTrack, + const PrincipalHandle& aPrincipalHandle) + : GenericReceiveListener(aTrack), + mImageContainer( + MakeAndAddRef<ImageContainer>(ImageContainer::ASYNCHRONOUS)), + mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"), + mPrincipalHandle(aPrincipalHandle) {} + + void Init() { mSource->AddListener(this); } + + void MakePrincipalPrivate_s() { + { + MutexAutoLock lock(mMutex); + mForceDropFrames = true; + } + + GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction( + __func__, [self = RefPtr<PipelineListener>(this), this] { + RefPtr<nsIPrincipal> privatePrincipal = + NullPrincipal::CreateWithInheritedAttributes( + mTrackSource->GetPrincipal()); + mTrackSource->SetPrincipal(privatePrincipal); + + MutexAutoLock lock(mMutex); + mPrincipalHandle = MakePrincipalHandle(privatePrincipal); + mForceDropFrames = false; + })); + } + + void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer, + uint32_t aTimeStamp, int64_t aRenderTime) { + PrincipalHandle principal; + { + MutexAutoLock lock(mMutex); + if (mForceDropFrames) { + return; + } + principal = mPrincipalHandle; + } + RefPtr<Image> image; + if (aBuffer.type() == webrtc::VideoFrameBuffer::Type::kNative) { + // We assume that only native handles are used with the + // WebrtcMediaDataCodec decoder. + const ImageBuffer* imageBuffer = + static_cast<const ImageBuffer*>(&aBuffer); + image = imageBuffer->GetNativeImage(); + } else { + MOZ_ASSERT(aBuffer.type() == webrtc::VideoFrameBuffer::Type::kI420); + rtc::scoped_refptr<const webrtc::I420BufferInterface> i420( + aBuffer.GetI420()); + + MOZ_ASSERT(i420->DataY()); + // Create a video frame using |buffer|. + PerformanceRecorder<CopyVideoStage> rec( + "MediaPipelineReceiveVideo::CopyToImage"_ns, mTrackingId, + i420->width(), i420->height()); + + RefPtr<PlanarYCbCrImage> yuvImage = + mImageContainer->CreatePlanarYCbCrImage(); + + PlanarYCbCrData yuvData; + yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY()); + yuvData.mYStride = i420->StrideY(); + MOZ_ASSERT(i420->StrideU() == i420->StrideV()); + yuvData.mCbCrStride = i420->StrideU(); + yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU()); + yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV()); + yuvData.mPictureRect = IntRect(0, 0, i420->width(), i420->height()); + yuvData.mStereoMode = StereoMode::MONO; + // This isn't the best default. + yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601; + yuvData.mChromaSubsampling = + gfx::ChromaSubsampling::HALF_WIDTH_AND_HEIGHT; + + if (!yuvImage->CopyData(yuvData)) { + MOZ_ASSERT(false); + return; + } + rec.Record(); + + image = std::move(yuvImage); + } + + VideoSegment segment; + auto size = image->GetSize(); + segment.AppendFrame(image.forget(), size, principal); + mSource->AppendData(&segment); + } + + private: + RefPtr<layers::ImageContainer> mImageContainer; + Mutex mMutex MOZ_UNANNOTATED; // Protects the below members. + PrincipalHandle mPrincipalHandle; + // Set to true on the sts thread if privacy is requested when ALPN was + // negotiated. Set to false again when mPrincipalHandle is private. + bool mForceDropFrames = false; +}; + +class MediaPipelineReceiveVideo::PipelineRenderer + : public mozilla::VideoRenderer { + public: + explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline) + : mPipeline(aPipeline) {} + + void Detach() { mPipeline = nullptr; } + + // Implement VideoRenderer + void FrameSizeChange(unsigned int aWidth, unsigned int aHeight) override {} + void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer, + uint32_t aTimeStamp, int64_t aRenderTime) override { + mPipeline->mListener->RenderVideoFrame(aBuffer, aTimeStamp, aRenderTime); + } + + private: + MediaPipelineReceiveVideo* mPipeline; // Raw pointer to avoid cycles +}; + +MediaPipelineReceiveVideo::MediaPipelineReceiveVideo( + const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<VideoSessionConduit> aConduit, + const RefPtr<dom::MediaStreamTrack>& aTrack, + const PrincipalHandle& aPrincipalHandle) + : MediaPipelineReceive(aPc, std::move(aTransportHandler), + std::move(aCallThread), std::move(aStsThread), + std::move(aConduit)), + mRenderer(new PipelineRenderer(this)), + mListener(aTrack ? new PipelineListener(aTrack, aPrincipalHandle) + : nullptr), + mWatchManager(this, AbstractThread::MainThread()) { + mDescription = mPc + "| Receive video"; + if (mListener) { + mListener->Init(); + } + static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer); + mWatchManager.Watch(mActive, &MediaPipelineReceiveVideo::UpdateListener); +} + +void MediaPipelineReceiveVideo::Shutdown() { + MOZ_ASSERT(NS_IsMainThread()); + MediaPipeline::Shutdown(); + mWatchManager.Shutdown(); + + // stop generating video and thus stop invoking the PipelineRenderer + // and PipelineListener - the renderer has a raw ptr to the Pipeline to + // avoid cycles, and the render callbacks are invoked from a different + // thread so simple null-checks would cause TSAN bugs without locks. + static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer(); + if (mListener) { + mListener->EndTrack(); + } +} + +void MediaPipelineReceiveVideo::MakePrincipalPrivate_s() { + ASSERT_ON_THREAD(mStsThread); + if (mListener) { + mListener->MakePrincipalPrivate_s(); + } +} + +void MediaPipelineReceiveVideo::OnRtpPacketReceived() { + ASSERT_ON_THREAD(mStsThread); + if (mListener) { + mListener->OnRtpReceived(); + } +} + +void MediaPipelineReceiveVideo::UpdateListener() { + MOZ_ASSERT(NS_IsMainThread()); + if (mListener) { + mListener->SetEnabled(mActive.Ref()); + } +} + +const dom::RTCStatsTimestampMaker& MediaPipeline::GetTimestampMaker() const { + return mConduit->GetTimestampMaker(); +} + +DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime( + const DOMHighResTimeStamp aTime) { + // DOMHighResTimeStamp is a unit measured in ms + return aTime + EXPIRY_TIME_MILLISECONDS; +} + +MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc, + const DOMHighResTimeStamp aTime) + : mCsrc(aCsrc), mTimestamp(aTime) {} + +void MediaPipeline::RtpCSRCStats::GetWebidlInstance( + dom::RTCRTPContributingSourceStats& aWebidlObj, + const nsString& aInboundRtpStreamId) const { + nsString statId = u"csrc_"_ns + aInboundRtpStreamId; + statId.AppendLiteral("_"); + statId.AppendInt(mCsrc); + aWebidlObj.mId.Construct(statId); + aWebidlObj.mType.Construct(RTCStatsType::Csrc); + aWebidlObj.mTimestamp.Construct(mTimestamp); + aWebidlObj.mContributorSsrc.Construct(mCsrc); + aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId); +} + +} // namespace mozilla diff --git a/dom/media/webrtc/transportbridge/MediaPipeline.h b/dom/media/webrtc/transportbridge/MediaPipeline.h new file mode 100644 index 0000000000..6fe3f5f287 --- /dev/null +++ b/dom/media/webrtc/transportbridge/MediaPipeline.h @@ -0,0 +1,425 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: ekr@rtfm.com + +#ifndef mediapipeline_h__ +#define mediapipeline_h__ + +#include <map> + +#include "transport/sigslot.h" +#include "transport/transportlayer.h" // For TransportLayer::State + +#include "libwebrtcglue/MediaConduitInterface.h" +#include "mozilla/ReentrantMonitor.h" +#include "mozilla/Atomics.h" +#include "mozilla/StateMirroring.h" +#include "transport/mediapacket.h" +#include "transport/runnable_utils.h" +#include "AudioPacketizer.h" +#include "MediaEventSource.h" +#include "MediaPipelineFilter.h" +#include "MediaSegment.h" +#include "PrincipalChangeObserver.h" +#include "jsapi/PacketDumper.h" + +#include "modules/rtp_rtcp/include/rtp_header_extension_map.h" + +// Should come from MediaEngine.h, but that's a pain to include here +// because of the MOZILLA_EXTERNAL_LINKAGE stuff. +#define WEBRTC_MAX_SAMPLE_RATE 48000 + +class nsIPrincipal; + +namespace mozilla { +class AudioProxyThread; +class MediaInputPort; +class MediaPipelineFilter; +class MediaTransportHandler; +class PeerIdentity; +class ProcessedMediaTrack; +class SourceMediaTrack; +class VideoFrameConverter; + +namespace dom { +class MediaStreamTrack; +struct RTCRTPContributingSourceStats; +} // namespace dom + +// A class that represents the pipeline of audio and video +// The dataflow looks like: +// +// TRANSMIT +// CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network +// +// RECEIVE +// network -> transport -> [us] -> conduit -> [us] -> stream -> Playout +// +// The boxes labeled [us] are just bridge logic implemented in this class +// +// We have to deal with a number of threads: +// +// GSM: +// * Assembles the pipeline +// SocketTransportService +// * Receives notification that ICE and DTLS have completed +// * Processes incoming network data and passes it to the conduit +// * Processes outgoing RTP and RTCP +// MediaTrackGraph +// * Receives outgoing data from the MediaTrackGraph +// * Receives pull requests for more data from the +// MediaTrackGraph +// One or another GIPS threads +// * Receives RTCP messages to send to the other side +// * Processes video frames GIPS wants to render +// +// For a transmitting conduit, "output" is RTP and "input" is RTCP. +// For a receiving conduit, "input" is RTP and "output" is RTCP. +// + +class MediaPipeline : public sigslot::has_slots<> { + public: + enum class DirectionType { TRANSMIT, RECEIVE }; + MediaPipeline(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + DirectionType aDirection, RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<MediaSessionConduit> aConduit); + + void Start(); + void Stop(); + + void SetLevel(size_t aLevel) { mLevel = aLevel; } + + // Main thread shutdown. + virtual void Shutdown(); + + void UpdateTransport_m(const std::string& aTransportId, + UniquePtr<MediaPipelineFilter>&& aFilter); + + void UpdateTransport_s(const std::string& aTransportId, + UniquePtr<MediaPipelineFilter>&& aFilter); + + virtual DirectionType Direction() const { return mDirection; } + size_t Level() const { return mLevel; } + virtual bool IsVideo() const = 0; + + class RtpCSRCStats { + public: + // Gets an expiration time for CRC info given a reference time, + // this reference time would normally be the time of calling. + // This value can then be used to check if a RtpCSRCStats + // has expired via Expired(...) + static DOMHighResTimeStamp GetExpiryFromTime( + const DOMHighResTimeStamp aTime); + + RtpCSRCStats(const uint32_t aCsrc, const DOMHighResTimeStamp aTime); + ~RtpCSRCStats() = default; + // Initialize a webidl representation suitable for adding to a report. + // This assumes that the webidl object is empty. + // @param aWebidlObj the webidl binding object to popluate + // @param aInboundRtpStreamId the associated RTCInboundRTPStreamStats.id + void GetWebidlInstance(dom::RTCRTPContributingSourceStats& aWebidlObj, + const nsString& aInboundRtpStreamId) const; + void SetTimestamp(const DOMHighResTimeStamp aTime) { mTimestamp = aTime; } + // Check if the RtpCSRCStats has expired, checks against a + // given expiration time. + bool Expired(const DOMHighResTimeStamp aExpiry) const { + return mTimestamp < aExpiry; + } + + private: + static const double constexpr EXPIRY_TIME_MILLISECONDS = 10 * 1000; + const uint32_t mCsrc; + DOMHighResTimeStamp mTimestamp; + }; + + // Gets the gathered contributing source stats for the last expiration period. + // @param aId the stream id to use for populating inboundRtpStreamId field + // @param aArr the array to append the stats objects to + void GetContributingSourceStats( + const nsString& aInboundRtpStreamId, + FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const; + + int32_t RtpPacketsSent() const { return mRtpPacketsSent; } + int64_t RtpBytesSent() const { return mRtpBytesSent; } + int32_t RtcpPacketsSent() const { return mRtcpPacketsSent; } + int32_t RtpPacketsReceived() const { return mRtpPacketsReceived; } + int64_t RtpBytesReceived() const { return mRtpBytesReceived; } + int32_t RtcpPacketsReceived() const { return mRtcpPacketsReceived; } + + const dom::RTCStatsTimestampMaker& GetTimestampMaker() const; + + // Thread counting + NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline) + + protected: + virtual ~MediaPipeline(); + + // The transport is ready + virtual void TransportReady_s() {} + + void IncrementRtpPacketsSent(const MediaPacket& aPacket); + void IncrementRtcpPacketsSent(); + void IncrementRtpPacketsReceived(int aBytes); + virtual void OnRtpPacketReceived() {} + void IncrementRtcpPacketsReceived(); + + virtual void SendPacket(MediaPacket&& packet); + + // Process slots on transports + void RtpStateChange(const std::string& aTransportId, TransportLayer::State); + void RtcpStateChange(const std::string& aTransportId, TransportLayer::State); + virtual void CheckTransportStates(); + void PacketReceived(const std::string& aTransportId, + const MediaPacket& packet); + void AlpnNegotiated(const std::string& aAlpn, bool aPrivacyRequested); + + void RtpPacketReceived(const MediaPacket& packet); + void RtcpPacketReceived(const MediaPacket& packet); + + void EncryptedPacketSending(const std::string& aTransportId, + const MediaPacket& aPacket); + + void SetDescription_s(const std::string& description); + + // Called when ALPN is negotiated and is requesting privacy, so receive + // pipelines do not enter data into the graph under a content principal. + virtual void MakePrincipalPrivate_s() {} + + public: + const RefPtr<MediaSessionConduit> mConduit; + const DirectionType mDirection; + + // Pointers to the threads we need. Initialized at creation + // and used all over the place. + const RefPtr<AbstractThread> mCallThread; + const RefPtr<nsISerialEventTarget> mStsThread; + + protected: + // True if we should be actively transmitting or receiving data. Main thread + // only. + Watchable<bool> mActive; + Atomic<size_t> mLevel; + std::string mTransportId; + const RefPtr<MediaTransportHandler> mTransportHandler; + + TransportLayer::State mRtpState = TransportLayer::TS_NONE; + TransportLayer::State mRtcpState = TransportLayer::TS_NONE; + bool mSignalsConnected = false; + + // Only safe to access from STS thread. + int32_t mRtpPacketsSent; + int32_t mRtcpPacketsSent; + int32_t mRtpPacketsReceived; + int32_t mRtcpPacketsReceived; + int64_t mRtpBytesSent; + int64_t mRtpBytesReceived; + + // Only safe to access from STS thread. + std::map<uint32_t, RtpCSRCStats> mCsrcStats; + + // Written in c'tor. Read on STS and main thread. + const std::string mPc; + + // String describing this MediaPipeline for logging purposes. Only safe to + // access from STS thread. + std::string mDescription; + + // Written in c'tor, all following accesses are on the STS thread. + UniquePtr<MediaPipelineFilter> mFilter; + const UniquePtr<webrtc::RtpHeaderExtensionMap> mRtpHeaderExtensionMap; + + RefPtr<PacketDumper> mPacketDumper; + + MediaEventProducerExc<MediaPacket, webrtc::RTPHeader> mRtpReceiveEvent; + MediaEventProducerExc<MediaPacket> mSenderRtcpReceiveEvent; + MediaEventProducerExc<MediaPacket> mReceiverRtcpReceiveEvent; + + MediaEventListener mRtpSendEventListener; + MediaEventListener mSenderRtcpSendEventListener; + MediaEventListener mReceiverRtcpSendEventListener; + + private: + bool IsRtp(const unsigned char* aData, size_t aLen) const; + // Must be called on the STS thread. Must be called after Shutdown(). + void DetachTransport_s(); +}; + +// A specialization of pipeline for reading from an input device +// and transmitting to the network. +class MediaPipelineTransmit + : public MediaPipeline, + public dom::PrincipalChangeObserver<dom::MediaStreamTrack> { + public: + // Set aRtcpTransport to nullptr to use rtcp-mux + MediaPipelineTransmit(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, bool aIsVideo, + RefPtr<MediaSessionConduit> aConduit); + + void Shutdown() override; + + bool Transmitting() const; + + // written and used from MainThread + bool IsVideo() const override; + + // When the principal of the domtrack changes, it calls through to here + // so that we can determine whether to enable track transmission. + // In cases where the peer isn't yet identified, we disable the pipeline (not + // the stream, that would potentially affect others), so that it sends + // black/silence. Once the peer is identified, re-enable those streams. + virtual void UpdateSinkIdentity(nsIPrincipal* aPrincipal, + const PeerIdentity* aSinkIdentity); + + // for monitoring changes in track ownership + void PrincipalChanged(dom::MediaStreamTrack* aTrack) override; + + // Override MediaPipeline::TransportReady_s. + void TransportReady_s() override; + + // Replace a track with a different one. + nsresult SetTrack(RefPtr<dom::MediaStreamTrack> aDomTrack); + + // Used to correlate stats + RefPtr<dom::MediaStreamTrack> GetTrack() const; + + // For test use only. This allows a send track to be set without a + // corresponding dom track. + void SetSendTrackOverride(RefPtr<ProcessedMediaTrack> aSendTrack); + + // Separate classes to allow ref counting + class PipelineListener; + class VideoFrameFeeder; + + protected: + ~MediaPipelineTransmit(); + + // Updates mDescription (async) with information about the track we are + // transmitting. + std::string GenerateDescription() const; + + // Sets up mSendPort and mSendTrack to feed mConduit if we are transmitting + // and have a dom track but no send track. Main thread only. + void UpdateSendState(); + + private: + WatchManager<MediaPipelineTransmit> mWatchManager; + const bool mIsVideo; + const RefPtr<PipelineListener> mListener; + RefPtr<AudioProxyThread> mAudioProcessing; + RefPtr<VideoFrameConverter> mConverter; + MediaEventListener mFrameListener; + Watchable<RefPtr<dom::MediaStreamTrack>> mDomTrack; + // Input port connecting mDomTrack's MediaTrack to mSendTrack. + RefPtr<MediaInputPort> mSendPort; + // The source track of the mSendTrack. Main thread only. + RefPtr<ProcessedMediaTrack> mSendPortSource; + // True if a parameter affecting mDescription has changed. To avoid updating + // the description unnecessarily. Main thread only. + bool mDescriptionInvalidated = true; + // Set true once we trigger the async removal of mSendTrack. Set false once + // the async removal is done. Main thread only. + bool mUnsettingSendTrack = false; + // MediaTrack that we send over the network. This allows changing mDomTrack. + // Because changing mSendTrack is async and can be racy (when changing from a + // track in one graph to a track in another graph), it is set very strictly. + // If mSendTrack is null it can be set by UpdateSendState(). + // If it is non-null it can only be set to null, and only by the + // RemoveListener MozPromise handler, as seen in UpdateSendState. + RefPtr<ProcessedMediaTrack> mSendTrack; + // When this is set and we are active, this track will be used as mSendTrack. + // Allows unittests to insert a send track without requiring a dom track or a + // graph. Main thread only. + Watchable<RefPtr<ProcessedMediaTrack>> mSendTrackOverride; + // True when mSendTrack is set, not destroyed and mActive is true. mListener + // is attached to mSendTrack when this is true. Main thread only. + bool mTransmitting = false; +}; + +// A specialization of pipeline for reading from the network and +// rendering media. +class MediaPipelineReceive : public MediaPipeline { + public: + // Set aRtcpTransport to nullptr to use rtcp-mux + MediaPipelineReceive(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<MediaSessionConduit> aConduit); + + protected: + ~MediaPipelineReceive(); +}; + +// A specialization of pipeline for reading from the network and +// rendering audio. +class MediaPipelineReceiveAudio : public MediaPipelineReceive { + public: + MediaPipelineReceiveAudio(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<AudioSessionConduit> aConduit, + const RefPtr<dom::MediaStreamTrack>& aTrack, + const PrincipalHandle& aPrincipalHandle); + + void Shutdown() override; + + bool IsVideo() const override { return false; } + + void MakePrincipalPrivate_s() override; + + void OnRtpPacketReceived() override; + + private: + void UpdateListener(); + + // Separate class to allow ref counting + class PipelineListener; + + const RefPtr<PipelineListener> mListener; + WatchManager<MediaPipelineReceiveAudio> mWatchManager; +}; + +// A specialization of pipeline for reading from the network and +// rendering video. +class MediaPipelineReceiveVideo : public MediaPipelineReceive { + public: + MediaPipelineReceiveVideo(const std::string& aPc, + RefPtr<MediaTransportHandler> aTransportHandler, + RefPtr<AbstractThread> aCallThread, + RefPtr<nsISerialEventTarget> aStsThread, + RefPtr<VideoSessionConduit> aConduit, + const RefPtr<dom::MediaStreamTrack>& aTrack, + const PrincipalHandle& aPrincipalHandle); + + void Shutdown() override; + + bool IsVideo() const override { return true; } + + void MakePrincipalPrivate_s() override; + + void OnRtpPacketReceived() override; + + private: + void UpdateListener(); + + class PipelineRenderer; + friend class PipelineRenderer; + + // Separate class to allow ref counting + class PipelineListener; + + const RefPtr<PipelineRenderer> mRenderer; + const RefPtr<PipelineListener> mListener; + WatchManager<MediaPipelineReceiveVideo> mWatchManager; +}; + +} // namespace mozilla +#endif diff --git a/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp b/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp new file mode 100644 index 0000000000..1acb73e9f2 --- /dev/null +++ b/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp @@ -0,0 +1,153 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: softtabstop=2:shiftwidth=2:expandtab + * */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: bcampen@mozilla.com + +#include "MediaPipelineFilter.h" + +#include "api/rtp_headers.h" +#include "api/rtp_parameters.h" +#include "mozilla/Logging.h" + +// defined in MediaPipeline.cpp +extern mozilla::LazyLogModule gMediaPipelineLog; + +#define DEBUG_LOG(x) MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, x) + +namespace mozilla { +MediaPipelineFilter::MediaPipelineFilter( + const std::vector<webrtc::RtpExtension>& aExtMap) + : mExtMap(aExtMap) {} + +void MediaPipelineFilter::SetRemoteMediaStreamId( + const Maybe<std::string>& aMid) { + if (aMid != mRemoteMid) { + DEBUG_LOG(("MediaPipelineFilter added new remote RTP MID: '%s'.", + aMid.valueOr("").c_str())); + mRemoteMid = aMid; + mRemoteMidBindings.clear(); + } +} + +bool MediaPipelineFilter::Filter(const webrtc::RTPHeader& header) { + DEBUG_LOG(("MediaPipelineFilter inspecting seq# %u SSRC: %u", + header.sequenceNumber, header.ssrc)); + + auto fromStreamId = [](const std::string& aId) { + return Maybe<std::string>(aId.empty() ? Nothing() : Some(aId)); + }; + + // + // MID Based Filtering + // + + const auto mid = fromStreamId(header.extension.mid); + + // Check to see if a bound SSRC is moved to a new MID + if (mRemoteMidBindings.count(header.ssrc) == 1 && mid && mRemoteMid != mid) { + mRemoteMidBindings.erase(header.ssrc); + } + // Bind an SSRC if a matching MID is found + if (mid && mRemoteMid == mid) { + DEBUG_LOG(("MediaPipelineFilter learned SSRC: %u for MID: '%s'", + header.ssrc, mRemoteMid.value().c_str())); + mRemoteMidBindings.insert(header.ssrc); + } + // Check for matching MID + if (!mRemoteMidBindings.empty()) { + MOZ_ASSERT(mRemoteMid != Nothing()); + if (mRemoteMidBindings.count(header.ssrc) == 1) { + DEBUG_LOG( + ("MediaPipelineFilter SSRC: %u matched for MID: '%s'." + " passing packet", + header.ssrc, mRemoteMid.value().c_str())); + return true; + } + DEBUG_LOG( + ("MediaPipelineFilter SSRC: %u did not match bound SSRC(s) for" + " MID: '%s'. ignoring packet", + header.ssrc, mRemoteMid.value().c_str())); + for (const uint32_t ssrc : mRemoteMidBindings) { + DEBUG_LOG(("MID %s is associated with SSRC: %u", + mRemoteMid.value().c_str(), ssrc)); + } + return false; + } + + // + // RTP-STREAM-ID based filtering (for tests only) + // + + // + // Remote SSRC based filtering + // + + if (remote_ssrc_set_.count(header.ssrc)) { + DEBUG_LOG( + ("MediaPipelineFilter SSRC: %u matched remote SSRC set." + " passing packet", + header.ssrc)); + return true; + } + DEBUG_LOG( + ("MediaPipelineFilter SSRC: %u did not match any of %zu" + " remote SSRCS.", + header.ssrc, remote_ssrc_set_.size())); + + // + // PT, payload type, last ditch effort filtering + // + + if (payload_type_set_.count(header.payloadType)) { + DEBUG_LOG( + ("MediaPipelineFilter payload-type: %u matched %zu" + " unique payload type. learning ssrc. passing packet", + header.ssrc, remote_ssrc_set_.size())); + // Actual match. We need to update the ssrc map so we can route rtcp + // sender reports correctly (these use a different payload-type field) + AddRemoteSSRC(header.ssrc); + return true; + } + DEBUG_LOG( + ("MediaPipelineFilter payload-type: %u did not match any of %zu" + " unique payload-types.", + header.payloadType, payload_type_set_.size())); + DEBUG_LOG( + ("MediaPipelineFilter packet failed to match any criteria." + " ignoring packet")); + return false; +} + +void MediaPipelineFilter::AddRemoteSSRC(uint32_t ssrc) { + remote_ssrc_set_.insert(ssrc); +} + +void MediaPipelineFilter::AddUniquePT(uint8_t payload_type) { + payload_type_set_.insert(payload_type); +} + +void MediaPipelineFilter::Update(const MediaPipelineFilter& filter_update) { + // We will not stomp the remote_ssrc_set_ if the update has no ssrcs, + // because we don't want to unlearn any remote ssrcs unless the other end + // has explicitly given us a new set. + if (!filter_update.remote_ssrc_set_.empty()) { + remote_ssrc_set_ = filter_update.remote_ssrc_set_; + } + // We don't want to overwrite the learned binding unless we have changed MIDs + // or the update contains a MID binding. + if (!filter_update.mRemoteMidBindings.empty() || + (filter_update.mRemoteMid && filter_update.mRemoteMid != mRemoteMid)) { + mRemoteMid = filter_update.mRemoteMid; + mRemoteMidBindings = filter_update.mRemoteMidBindings; + } + payload_type_set_ = filter_update.payload_type_set_; + + // Use extmapping from new filter + mExtMap = filter_update.mExtMap; +} + +} // end namespace mozilla diff --git a/dom/media/webrtc/transportbridge/MediaPipelineFilter.h b/dom/media/webrtc/transportbridge/MediaPipelineFilter.h new file mode 100644 index 0000000000..9b40bceda8 --- /dev/null +++ b/dom/media/webrtc/transportbridge/MediaPipelineFilter.h @@ -0,0 +1,89 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: softtabstop=2:shiftwidth=2:expandtab + * */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: bcampen@mozilla.com + +#ifndef mediapipelinefilter_h__ +#define mediapipelinefilter_h__ + +#include <cstddef> +#include <stdint.h> +#include <string> + +#include <set> +#include <vector> + +#include "mozilla/Maybe.h" + +namespace webrtc { +struct RTPHeader; +struct RtpExtension; +} // namespace webrtc + +namespace mozilla { + +// TODO @@NG update documentation after initial review + +// A class that handles the work of filtering RTP packets that arrive at a +// MediaPipeline. This is primarily important for the use of BUNDLE (ie; +// multiple m-lines share the same RTP stream). There are three ways that this +// can work; +// +// 1) In our SDP, we include a media-level extmap parameter with a unique +// integer of our choosing, with the hope that the other side will include +// this value in a header in the first few RTP packets it sends us. This +// allows us to perform correlation in cases where the other side has not +// informed us of the ssrcs it will be sending (either because it did not +// include them in its SDP, or their SDP has not arrived yet) +// and also gives us the opportunity to learn SSRCs from packets so adorned. +// +// 2) If the remote endpoint includes SSRC media-level attributes in its SDP, +// we can simply use this information to populate the filter. The only +// shortcoming here is when RTP packets arrive before the answer does. See +// above. +// +// 3) As a fallback, we can try to use payload type IDs to perform correlation, +// but only when the type id is unique to this media section. +// This too allows us to learn about SSRCs (mostly useful for filtering +// sender reports later). +class MediaPipelineFilter { + public: + MediaPipelineFilter() = default; + explicit MediaPipelineFilter( + const std::vector<webrtc::RtpExtension>& aExtMap); + + // Checks whether this packet passes the filter, possibly updating the filter + // in the process (if the MID or payload types are used, they can teach + // the filter about ssrcs) + bool Filter(const webrtc::RTPHeader& header); + + void AddRemoteSSRC(uint32_t ssrc); + + void SetRemoteMediaStreamId(const Maybe<std::string>& aMid); + + // When a payload type id is unique to our media section, add it here. + void AddUniquePT(uint8_t payload_type); + + void Update(const MediaPipelineFilter& filter_update); + + std::vector<webrtc::RtpExtension> GetExtmap() const { return mExtMap; } + + private: + // The number of filters we manage here is quite small, so I am optimizing + // for readability. + std::set<uint32_t> remote_ssrc_set_; + std::set<uint8_t> payload_type_set_; + Maybe<std::string> mRemoteMid; + std::set<uint32_t> mRemoteMidBindings; + // RID extension can be set by tests and is sticky, the rest of + // the mapping is not. + std::vector<webrtc::RtpExtension> mExtMap; +}; + +} // end namespace mozilla + +#endif // mediapipelinefilter_h__ diff --git a/dom/media/webrtc/transportbridge/RtpLogger.cpp b/dom/media/webrtc/transportbridge/RtpLogger.cpp new file mode 100644 index 0000000000..aac1fad197 --- /dev/null +++ b/dom/media/webrtc/transportbridge/RtpLogger.cpp @@ -0,0 +1,67 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: nohlmeier@mozilla.com + +#include "RtpLogger.h" +#include "mozilla/Logging.h" + +#include <ctime> +#include <iomanip> +#include <sstream> +#ifdef _WIN32 +# include <time.h> +# include <sys/timeb.h> +#else +# include <sys/time.h> +#endif + +// Logging context +using namespace mozilla; + +mozilla::LazyLogModule gRtpLoggerLog("RtpLogger"); + +namespace mozilla { + +bool RtpLogger::IsPacketLoggingOn() { + return MOZ_LOG_TEST(gRtpLoggerLog, LogLevel::Debug); +} + +void RtpLogger::LogPacket(const MediaPacket& packet, bool input, + std::string desc) { + if (MOZ_LOG_TEST(gRtpLoggerLog, LogLevel::Debug)) { + bool isRtp = (packet.type() == MediaPacket::RTP); + std::stringstream ss; + /* This creates text2pcap compatible format, e.g.: + * RTCP_PACKET O 10:36:26.864934 000000 80 c8 00 06 6d ... + */ + ss << (input ? "I " : "O "); + std::time_t t = std::time(nullptr); + std::tm tm = *std::localtime(&t); + char buf[9]; + if (0 < strftime(buf, sizeof(buf), "%H:%M:%S", &tm)) { + ss << buf; + } + ss << std::setfill('0'); +#ifdef _WIN32 + struct timeb tb; + ftime(&tb); + ss << "." << (tb.millitm) << " "; +#else + struct timeval tv; + gettimeofday(&tv, NULL); + ss << "." << (tv.tv_usec) << " "; +#endif + ss << " 000000"; + ss << std::hex << std::setfill('0'); + for (size_t i = 0; i < packet.len(); ++i) { + ss << " " << std::setw(2) << (int)packet.data()[i]; + } + MOZ_LOG(gRtpLoggerLog, LogLevel::Debug, + ("%s%s%s", desc.c_str(), (isRtp ? " RTP_PACKET " : " RTCP_PACKET "), + ss.str().c_str())); + } +} + +} // namespace mozilla diff --git a/dom/media/webrtc/transportbridge/RtpLogger.h b/dom/media/webrtc/transportbridge/RtpLogger.h new file mode 100644 index 0000000000..fcfaede6e2 --- /dev/null +++ b/dom/media/webrtc/transportbridge/RtpLogger.h @@ -0,0 +1,28 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Original author: nohlmeier@mozilla.com + +#ifndef rtplogger_h__ +#define rtplogger_h__ + +#include "transport/mediapacket.h" + +namespace mozilla { + +/* This class logs RTP and RTCP packets in hex in a format compatible to + * text2pcap. + * Example to convert the MOZ log file into a PCAP file: + * egrep '(RTP_PACKET|RTCP_PACKET)' moz.log | \ + * text2pcap -D -n -l 1 -i 17 -u 1234,1235 -t '%H:%M:%S.' - rtp.pcap + */ +class RtpLogger { + public: + static bool IsPacketLoggingOn(); + static void LogPacket(const MediaPacket& packet, bool input, + std::string desc); +}; + +} // namespace mozilla +#endif diff --git a/dom/media/webrtc/transportbridge/moz.build b/dom/media/webrtc/transportbridge/moz.build new file mode 100644 index 0000000000..290dd26a0a --- /dev/null +++ b/dom/media/webrtc/transportbridge/moz.build @@ -0,0 +1,27 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +include("/dom/media/webrtc/third_party_build/webrtc.mozbuild") + +LOCAL_INCLUDES += [ + "!/ipc/ipdl/_ipdlheaders", + "/dom/media", + "/dom/media/webrtc", + "/ipc/chromium/src", + "/media/libyuv/libyuv/include", + "/media/webrtc", + "/third_party/libsrtp/src/crypto/include", + "/third_party/libsrtp/src/include", + "/third_party/libwebrtc", + "/third_party/libwebrtc/third_party/abseil-cpp", +] + +UNIFIED_SOURCES += [ + "MediaPipeline.cpp", + "MediaPipelineFilter.cpp", + "RtpLogger.cpp", +] + +FINAL_LIBRARY = "xul" |