summaryrefslogtreecommitdiffstats
path: root/dom/media/webrtc/transportbridge
diff options
context:
space:
mode:
Diffstat (limited to 'dom/media/webrtc/transportbridge')
-rw-r--r--dom/media/webrtc/transportbridge/MediaPipeline.cpp1682
-rw-r--r--dom/media/webrtc/transportbridge/MediaPipeline.h425
-rw-r--r--dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp153
-rw-r--r--dom/media/webrtc/transportbridge/MediaPipelineFilter.h89
-rw-r--r--dom/media/webrtc/transportbridge/RtpLogger.cpp67
-rw-r--r--dom/media/webrtc/transportbridge/RtpLogger.h28
-rw-r--r--dom/media/webrtc/transportbridge/moz.build27
7 files changed, 2471 insertions, 0 deletions
diff --git a/dom/media/webrtc/transportbridge/MediaPipeline.cpp b/dom/media/webrtc/transportbridge/MediaPipeline.cpp
new file mode 100644
index 0000000000..6cb0afc36e
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/MediaPipeline.cpp
@@ -0,0 +1,1682 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: ekr@rtfm.com
+
+#include "MediaPipeline.h"
+
+#include <inttypes.h>
+#include <math.h>
+#include <sstream>
+
+#include "AudioSegment.h"
+#include "AudioConverter.h"
+#include "DOMMediaStream.h"
+#include "ImageContainer.h"
+#include "ImageTypes.h"
+#include "MediaEngine.h"
+#include "MediaSegment.h"
+#include "MediaTrackGraphImpl.h"
+#include "MediaTrackListener.h"
+#include "MediaStreamTrack.h"
+#include "jsapi/RemoteTrackSource.h"
+#include "RtpLogger.h"
+#include "VideoFrameConverter.h"
+#include "VideoSegment.h"
+#include "VideoStreamTrack.h"
+#include "VideoUtils.h"
+#include "mozilla/Logging.h"
+#include "mozilla/NullPrincipal.h"
+#include "mozilla/PeerIdentity.h"
+#include "mozilla/Preferences.h"
+#include "mozilla/SharedThreadPool.h"
+#include "mozilla/Sprintf.h"
+#include "mozilla/StaticPrefs_media.h"
+#include "mozilla/TaskQueue.h"
+#include "mozilla/UniquePtr.h"
+#include "mozilla/UniquePtrExtensions.h"
+#include "mozilla/dom/RTCStatsReportBinding.h"
+#include "mozilla/dom/Document.h"
+#include "mozilla/gfx/Point.h"
+#include "mozilla/gfx/Types.h"
+#include "nsError.h"
+#include "nsThreadUtils.h"
+#include "transport/runnable_utils.h"
+#include "jsapi/MediaTransportHandler.h"
+#include "jsapi/PeerConnectionImpl.h"
+#include "Tracing.h"
+#include "libwebrtcglue/WebrtcImageBuffer.h"
+#include "common_video/include/video_frame_buffer.h"
+#include "modules/rtp_rtcp/include/rtp_rtcp.h"
+#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
+#include "modules/rtp_rtcp/source/rtp_packet_received.h"
+
+// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
+// 48KHz)
+#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
+static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
+ AUDIO_SAMPLE_BUFFER_MAX_BYTES,
+ "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
+
+using namespace mozilla;
+using namespace mozilla::dom;
+using namespace mozilla::gfx;
+using namespace mozilla::layers;
+
+mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
+
+namespace mozilla {
+
+// An async inserter for audio data, to avoid running audio codec encoders
+// on the MTG/input audio thread. Basically just bounces all the audio
+// data to a single audio processing/input queue. We could if we wanted to
+// use multiple threads and a TaskQueue.
+class AudioProxyThread {
+ public:
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
+
+ explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
+ : mConduit(std::move(aConduit)),
+ mTaskQueue(TaskQueue::Create(
+ GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")),
+ mAudioConverter(nullptr) {
+ MOZ_ASSERT(mConduit);
+ MOZ_COUNT_CTOR(AudioProxyThread);
+ }
+
+ // This function is the identity if aInputRate is supported.
+ // Else, it returns a rate that is supported, that ensure no loss in audio
+ // quality: the sampling rate returned is always greater to the inputed
+ // sampling-rate, if they differ..
+ uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
+ AudioSessionConduit* conduit =
+ static_cast<AudioSessionConduit*>(mConduit.get());
+ if (conduit->IsSamplingFreqSupported(aInputRate)) {
+ return aInputRate;
+ }
+ if (aInputRate < 16000) {
+ return 16000;
+ }
+ if (aInputRate < 32000) {
+ return 32000;
+ }
+ if (aInputRate < 44100) {
+ return 44100;
+ }
+ return 48000;
+ }
+
+ // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
+ // something the conduit can work with (or send silence if the track is not
+ // enabled), and send the audio in 10ms chunks to the conduit.
+ void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
+ bool aEnabled) {
+ MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
+
+ // Convert to interleaved 16-bits integer audio, with a maximum of two
+ // channels (since the WebRTC.org code below makes the assumption that the
+ // input audio is either mono or stereo), with a sample-rate rate that is
+ // 16, 32, 44.1, or 48kHz.
+ uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
+ int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
+
+ // We take advantage of the fact that the common case (microphone directly
+ // to PeerConnection, that is, a normal call), the samples are already
+ // 16-bits mono, so the representation in interleaved and planar is the
+ // same, and we can just use that.
+ if (aEnabled && outputChannels == 1 &&
+ aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
+ const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
+ PacketizeAndSend(samples, transmissionRate, outputChannels,
+ aChunk.mDuration);
+ return;
+ }
+
+ uint32_t sampleCount = aChunk.mDuration * outputChannels;
+ if (mInterleavedAudio.Length() < sampleCount) {
+ mInterleavedAudio.SetLength(sampleCount);
+ }
+
+ if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
+ PodZero(mInterleavedAudio.Elements(), sampleCount);
+ } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
+ DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
+ aChunk.mVolume, outputChannels,
+ mInterleavedAudio.Elements());
+ } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
+ DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
+ aChunk.mVolume, outputChannels,
+ mInterleavedAudio.Elements());
+ }
+ int16_t* inputAudio = mInterleavedAudio.Elements();
+ size_t inputAudioFrameCount = aChunk.mDuration;
+
+ AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
+ AudioConfig::FORMAT_S16);
+ AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
+ transmissionRate, AudioConfig::FORMAT_S16);
+ // Resample to an acceptable sample-rate for the sending side
+ if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
+ mAudioConverter->OutputConfig() != outputConfig) {
+ mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
+ }
+
+ int16_t* processedAudio = nullptr;
+ size_t framesProcessed =
+ mAudioConverter->Process(inputAudio, inputAudioFrameCount);
+
+ if (framesProcessed == 0) {
+ // In place conversion not possible, use a buffer.
+ framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
+ inputAudioFrameCount);
+ processedAudio = mOutputAudio.Data();
+ } else {
+ processedAudio = inputAudio;
+ }
+
+ PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
+ framesProcessed);
+ }
+
+ // This packetizes aAudioData in 10ms chunks and sends it.
+ // aAudioData is interleaved audio data at a rate and with a channel count
+ // that is appropriate to send with the conduit.
+ void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
+ uint32_t aChannels, uint32_t aFrameCount) {
+ MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
+ MOZ_ASSERT(aChannels == 1 || aChannels == 2);
+ MOZ_ASSERT(aAudioData);
+
+ uint32_t audio_10ms = aRate / 100;
+
+ if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms ||
+ mPacketizer->mChannels != aChannels) {
+ // It's the right thing to drop the bit of audio still in the packetizer:
+ // we don't want to send to the conduit audio that has two different
+ // rates while telling it that it has a constante rate.
+ mPacketizer =
+ MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
+ mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
+ }
+
+ mPacketizer->Input(aAudioData, aFrameCount);
+
+ while (mPacketizer->PacketsAvailable()) {
+ mPacketizer->Output(mPacket.get());
+ auto frame = std::make_unique<webrtc::AudioFrame>();
+ // UpdateFrame makes a copy of the audio data.
+ frame->UpdateFrame(frame->timestamp_, mPacket.get(),
+ mPacketizer->mPacketSize, aRate, frame->speech_type_,
+ frame->vad_activity_, mPacketizer->mChannels);
+ mConduit->SendAudioFrame(std::move(frame));
+ }
+ }
+
+ void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
+ bool aEnabled) {
+ RefPtr<AudioProxyThread> self = this;
+ nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
+ "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
+ self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
+ }));
+ MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
+ Unused << rv;
+ }
+
+ protected:
+ virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); }
+
+ const RefPtr<AudioSessionConduit> mConduit;
+ const RefPtr<TaskQueue> mTaskQueue;
+ // Only accessed on mTaskQueue
+ UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
+ // A buffer to hold a single packet of audio.
+ UniquePtr<int16_t[]> mPacket;
+ nsTArray<int16_t> mInterleavedAudio;
+ AlignedShortBuffer mOutputAudio;
+ UniquePtr<AudioConverter> mAudioConverter;
+};
+
+MediaPipeline::MediaPipeline(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ DirectionType aDirection,
+ RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<MediaSessionConduit> aConduit)
+ : mConduit(std::move(aConduit)),
+ mDirection(aDirection),
+ mCallThread(std::move(aCallThread)),
+ mStsThread(aStsThread),
+ mActive(false, "MediaPipeline::mActive"),
+ mLevel(0),
+ mTransportHandler(std::move(aTransportHandler)),
+ mRtpPacketsSent(0),
+ mRtcpPacketsSent(0),
+ mRtpPacketsReceived(0),
+ mRtcpPacketsReceived(0),
+ mRtpBytesSent(0),
+ mRtpBytesReceived(0),
+ mPc(aPc),
+ mFilter(),
+ mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()),
+ mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {}
+
+MediaPipeline::~MediaPipeline() {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("Destroying MediaPipeline: %s", mDescription.c_str()));
+}
+
+void MediaPipeline::Start() {
+ MOZ_ASSERT(NS_IsMainThread());
+ mActive = true;
+}
+
+void MediaPipeline::Stop() {
+ MOZ_ASSERT(NS_IsMainThread());
+ mActive = false;
+}
+
+void MediaPipeline::Shutdown() {
+ MOZ_ASSERT(NS_IsMainThread());
+ Stop();
+
+ RUN_ON_THREAD(mStsThread,
+ WrapRunnable(RefPtr<MediaPipeline>(this),
+ &MediaPipeline::DetachTransport_s),
+ NS_DISPATCH_NORMAL);
+}
+
+void MediaPipeline::DetachTransport_s() {
+ ASSERT_ON_THREAD(mStsThread);
+
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("%s in %s", mDescription.c_str(), __FUNCTION__));
+
+ disconnect_all();
+ mRtpState = TransportLayer::TS_NONE;
+ mRtcpState = TransportLayer::TS_NONE;
+ mTransportId.clear();
+ mConduit->SetTransportActive(false);
+ mRtpSendEventListener.DisconnectIfExists();
+ mSenderRtcpSendEventListener.DisconnectIfExists();
+ mReceiverRtcpSendEventListener.DisconnectIfExists();
+}
+
+void MediaPipeline::UpdateTransport_m(
+ const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
+ mStsThread->Dispatch(NS_NewRunnableFunction(
+ __func__, [aTransportId, filter = std::move(aFilter),
+ self = RefPtr<MediaPipeline>(this)]() mutable {
+ self->UpdateTransport_s(aTransportId, std::move(filter));
+ }));
+}
+
+void MediaPipeline::UpdateTransport_s(
+ const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
+ ASSERT_ON_THREAD(mStsThread);
+ if (!mSignalsConnected) {
+ mTransportHandler->SignalStateChange.connect(
+ this, &MediaPipeline::RtpStateChange);
+ mTransportHandler->SignalRtcpStateChange.connect(
+ this, &MediaPipeline::RtcpStateChange);
+ mTransportHandler->SignalEncryptedSending.connect(
+ this, &MediaPipeline::EncryptedPacketSending);
+ mTransportHandler->SignalPacketReceived.connect(
+ this, &MediaPipeline::PacketReceived);
+ mTransportHandler->SignalAlpnNegotiated.connect(
+ this, &MediaPipeline::AlpnNegotiated);
+ mSignalsConnected = true;
+ }
+
+ if (aTransportId != mTransportId) {
+ mTransportId = aTransportId;
+ mRtpState = mTransportHandler->GetState(mTransportId, false);
+ mRtcpState = mTransportHandler->GetState(mTransportId, true);
+ CheckTransportStates();
+ }
+
+ if (mFilter) {
+ for (const auto& extension : mFilter->GetExtmap()) {
+ mRtpHeaderExtensionMap->Deregister(extension.uri);
+ }
+ }
+ if (mFilter && aFilter) {
+ // Use the new filter, but don't forget any remote SSRCs that we've learned
+ // by receiving traffic.
+ mFilter->Update(*aFilter);
+ } else {
+ mFilter = std::move(aFilter);
+ }
+ if (mFilter) {
+ for (const auto& extension : mFilter->GetExtmap()) {
+ mRtpHeaderExtensionMap->RegisterByUri(extension.id, extension.uri);
+ }
+ }
+}
+
+void MediaPipeline::GetContributingSourceStats(
+ const nsString& aInboundRtpStreamId,
+ FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
+ ASSERT_ON_THREAD(mStsThread);
+ // Get the expiry from now
+ DOMHighResTimeStamp expiry =
+ RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow());
+ for (auto info : mCsrcStats) {
+ if (!info.second.Expired(expiry)) {
+ RTCRTPContributingSourceStats stats;
+ info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
+ if (!aArr.AppendElement(stats, fallible)) {
+ mozalloc_handle_oom(0);
+ }
+ }
+ }
+}
+
+void MediaPipeline::RtpStateChange(const std::string& aTransportId,
+ TransportLayer::State aState) {
+ if (mTransportId != aTransportId) {
+ return;
+ }
+ mRtpState = aState;
+ CheckTransportStates();
+}
+
+void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
+ TransportLayer::State aState) {
+ if (mTransportId != aTransportId) {
+ return;
+ }
+ mRtcpState = aState;
+ CheckTransportStates();
+}
+
+void MediaPipeline::CheckTransportStates() {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (mRtpState == TransportLayer::TS_CLOSED ||
+ mRtpState == TransportLayer::TS_ERROR ||
+ mRtcpState == TransportLayer::TS_CLOSED ||
+ mRtcpState == TransportLayer::TS_ERROR) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
+ ("RTP Transport failed for pipeline %p flow %s", this,
+ mDescription.c_str()));
+
+ NS_WARNING(
+ "MediaPipeline Transport failed. This is not properly cleaned up yet");
+ // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
+ // connection was good and now it is bad.
+ // TODO(ekr@rtfm.com): Report up so that the PC knows we
+ // have experienced an error.
+ mConduit->SetTransportActive(false);
+ mRtpSendEventListener.DisconnectIfExists();
+ mSenderRtcpSendEventListener.DisconnectIfExists();
+ mReceiverRtcpSendEventListener.DisconnectIfExists();
+ return;
+ }
+
+ if (mRtpState == TransportLayer::TS_OPEN) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("RTP Transport ready for pipeline %p flow %s", this,
+ mDescription.c_str()));
+ }
+
+ if (mRtcpState == TransportLayer::TS_OPEN) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("RTCP Transport ready for pipeline %p flow %s", this,
+ mDescription.c_str()));
+ }
+
+ if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
+ if (mDirection == DirectionType::TRANSMIT) {
+ mConduit->ConnectSenderRtcpEvent(mSenderRtcpReceiveEvent);
+ mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect(
+ mStsThread, this, &MediaPipeline::SendPacket);
+ mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect(
+ mStsThread, this, &MediaPipeline::SendPacket);
+ } else {
+ mConduit->ConnectReceiverRtcpEvent(mReceiverRtcpReceiveEvent);
+ mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent);
+ mReceiverRtcpSendEventListener =
+ mConduit->ReceiverRtcpSendEvent().Connect(mStsThread, this,
+ &MediaPipeline::SendPacket);
+ }
+ mConduit->SetTransportActive(true);
+ TransportReady_s();
+ }
+}
+
+void MediaPipeline::SendPacket(MediaPacket&& aPacket) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ const bool isRtp = aPacket.type() == MediaPacket::RTP;
+
+ if (isRtp && mRtpState != TransportLayer::TS_OPEN) {
+ return;
+ }
+
+ if (!isRtp && mRtcpState != TransportLayer::TS_OPEN) {
+ return;
+ }
+
+ aPacket.sdp_level() = Some(Level());
+
+ if (RtpLogger::IsPacketLoggingOn()) {
+ RtpLogger::LogPacket(aPacket, false, mDescription);
+ }
+
+ if (isRtp) {
+ mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtp, true,
+ aPacket.data(), aPacket.len());
+ IncrementRtpPacketsSent(aPacket);
+ } else {
+ mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtcp, true,
+ aPacket.data(), aPacket.len());
+ IncrementRtcpPacketsSent();
+ }
+
+ MOZ_LOG(
+ gMediaPipelineLog, LogLevel::Debug,
+ ("%s sending %s packet", mDescription.c_str(), (isRtp ? "RTP" : "RTCP")));
+
+ mTransportHandler->SendPacket(mTransportId, std::move(aPacket));
+}
+
+void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) {
+ ASSERT_ON_THREAD(mStsThread);
+ ++mRtpPacketsSent;
+ mRtpBytesSent += aPacket.len();
+
+ if (!(mRtpPacketsSent % 100)) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
+ mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
+ }
+}
+
+void MediaPipeline::IncrementRtcpPacketsSent() {
+ ASSERT_ON_THREAD(mStsThread);
+ ++mRtcpPacketsSent;
+ if (!(mRtcpPacketsSent % 100)) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("RTCP sent packet count for %s Pipeline %p: %u",
+ mDescription.c_str(), this, mRtcpPacketsSent));
+ }
+}
+
+void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
+ ASSERT_ON_THREAD(mStsThread);
+ ++mRtpPacketsReceived;
+ mRtpBytesReceived += aBytes;
+ if (!(mRtpPacketsReceived % 100)) {
+ MOZ_LOG(
+ gMediaPipelineLog, LogLevel::Info,
+ ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
+ mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
+ }
+}
+
+void MediaPipeline::IncrementRtcpPacketsReceived() {
+ ASSERT_ON_THREAD(mStsThread);
+ ++mRtcpPacketsReceived;
+ if (!(mRtcpPacketsReceived % 100)) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("RTCP received packet count for %s Pipeline %p: %u",
+ mDescription.c_str(), this, mRtcpPacketsReceived));
+ }
+}
+
+void MediaPipeline::RtpPacketReceived(const MediaPacket& packet) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (mDirection == DirectionType::TRANSMIT) {
+ return;
+ }
+
+ if (!packet.len()) {
+ return;
+ }
+
+ webrtc::RTPHeader header;
+ rtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len());
+ webrtc::RtpPacketReceived pktHeader(mRtpHeaderExtensionMap.get());
+ if (!pktHeader.Parse(packet_buffer)) {
+ return;
+ }
+ pktHeader.GetHeader(&header);
+
+ if (mFilter && !mFilter->Filter(header)) {
+ return;
+ }
+
+ // Make sure to only get the time once, and only if we need it by
+ // using getTimestamp() for access
+ DOMHighResTimeStamp now = 0.0;
+ bool hasTime = false;
+
+ // Remove expired RtpCSRCStats
+ if (!mCsrcStats.empty()) {
+ if (!hasTime) {
+ now = GetTimestampMaker().GetNow();
+ hasTime = true;
+ }
+ auto expiry = RtpCSRCStats::GetExpiryFromTime(now);
+ for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
+ if (p->second.Expired(expiry)) {
+ p = mCsrcStats.erase(p);
+ continue;
+ }
+ p++;
+ }
+ }
+
+ // Add new RtpCSRCStats
+ if (header.numCSRCs) {
+ for (auto i = 0; i < header.numCSRCs; i++) {
+ if (!hasTime) {
+ now = GetTimestampMaker().GetNow();
+ hasTime = true;
+ }
+ auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
+ if (csrcInfo == mCsrcStats.end()) {
+ mCsrcStats.insert(std::make_pair(
+ header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now)));
+ } else {
+ csrcInfo->second.SetTimestamp(now);
+ }
+ }
+ }
+
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("%s received RTP packet.", mDescription.c_str()));
+ IncrementRtpPacketsReceived(packet.len());
+ OnRtpPacketReceived();
+
+ RtpLogger::LogPacket(packet, true, mDescription);
+
+ // Might be nice to pass ownership of the buffer in this case, but it is a
+ // small optimization in a rare case.
+ mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
+ packet.encrypted_data(), packet.encrypted_len());
+
+ mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
+ packet.len());
+
+ mRtpReceiveEvent.Notify(packet.Clone(), header);
+}
+
+void MediaPipeline::RtcpPacketReceived(const MediaPacket& packet) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (!packet.len()) {
+ return;
+ }
+
+ // We do not filter RTCP. This is because a compound RTCP packet can contain
+ // any collection of RTCP packets, and webrtc.org already knows how to filter
+ // out what it is interested in, and what it is not. Maybe someday we should
+ // have a TransportLayer that breaks up compound RTCP so we can filter them
+ // individually, but I doubt that will matter much.
+
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("%s received RTCP packet.", mDescription.c_str()));
+ IncrementRtcpPacketsReceived();
+
+ RtpLogger::LogPacket(packet, true, mDescription);
+
+ // Might be nice to pass ownership of the buffer in this case, but it is a
+ // small optimization in a rare case.
+ mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false,
+ packet.encrypted_data(), packet.encrypted_len());
+
+ mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false,
+ packet.data(), packet.len());
+
+ if (StaticPrefs::media_webrtc_net_force_disable_rtcp_reception()) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("%s RTCP packet forced to be dropped", mDescription.c_str()));
+ return;
+ }
+
+ if (mDirection == DirectionType::TRANSMIT) {
+ mSenderRtcpReceiveEvent.Notify(packet.Clone());
+ } else {
+ mReceiverRtcpReceiveEvent.Notify(packet.Clone());
+ }
+}
+
+void MediaPipeline::PacketReceived(const std::string& aTransportId,
+ const MediaPacket& packet) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (mTransportId != aTransportId) {
+ return;
+ }
+
+ MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);
+ MOZ_ASSERT(mRtcpState == mRtpState);
+
+ switch (packet.type()) {
+ case MediaPacket::RTP:
+ RtpPacketReceived(packet);
+ break;
+ case MediaPacket::RTCP:
+ RtcpPacketReceived(packet);
+ break;
+ default:;
+ }
+}
+
+void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
+ bool aPrivacyRequested) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (aPrivacyRequested) {
+ MakePrincipalPrivate_s();
+ }
+}
+
+void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
+ const MediaPacket& aPacket) {
+ ASSERT_ON_THREAD(mStsThread);
+
+ if (mTransportId == aTransportId) {
+ dom::mozPacketDumpType type;
+ if (aPacket.type() == MediaPacket::SRTP) {
+ type = dom::mozPacketDumpType::Srtp;
+ } else if (aPacket.type() == MediaPacket::SRTCP) {
+ type = dom::mozPacketDumpType::Srtcp;
+ } else if (aPacket.type() == MediaPacket::DTLS) {
+ // TODO(bug 1497936): Implement packet dump for DTLS
+ return;
+ } else {
+ MOZ_ASSERT(false);
+ return;
+ }
+ mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
+ }
+}
+
+class MediaPipelineTransmit::PipelineListener
+ : public DirectMediaTrackListener {
+ friend class MediaPipelineTransmit;
+
+ public:
+ explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
+ : mConduit(std::move(aConduit)),
+ mActive(false),
+ mEnabled(false),
+ mDirectConnect(false) {}
+
+ ~PipelineListener() {
+ if (mConverter) {
+ mConverter->Shutdown();
+ }
+ }
+
+ void SetActive(bool aActive) {
+ mActive = aActive;
+ if (mConverter) {
+ mConverter->SetActive(aActive);
+ }
+ }
+ void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }
+
+ // These are needed since nested classes don't have access to any particular
+ // instance of the parent
+ void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
+ mAudioProcessing = std::move(aProxy);
+ }
+
+ void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
+ mConverter = std::move(aConverter);
+ }
+
+ void OnVideoFrameConverted(webrtc::VideoFrame aVideoFrame) {
+ MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
+ static_cast<VideoSessionConduit*>(mConduit.get())
+ ->SendVideoFrame(std::move(aVideoFrame));
+ }
+
+ // Implement MediaTrackListener
+ void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
+ const MediaSegment& aQueuedMedia) override;
+ void NotifyEnabledStateChanged(MediaTrackGraph* aGraph,
+ bool aEnabled) override;
+
+ // Implement DirectMediaTrackListener
+ void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
+ const MediaSegment& aMedia) override;
+ void NotifyDirectListenerInstalled(InstallationResult aResult) override;
+ void NotifyDirectListenerUninstalled() override;
+
+ private:
+ void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);
+
+ const RefPtr<MediaSessionConduit> mConduit;
+ RefPtr<AudioProxyThread> mAudioProcessing;
+ RefPtr<VideoFrameConverter> mConverter;
+
+ // active is true if there is a transport to send on
+ mozilla::Atomic<bool> mActive;
+ // enabled is true if the media access control permits sending
+ // actual content; when false you get black/silence
+ mozilla::Atomic<bool> mEnabled;
+
+ // Written and read on the MediaTrackGraph thread
+ bool mDirectConnect;
+};
+
+MediaPipelineTransmit::MediaPipelineTransmit(
+ const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
+ bool aIsVideo, RefPtr<MediaSessionConduit> aConduit)
+ : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
+ std::move(aCallThread), std::move(aStsThread),
+ std::move(aConduit)),
+ mWatchManager(this, AbstractThread::MainThread()),
+ mIsVideo(aIsVideo),
+ mListener(new PipelineListener(mConduit)),
+ mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"),
+ mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") {
+ if (!IsVideo()) {
+ mAudioProcessing =
+ MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit());
+ mListener->SetAudioProxy(mAudioProcessing);
+ } else { // Video
+ mConverter = MakeAndAddRef<VideoFrameConverter>(GetTimestampMaker());
+ mFrameListener = mConverter->VideoFrameConvertedEvent().Connect(
+ mConverter->mTaskQueue,
+ [listener = mListener](webrtc::VideoFrame aFrame) {
+ listener->OnVideoFrameConverted(std::move(aFrame));
+ });
+ mListener->SetVideoFrameConverter(mConverter);
+ }
+
+ mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState);
+ mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState);
+ mWatchManager.Watch(mSendTrackOverride,
+ &MediaPipelineTransmit::UpdateSendState);
+
+ mDescription = GenerateDescription();
+}
+
+MediaPipelineTransmit::~MediaPipelineTransmit() {
+ mFrameListener.DisconnectIfExists();
+
+ MOZ_ASSERT(!mTransmitting);
+ MOZ_ASSERT(!mDomTrack.Ref());
+}
+
+void MediaPipelineTransmit::Shutdown() {
+ MediaPipeline::Shutdown();
+ MOZ_ASSERT(!mActive);
+ mWatchManager.Shutdown();
+ if (mDomTrack.Ref()) {
+ mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
+ mDomTrack = nullptr;
+ }
+ mUnsettingSendTrack = false;
+ UpdateSendState();
+ MOZ_ASSERT(!mTransmitting);
+}
+
+void MediaPipeline::SetDescription_s(const std::string& description) {
+ ASSERT_ON_THREAD(mStsThread);
+ mDescription = description;
+}
+
+std::string MediaPipelineTransmit::GenerateDescription() const {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ std::stringstream description;
+ description << mPc << "| ";
+ description << (mIsVideo ? "Transmit video[" : "Transmit audio[");
+
+ if (mDomTrack.Ref()) {
+ nsString nsTrackId;
+ mDomTrack.Ref()->GetId(nsTrackId);
+ description << NS_ConvertUTF16toUTF8(nsTrackId).get();
+ } else if (mSendTrackOverride.Ref()) {
+ description << "override " << mSendTrackOverride.Ref().get();
+ } else {
+ description << "no track";
+ }
+
+ description << "]";
+
+ return description.str();
+}
+
+void MediaPipelineTransmit::UpdateSendState() {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ // This runs because either mActive, mDomTrack or mSendTrackOverride changed,
+ // or because mSendTrack was unset async. Based on these inputs this method
+ // is responsible for hooking up mSendTrack to mListener in order to feed data
+ // to the conduit.
+ //
+ // If we are inactive, or if the send track does not match what we want to
+ // send (mDomTrack or mSendTrackOverride), we must stop feeding data to the
+ // conduit. NB that removing the listener from mSendTrack is async, and we
+ // must wait for it to resolve before adding mListener to another track.
+ // mUnsettingSendTrack gates us until the listener has been removed from
+ // mSendTrack.
+ //
+ // If we are active and the send track does match what we want to send, we
+ // make sure mListener is added to the send track. Either now, or if we're
+ // still waiting for another send track to be removed, during a future call to
+ // this method.
+
+ if (mUnsettingSendTrack) {
+ // We must wait for the send track to be unset before we can set it again,
+ // to avoid races. Once unset this function is triggered again.
+ return;
+ }
+
+ const bool wasTransmitting = mTransmitting;
+
+ const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed();
+ const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended();
+ const bool haveLiveOverrideTrack =
+ mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed();
+ const bool mustRemoveSendTrack =
+ haveLiveSendTrack && !mSendTrackOverride.Ref() &&
+ (!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource);
+
+ mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) &&
+ !mustRemoveSendTrack;
+
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, "
+ "sendTrack=%p (%s), domTrack=%p (%s), "
+ "sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d",
+ this, wasTransmitting, mActive.Ref(), mSendTrack.get(),
+ haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(),
+ haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(),
+ haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack,
+ mTransmitting));
+
+ if (!wasTransmitting && mTransmitting) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("Attaching pipeline %p to track %p conduit type=%s", this,
+ mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
+ if (mDescriptionInvalidated) {
+ // Only update the description when we attach to a track, as detaching is
+ // always a longer async step than updating the description. Updating on
+ // detach would cause the wrong track id to be attributed in logs.
+ RUN_ON_THREAD(mStsThread,
+ WrapRunnable(RefPtr<MediaPipeline>(this),
+ &MediaPipelineTransmit::SetDescription_s,
+ GenerateDescription()),
+ NS_DISPATCH_NORMAL);
+ mDescriptionInvalidated = false;
+ }
+ if (mSendTrackOverride.Ref()) {
+ // Special path that allows unittests to avoid mDomTrack and the graph by
+ // manually calling SetSendTrack.
+ mSendTrack = mSendTrackOverride.Ref();
+ } else {
+ mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack(
+ mDomTrack.Ref()->GetTrack()->mType);
+ mSendPortSource = mDomTrack.Ref()->GetTrack();
+ mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get());
+ }
+ if (mIsVideo) {
+ mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId);
+ }
+ mSendTrack->QueueSetAutoend(false);
+ if (mIsVideo) {
+ mSendTrack->AddDirectListener(mListener);
+ }
+ mSendTrack->AddListener(mListener);
+ }
+
+ if (wasTransmitting && !mTransmitting) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("Detaching pipeline %p from track %p conduit type=%s", this,
+ mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
+ mUnsettingSendTrack = true;
+ if (mIsVideo) {
+ mSendTrack->RemoveDirectListener(mListener);
+ }
+ mSendTrack->RemoveListener(mListener)->Then(
+ GetMainThreadSerialEventTarget(), __func__,
+ [this, self = RefPtr<MediaPipelineTransmit>(this)] {
+ mUnsettingSendTrack = false;
+ mSendTrack = nullptr;
+ if (!mWatchManager.IsShutdown()) {
+ mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState);
+ }
+ });
+ if (!mSendTrackOverride.Ref()) {
+ // If an override is set it may be re-used.
+ mSendTrack->Destroy();
+ mSendPort->Destroy();
+ mSendPort = nullptr;
+ mSendPortSource = nullptr;
+ }
+ }
+}
+
+bool MediaPipelineTransmit::Transmitting() const {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ return mActive;
+}
+
+bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
+
+void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) {
+ MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref());
+
+ PeerConnectionWrapper pcw(mPc);
+ if (pcw.impl()) {
+ Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc();
+ if (doc) {
+ UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity());
+ } else {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("Can't update sink principal; document gone"));
+ }
+ }
+}
+
+void MediaPipelineTransmit::UpdateSinkIdentity(
+ nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) {
+ MOZ_ASSERT(NS_IsMainThread());
+
+ if (!mDomTrack.Ref()) {
+ // Nothing to do here
+ return;
+ }
+
+ bool enableTrack = aPrincipal->Subsumes(mDomTrack.Ref()->GetPrincipal());
+ if (!enableTrack) {
+ // first try didn't work, but there's a chance that this is still available
+ // if our track is bound to a peerIdentity, and the peer connection (our
+ // sink) is bound to the same identity, then we can enable the track.
+ const PeerIdentity* trackIdentity = mDomTrack.Ref()->GetPeerIdentity();
+ if (aSinkIdentity && trackIdentity) {
+ enableTrack = (*aSinkIdentity == *trackIdentity);
+ }
+ }
+
+ mListener->SetEnabled(enableTrack);
+}
+
+void MediaPipelineTransmit::TransportReady_s() {
+ ASSERT_ON_THREAD(mStsThread);
+ // Call base ready function.
+ MediaPipeline::TransportReady_s();
+ mListener->SetActive(true);
+}
+
+nsresult MediaPipelineTransmit::SetTrack(RefPtr<MediaStreamTrack> aDomTrack) {
+ MOZ_ASSERT(NS_IsMainThread());
+ if (mDomTrack.Ref()) {
+ mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
+ }
+
+ if (aDomTrack) {
+ nsString nsTrackId;
+ aDomTrack->GetId(nsTrackId);
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("Reattaching pipeline to track %p track %s conduit type: %s",
+ aDomTrack.get(), NS_ConvertUTF16toUTF8(nsTrackId).get(),
+ mIsVideo ? "video" : "audio"));
+ }
+
+ mDescriptionInvalidated = true;
+ mDomTrack = std::move(aDomTrack);
+ if (mDomTrack.Ref()) {
+ mDomTrack.Ref()->AddPrincipalChangeObserver(this);
+ PrincipalChanged(mDomTrack.Ref());
+ }
+
+ return NS_OK;
+}
+
+RefPtr<dom::MediaStreamTrack> MediaPipelineTransmit::GetTrack() const {
+ MOZ_ASSERT(NS_IsMainThread());
+ return mDomTrack;
+}
+
+void MediaPipelineTransmit::SetSendTrackOverride(
+ RefPtr<ProcessedMediaTrack> aSendTrack) {
+ MOZ_ASSERT(NS_IsMainThread());
+ MOZ_RELEASE_ASSERT(!mSendTrack);
+ MOZ_RELEASE_ASSERT(!mSendPort);
+ MOZ_RELEASE_ASSERT(!mSendTrackOverride.Ref());
+ mDescriptionInvalidated = true;
+ mSendTrackOverride = std::move(aSendTrack);
+}
+
+// Called if we're attached with AddDirectListener()
+void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
+ MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) {
+ MOZ_LOG(
+ gMediaPipelineLog, LogLevel::Debug,
+ ("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
+ ", duration=%" PRId64,
+ this, aOffset, aMedia.GetDuration()));
+ TRACE_COMMENT(
+ "MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData", "%s",
+ aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio");
+ NewData(aMedia, aGraph->GraphRate());
+}
+
+void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
+ MediaTrackGraph* aGraph, TrackTime aOffset,
+ const MediaSegment& aQueuedMedia) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("MediaPipeline::NotifyQueuedChanges()"));
+
+ if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
+ // We always get video from the direct listener.
+ return;
+ }
+
+ TRACE("MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges (Audio)");
+
+ if (mDirectConnect) {
+ // ignore non-direct data if we're also getting direct data
+ return;
+ }
+
+ size_t rate;
+ if (aGraph) {
+ rate = aGraph->GraphRate();
+ } else {
+ // When running tests, graph may be null. In that case use a default.
+ rate = 16000;
+ }
+ NewData(aQueuedMedia, rate);
+}
+
+void MediaPipelineTransmit::PipelineListener::NotifyEnabledStateChanged(
+ MediaTrackGraph* aGraph, bool aEnabled) {
+ if (mConduit->type() != MediaSessionConduit::VIDEO) {
+ return;
+ }
+ MOZ_ASSERT(mConverter);
+ mConverter->SetTrackEnabled(aEnabled);
+}
+
+void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
+ InstallationResult aResult) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
+ ("MediaPipeline::NotifyDirectListenerInstalled() listener=%p,"
+ " result=%d",
+ this, static_cast<int32_t>(aResult)));
+
+ mDirectConnect = InstallationResult::SUCCESS == aResult;
+}
+
+void MediaPipelineTransmit::PipelineListener::
+ NotifyDirectListenerUninstalled() {
+ MOZ_LOG(
+ gMediaPipelineLog, LogLevel::Info,
+ ("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p", this));
+
+ if (mConduit->type() == MediaSessionConduit::VIDEO) {
+ // Reset the converter's track-enabled state. If re-added to a new track
+ // later and that track is disabled, we will be signaled explicitly.
+ MOZ_ASSERT(mConverter);
+ mConverter->SetTrackEnabled(true);
+ }
+
+ mDirectConnect = false;
+}
+
+void MediaPipelineTransmit::PipelineListener::NewData(
+ const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
+ if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
+ ? MediaSessionConduit::AUDIO
+ : MediaSessionConduit::VIDEO)) {
+ MOZ_ASSERT(false,
+ "The media type should always be correct since the "
+ "listener is locked to a specific track");
+ return;
+ }
+
+ // TODO(ekr@rtfm.com): For now assume that we have only one
+ // track type and it's destined for us
+ // See bug 784517
+ if (aMedia.GetType() == MediaSegment::AUDIO) {
+ MOZ_RELEASE_ASSERT(aRate > 0);
+
+ if (!mActive) {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("Discarding audio packets because transport not ready"));
+ return;
+ }
+
+ const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
+ for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
+ iter.Next()) {
+ mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
+ }
+ } else {
+ const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);
+
+ for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
+ iter.Next()) {
+ mConverter->QueueVideoChunk(*iter, !mEnabled);
+ }
+ }
+}
+
+class GenericReceiveListener : public MediaTrackListener {
+ public:
+ explicit GenericReceiveListener(const RefPtr<dom::MediaStreamTrack>& aTrack)
+ : mTrackSource(new nsMainThreadPtrHolder<RemoteTrackSource>(
+ "GenericReceiveListener::mTrackSource",
+ &static_cast<RemoteTrackSource&>(aTrack->GetSource()))),
+ mSource(mTrackSource->mStream),
+ mTrackingId(mTrackSource->mTrackingId),
+ mIsAudio(aTrack->AsAudioStreamTrack()),
+ mEnabled(false),
+ mMaybeTrackNeedsUnmute(true) {
+ MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
+ MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack");
+ }
+
+ virtual ~GenericReceiveListener() = default;
+
+ void SetEnabled(bool aEnabled) {
+ if (mEnabled == aEnabled) {
+ return;
+ }
+ mEnabled = aEnabled;
+ if (aEnabled) {
+ mMaybeTrackNeedsUnmute = true;
+ }
+ if (mIsAudio && !mSource->IsDestroyed()) {
+ mSource->SetPullingEnabled(mEnabled);
+ }
+ }
+
+ void OnRtpReceived() {
+ if (mMaybeTrackNeedsUnmute) {
+ mMaybeTrackNeedsUnmute = false;
+ GetMainThreadEventTarget()->Dispatch(
+ NewRunnableMethod("GenericReceiveListener::OnRtpReceived_m", this,
+ &GenericReceiveListener::OnRtpReceived_m));
+ }
+ }
+
+ void OnRtpReceived_m() {
+ if (mEnabled) {
+ mTrackSource->SetMuted(false);
+ }
+ }
+
+ void EndTrack() {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
+ ("GenericReceiveListener ending track"));
+
+ if (!mSource->IsDestroyed()) {
+ // This breaks the cycle with the SourceMediaTrack
+ mSource->RemoveListener(this);
+ mSource->End();
+ mSource->Destroy();
+ }
+
+ GetMainThreadEventTarget()->Dispatch(
+ NewRunnableMethod("RemoteTrackSource::ForceEnded", mTrackSource.get(),
+ &RemoteTrackSource::ForceEnded));
+ }
+
+ protected:
+ const nsMainThreadPtrHandle<RemoteTrackSource> mTrackSource;
+ const RefPtr<SourceMediaTrack> mSource;
+ const TrackingId mTrackingId;
+ const bool mIsAudio;
+ // Main thread only.
+ bool mEnabled;
+ // Any thread.
+ Atomic<bool> mMaybeTrackNeedsUnmute;
+};
+
+MediaPipelineReceive::MediaPipelineReceive(
+ const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<MediaSessionConduit> aConduit)
+ : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE,
+ std::move(aCallThread), std::move(aStsThread),
+ std::move(aConduit)) {}
+
+MediaPipelineReceive::~MediaPipelineReceive() = default;
+
+class MediaPipelineReceiveAudio::PipelineListener
+ : public GenericReceiveListener {
+ public:
+ PipelineListener(const RefPtr<dom::MediaStreamTrack>& aTrack,
+ RefPtr<MediaSessionConduit> aConduit,
+ const PrincipalHandle& aPrincipalHandle)
+ : GenericReceiveListener(aTrack),
+ mConduit(std::move(aConduit)),
+ // AudioSession conduit only supports 16, 32, 44.1 and 48kHz
+ // This is an artificial limitation, it would however require more
+ // changes to support any rates. If the sampling rate is not-supported,
+ // we will use 48kHz instead.
+ mRate(static_cast<AudioSessionConduit*>(mConduit.get())
+ ->IsSamplingFreqSupported(mSource->Graph()->GraphRate())
+ ? mSource->Graph()->GraphRate()
+ : WEBRTC_MAX_SAMPLE_RATE),
+ mTaskQueue(TaskQueue::Create(
+ GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
+ "AudioPipelineListener")),
+ mPlayedTicks(0),
+ mAudioFrame(std::make_unique<webrtc::AudioFrame>()),
+ mPrincipalHandle(aPrincipalHandle),
+ mForceSilence(false) {}
+
+ void Init() {
+ mSource->SetAppendDataSourceRate(mRate);
+ mSource->AddListener(this);
+ }
+
+ // Implement MediaTrackListener
+ void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData,
+ TrackTime aDesiredTime) override {
+ NotifyPullImpl(aDesiredTime);
+ }
+
+ void MakePrincipalPrivate_s() {
+ mForceSilence = true;
+
+ GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction(
+ "MediaPipelineReceiveAudio::PipelineListener::MakePrincipalPrivate_s",
+ [self = RefPtr<PipelineListener>(this), this] {
+ class Message : public ControlMessage {
+ public:
+ Message(RefPtr<PipelineListener> aListener,
+ const PrincipalHandle& aPrivatePrincipal)
+ : ControlMessage(nullptr),
+ mListener(std::move(aListener)),
+ mPrivatePrincipal(aPrivatePrincipal) {}
+
+ void Run() override {
+ mListener->mPrincipalHandle = mPrivatePrincipal;
+ mListener->mForceSilence = false;
+ }
+
+ const RefPtr<PipelineListener> mListener;
+ PrincipalHandle mPrivatePrincipal;
+ };
+
+ RefPtr<nsIPrincipal> privatePrincipal =
+ NullPrincipal::CreateWithInheritedAttributes(
+ mTrackSource->GetPrincipal());
+ mTrackSource->SetPrincipal(privatePrincipal);
+
+ if (mSource->IsDestroyed()) {
+ return;
+ }
+
+ mSource->GraphImpl()->AppendMessage(
+ MakeUnique<Message>(this, MakePrincipalHandle(privatePrincipal)));
+ }));
+ }
+
+ private:
+ ~PipelineListener() = default;
+
+ void NotifyPullImpl(TrackTime aDesiredTime) {
+ TRACE_COMMENT("PiplineListener::NotifyPullImpl", "PipelineListener %p",
+ this);
+ uint32_t samplesPer10ms = mRate / 100;
+
+ // mSource's rate is not necessarily the same as the graph rate, since there
+ // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
+ // 48kHz are supported. The audio frames we get here is going to be
+ // resampled when inserted into the graph. aDesiredTime and mPlayedTicks are
+ // in the graph rate.
+
+ while (mPlayedTicks < aDesiredTime) {
+ // This fetches 10ms of data, either mono or stereo
+ MediaConduitErrorCode err =
+ static_cast<AudioSessionConduit*>(mConduit.get())
+ ->GetAudioFrame(mRate, mAudioFrame.get());
+
+ if (err != kMediaConduitNoError) {
+ // Insert silence on conduit/GIPS failure (extremely unlikely)
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
+ ("Audio conduit failed (%d) to return data @ %" PRId64
+ " (desired %" PRId64 " -> %f)",
+ err, mPlayedTicks, aDesiredTime,
+ mSource->TrackTimeToSeconds(aDesiredTime)));
+ constexpr size_t mono = 1;
+ mAudioFrame->UpdateFrame(
+ mAudioFrame->timestamp_, nullptr, samplesPer10ms, mRate,
+ mAudioFrame->speech_type_, mAudioFrame->vad_activity_,
+ std::max(mono, mAudioFrame->num_channels()));
+ }
+
+ MOZ_LOG(
+ gMediaPipelineLog, LogLevel::Debug,
+ ("Audio conduit returned buffer for %zu channels, %zu frames",
+ mAudioFrame->num_channels(), mAudioFrame->samples_per_channel()));
+
+ AudioSegment segment;
+ if (mForceSilence || mAudioFrame->muted()) {
+ segment.AppendNullData(mAudioFrame->samples_per_channel());
+ } else {
+ CheckedInt<size_t> bufferSize(sizeof(uint16_t));
+ bufferSize *= mAudioFrame->samples_per_channel();
+ bufferSize *= mAudioFrame->num_channels();
+ RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize);
+ int16_t* samplesData = static_cast<int16_t*>(samples->Data());
+ AutoTArray<int16_t*, 2> channels;
+ AutoTArray<const int16_t*, 2> outputChannels;
+
+ channels.SetLength(mAudioFrame->num_channels());
+
+ size_t offset = 0;
+ for (size_t i = 0; i < mAudioFrame->num_channels(); i++) {
+ channels[i] = samplesData + offset;
+ offset += mAudioFrame->samples_per_channel();
+ }
+
+ DeinterleaveAndConvertBuffer(
+ mAudioFrame->data(), mAudioFrame->samples_per_channel(),
+ mAudioFrame->num_channels(), channels.Elements());
+
+ outputChannels.AppendElements(channels);
+
+ segment.AppendFrames(samples.forget(), outputChannels,
+ mAudioFrame->samples_per_channel(),
+ mPrincipalHandle);
+ }
+
+ // Handle track not actually added yet or removed/finished
+ if (TrackTime appended = mSource->AppendData(&segment)) {
+ mPlayedTicks += appended;
+ } else {
+ MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed"));
+ // we can't un-read the data, but that's ok since we don't want to
+ // buffer - but don't i-loop!
+ break;
+ }
+ }
+ }
+
+ const RefPtr<MediaSessionConduit> mConduit;
+ // This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and
+ // tries to be the same as the graph rate. If the graph rate is higher than
+ // 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate,
+ // audio is resampled to the graph rate.
+ const TrackRate mRate;
+ const RefPtr<TaskQueue> mTaskQueue;
+ // Number of frames of data that has been added to the SourceMediaTrack in
+ // the graph's rate. Graph thread only.
+ TrackTicks mPlayedTicks;
+ // Allocation of an audio frame used as a scratch buffer when reading data out
+ // of libwebrtc for forwarding into the graph. Graph thread only.
+ std::unique_ptr<webrtc::AudioFrame> mAudioFrame;
+ // Principal handle used when appending data to the SourceMediaTrack. Graph
+ // thread only.
+ PrincipalHandle mPrincipalHandle;
+ // Set to true on the sts thread if privacy is requested when ALPN was
+ // negotiated. Set to false again when mPrincipalHandle is private.
+ Atomic<bool> mForceSilence;
+};
+
+MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
+ const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<AudioSessionConduit> aConduit,
+ const RefPtr<dom::MediaStreamTrack>& aTrack,
+ const PrincipalHandle& aPrincipalHandle)
+ : MediaPipelineReceive(aPc, std::move(aTransportHandler),
+ std::move(aCallThread), std::move(aStsThread),
+ std::move(aConduit)),
+ mListener(aTrack
+ ? new PipelineListener(aTrack, mConduit, aPrincipalHandle)
+ : nullptr),
+ mWatchManager(this, AbstractThread::MainThread()) {
+ mDescription = mPc + "| Receive audio";
+ if (mListener) {
+ mListener->Init();
+ }
+ mWatchManager.Watch(mActive, &MediaPipelineReceiveAudio::UpdateListener);
+}
+
+void MediaPipelineReceiveAudio::Shutdown() {
+ MOZ_ASSERT(NS_IsMainThread());
+ MediaPipeline::Shutdown();
+ mWatchManager.Shutdown();
+ if (mListener) {
+ mListener->EndTrack();
+ }
+}
+
+void MediaPipelineReceiveAudio::MakePrincipalPrivate_s() {
+ ASSERT_ON_THREAD(mStsThread);
+ if (mListener) {
+ mListener->MakePrincipalPrivate_s();
+ }
+}
+
+void MediaPipelineReceiveAudio::OnRtpPacketReceived() {
+ ASSERT_ON_THREAD(mStsThread);
+ if (mListener) {
+ mListener->OnRtpReceived();
+ }
+}
+
+void MediaPipelineReceiveAudio::UpdateListener() {
+ MOZ_ASSERT(NS_IsMainThread());
+ if (mListener) {
+ mListener->SetEnabled(mActive.Ref());
+ }
+}
+
+class MediaPipelineReceiveVideo::PipelineListener
+ : public GenericReceiveListener {
+ public:
+ PipelineListener(const RefPtr<dom::MediaStreamTrack>& aTrack,
+ const PrincipalHandle& aPrincipalHandle)
+ : GenericReceiveListener(aTrack),
+ mImageContainer(
+ MakeAndAddRef<ImageContainer>(ImageContainer::ASYNCHRONOUS)),
+ mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"),
+ mPrincipalHandle(aPrincipalHandle) {}
+
+ void Init() { mSource->AddListener(this); }
+
+ void MakePrincipalPrivate_s() {
+ {
+ MutexAutoLock lock(mMutex);
+ mForceDropFrames = true;
+ }
+
+ GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction(
+ __func__, [self = RefPtr<PipelineListener>(this), this] {
+ RefPtr<nsIPrincipal> privatePrincipal =
+ NullPrincipal::CreateWithInheritedAttributes(
+ mTrackSource->GetPrincipal());
+ mTrackSource->SetPrincipal(privatePrincipal);
+
+ MutexAutoLock lock(mMutex);
+ mPrincipalHandle = MakePrincipalHandle(privatePrincipal);
+ mForceDropFrames = false;
+ }));
+ }
+
+ void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
+ uint32_t aTimeStamp, int64_t aRenderTime) {
+ PrincipalHandle principal;
+ {
+ MutexAutoLock lock(mMutex);
+ if (mForceDropFrames) {
+ return;
+ }
+ principal = mPrincipalHandle;
+ }
+ RefPtr<Image> image;
+ if (aBuffer.type() == webrtc::VideoFrameBuffer::Type::kNative) {
+ // We assume that only native handles are used with the
+ // WebrtcMediaDataCodec decoder.
+ const ImageBuffer* imageBuffer =
+ static_cast<const ImageBuffer*>(&aBuffer);
+ image = imageBuffer->GetNativeImage();
+ } else {
+ MOZ_ASSERT(aBuffer.type() == webrtc::VideoFrameBuffer::Type::kI420);
+ rtc::scoped_refptr<const webrtc::I420BufferInterface> i420(
+ aBuffer.GetI420());
+
+ MOZ_ASSERT(i420->DataY());
+ // Create a video frame using |buffer|.
+ PerformanceRecorder<CopyVideoStage> rec(
+ "MediaPipelineReceiveVideo::CopyToImage"_ns, mTrackingId,
+ i420->width(), i420->height());
+
+ RefPtr<PlanarYCbCrImage> yuvImage =
+ mImageContainer->CreatePlanarYCbCrImage();
+
+ PlanarYCbCrData yuvData;
+ yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY());
+ yuvData.mYStride = i420->StrideY();
+ MOZ_ASSERT(i420->StrideU() == i420->StrideV());
+ yuvData.mCbCrStride = i420->StrideU();
+ yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU());
+ yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV());
+ yuvData.mPictureRect = IntRect(0, 0, i420->width(), i420->height());
+ yuvData.mStereoMode = StereoMode::MONO;
+ // This isn't the best default.
+ yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601;
+ yuvData.mChromaSubsampling =
+ gfx::ChromaSubsampling::HALF_WIDTH_AND_HEIGHT;
+
+ if (!yuvImage->CopyData(yuvData)) {
+ MOZ_ASSERT(false);
+ return;
+ }
+ rec.Record();
+
+ image = std::move(yuvImage);
+ }
+
+ VideoSegment segment;
+ auto size = image->GetSize();
+ segment.AppendFrame(image.forget(), size, principal);
+ mSource->AppendData(&segment);
+ }
+
+ private:
+ RefPtr<layers::ImageContainer> mImageContainer;
+ Mutex mMutex MOZ_UNANNOTATED; // Protects the below members.
+ PrincipalHandle mPrincipalHandle;
+ // Set to true on the sts thread if privacy is requested when ALPN was
+ // negotiated. Set to false again when mPrincipalHandle is private.
+ bool mForceDropFrames = false;
+};
+
+class MediaPipelineReceiveVideo::PipelineRenderer
+ : public mozilla::VideoRenderer {
+ public:
+ explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
+ : mPipeline(aPipeline) {}
+
+ void Detach() { mPipeline = nullptr; }
+
+ // Implement VideoRenderer
+ void FrameSizeChange(unsigned int aWidth, unsigned int aHeight) override {}
+ void RenderVideoFrame(const webrtc::VideoFrameBuffer& aBuffer,
+ uint32_t aTimeStamp, int64_t aRenderTime) override {
+ mPipeline->mListener->RenderVideoFrame(aBuffer, aTimeStamp, aRenderTime);
+ }
+
+ private:
+ MediaPipelineReceiveVideo* mPipeline; // Raw pointer to avoid cycles
+};
+
+MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
+ const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<VideoSessionConduit> aConduit,
+ const RefPtr<dom::MediaStreamTrack>& aTrack,
+ const PrincipalHandle& aPrincipalHandle)
+ : MediaPipelineReceive(aPc, std::move(aTransportHandler),
+ std::move(aCallThread), std::move(aStsThread),
+ std::move(aConduit)),
+ mRenderer(new PipelineRenderer(this)),
+ mListener(aTrack ? new PipelineListener(aTrack, aPrincipalHandle)
+ : nullptr),
+ mWatchManager(this, AbstractThread::MainThread()) {
+ mDescription = mPc + "| Receive video";
+ if (mListener) {
+ mListener->Init();
+ }
+ static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer);
+ mWatchManager.Watch(mActive, &MediaPipelineReceiveVideo::UpdateListener);
+}
+
+void MediaPipelineReceiveVideo::Shutdown() {
+ MOZ_ASSERT(NS_IsMainThread());
+ MediaPipeline::Shutdown();
+ mWatchManager.Shutdown();
+
+ // stop generating video and thus stop invoking the PipelineRenderer
+ // and PipelineListener - the renderer has a raw ptr to the Pipeline to
+ // avoid cycles, and the render callbacks are invoked from a different
+ // thread so simple null-checks would cause TSAN bugs without locks.
+ static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
+ if (mListener) {
+ mListener->EndTrack();
+ }
+}
+
+void MediaPipelineReceiveVideo::MakePrincipalPrivate_s() {
+ ASSERT_ON_THREAD(mStsThread);
+ if (mListener) {
+ mListener->MakePrincipalPrivate_s();
+ }
+}
+
+void MediaPipelineReceiveVideo::OnRtpPacketReceived() {
+ ASSERT_ON_THREAD(mStsThread);
+ if (mListener) {
+ mListener->OnRtpReceived();
+ }
+}
+
+void MediaPipelineReceiveVideo::UpdateListener() {
+ MOZ_ASSERT(NS_IsMainThread());
+ if (mListener) {
+ mListener->SetEnabled(mActive.Ref());
+ }
+}
+
+const dom::RTCStatsTimestampMaker& MediaPipeline::GetTimestampMaker() const {
+ return mConduit->GetTimestampMaker();
+}
+
+DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
+ const DOMHighResTimeStamp aTime) {
+ // DOMHighResTimeStamp is a unit measured in ms
+ return aTime + EXPIRY_TIME_MILLISECONDS;
+}
+
+MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
+ const DOMHighResTimeStamp aTime)
+ : mCsrc(aCsrc), mTimestamp(aTime) {}
+
+void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
+ dom::RTCRTPContributingSourceStats& aWebidlObj,
+ const nsString& aInboundRtpStreamId) const {
+ nsString statId = u"csrc_"_ns + aInboundRtpStreamId;
+ statId.AppendLiteral("_");
+ statId.AppendInt(mCsrc);
+ aWebidlObj.mId.Construct(statId);
+ aWebidlObj.mType.Construct(RTCStatsType::Csrc);
+ aWebidlObj.mTimestamp.Construct(mTimestamp);
+ aWebidlObj.mContributorSsrc.Construct(mCsrc);
+ aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
+}
+
+} // namespace mozilla
diff --git a/dom/media/webrtc/transportbridge/MediaPipeline.h b/dom/media/webrtc/transportbridge/MediaPipeline.h
new file mode 100644
index 0000000000..6fe3f5f287
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/MediaPipeline.h
@@ -0,0 +1,425 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: ekr@rtfm.com
+
+#ifndef mediapipeline_h__
+#define mediapipeline_h__
+
+#include <map>
+
+#include "transport/sigslot.h"
+#include "transport/transportlayer.h" // For TransportLayer::State
+
+#include "libwebrtcglue/MediaConduitInterface.h"
+#include "mozilla/ReentrantMonitor.h"
+#include "mozilla/Atomics.h"
+#include "mozilla/StateMirroring.h"
+#include "transport/mediapacket.h"
+#include "transport/runnable_utils.h"
+#include "AudioPacketizer.h"
+#include "MediaEventSource.h"
+#include "MediaPipelineFilter.h"
+#include "MediaSegment.h"
+#include "PrincipalChangeObserver.h"
+#include "jsapi/PacketDumper.h"
+
+#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
+
+// Should come from MediaEngine.h, but that's a pain to include here
+// because of the MOZILLA_EXTERNAL_LINKAGE stuff.
+#define WEBRTC_MAX_SAMPLE_RATE 48000
+
+class nsIPrincipal;
+
+namespace mozilla {
+class AudioProxyThread;
+class MediaInputPort;
+class MediaPipelineFilter;
+class MediaTransportHandler;
+class PeerIdentity;
+class ProcessedMediaTrack;
+class SourceMediaTrack;
+class VideoFrameConverter;
+
+namespace dom {
+class MediaStreamTrack;
+struct RTCRTPContributingSourceStats;
+} // namespace dom
+
+// A class that represents the pipeline of audio and video
+// The dataflow looks like:
+//
+// TRANSMIT
+// CaptureDevice -> stream -> [us] -> conduit -> [us] -> transport -> network
+//
+// RECEIVE
+// network -> transport -> [us] -> conduit -> [us] -> stream -> Playout
+//
+// The boxes labeled [us] are just bridge logic implemented in this class
+//
+// We have to deal with a number of threads:
+//
+// GSM:
+// * Assembles the pipeline
+// SocketTransportService
+// * Receives notification that ICE and DTLS have completed
+// * Processes incoming network data and passes it to the conduit
+// * Processes outgoing RTP and RTCP
+// MediaTrackGraph
+// * Receives outgoing data from the MediaTrackGraph
+// * Receives pull requests for more data from the
+// MediaTrackGraph
+// One or another GIPS threads
+// * Receives RTCP messages to send to the other side
+// * Processes video frames GIPS wants to render
+//
+// For a transmitting conduit, "output" is RTP and "input" is RTCP.
+// For a receiving conduit, "input" is RTP and "output" is RTCP.
+//
+
+class MediaPipeline : public sigslot::has_slots<> {
+ public:
+ enum class DirectionType { TRANSMIT, RECEIVE };
+ MediaPipeline(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ DirectionType aDirection, RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<MediaSessionConduit> aConduit);
+
+ void Start();
+ void Stop();
+
+ void SetLevel(size_t aLevel) { mLevel = aLevel; }
+
+ // Main thread shutdown.
+ virtual void Shutdown();
+
+ void UpdateTransport_m(const std::string& aTransportId,
+ UniquePtr<MediaPipelineFilter>&& aFilter);
+
+ void UpdateTransport_s(const std::string& aTransportId,
+ UniquePtr<MediaPipelineFilter>&& aFilter);
+
+ virtual DirectionType Direction() const { return mDirection; }
+ size_t Level() const { return mLevel; }
+ virtual bool IsVideo() const = 0;
+
+ class RtpCSRCStats {
+ public:
+ // Gets an expiration time for CRC info given a reference time,
+ // this reference time would normally be the time of calling.
+ // This value can then be used to check if a RtpCSRCStats
+ // has expired via Expired(...)
+ static DOMHighResTimeStamp GetExpiryFromTime(
+ const DOMHighResTimeStamp aTime);
+
+ RtpCSRCStats(const uint32_t aCsrc, const DOMHighResTimeStamp aTime);
+ ~RtpCSRCStats() = default;
+ // Initialize a webidl representation suitable for adding to a report.
+ // This assumes that the webidl object is empty.
+ // @param aWebidlObj the webidl binding object to popluate
+ // @param aInboundRtpStreamId the associated RTCInboundRTPStreamStats.id
+ void GetWebidlInstance(dom::RTCRTPContributingSourceStats& aWebidlObj,
+ const nsString& aInboundRtpStreamId) const;
+ void SetTimestamp(const DOMHighResTimeStamp aTime) { mTimestamp = aTime; }
+ // Check if the RtpCSRCStats has expired, checks against a
+ // given expiration time.
+ bool Expired(const DOMHighResTimeStamp aExpiry) const {
+ return mTimestamp < aExpiry;
+ }
+
+ private:
+ static const double constexpr EXPIRY_TIME_MILLISECONDS = 10 * 1000;
+ const uint32_t mCsrc;
+ DOMHighResTimeStamp mTimestamp;
+ };
+
+ // Gets the gathered contributing source stats for the last expiration period.
+ // @param aId the stream id to use for populating inboundRtpStreamId field
+ // @param aArr the array to append the stats objects to
+ void GetContributingSourceStats(
+ const nsString& aInboundRtpStreamId,
+ FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const;
+
+ int32_t RtpPacketsSent() const { return mRtpPacketsSent; }
+ int64_t RtpBytesSent() const { return mRtpBytesSent; }
+ int32_t RtcpPacketsSent() const { return mRtcpPacketsSent; }
+ int32_t RtpPacketsReceived() const { return mRtpPacketsReceived; }
+ int64_t RtpBytesReceived() const { return mRtpBytesReceived; }
+ int32_t RtcpPacketsReceived() const { return mRtcpPacketsReceived; }
+
+ const dom::RTCStatsTimestampMaker& GetTimestampMaker() const;
+
+ // Thread counting
+ NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline)
+
+ protected:
+ virtual ~MediaPipeline();
+
+ // The transport is ready
+ virtual void TransportReady_s() {}
+
+ void IncrementRtpPacketsSent(const MediaPacket& aPacket);
+ void IncrementRtcpPacketsSent();
+ void IncrementRtpPacketsReceived(int aBytes);
+ virtual void OnRtpPacketReceived() {}
+ void IncrementRtcpPacketsReceived();
+
+ virtual void SendPacket(MediaPacket&& packet);
+
+ // Process slots on transports
+ void RtpStateChange(const std::string& aTransportId, TransportLayer::State);
+ void RtcpStateChange(const std::string& aTransportId, TransportLayer::State);
+ virtual void CheckTransportStates();
+ void PacketReceived(const std::string& aTransportId,
+ const MediaPacket& packet);
+ void AlpnNegotiated(const std::string& aAlpn, bool aPrivacyRequested);
+
+ void RtpPacketReceived(const MediaPacket& packet);
+ void RtcpPacketReceived(const MediaPacket& packet);
+
+ void EncryptedPacketSending(const std::string& aTransportId,
+ const MediaPacket& aPacket);
+
+ void SetDescription_s(const std::string& description);
+
+ // Called when ALPN is negotiated and is requesting privacy, so receive
+ // pipelines do not enter data into the graph under a content principal.
+ virtual void MakePrincipalPrivate_s() {}
+
+ public:
+ const RefPtr<MediaSessionConduit> mConduit;
+ const DirectionType mDirection;
+
+ // Pointers to the threads we need. Initialized at creation
+ // and used all over the place.
+ const RefPtr<AbstractThread> mCallThread;
+ const RefPtr<nsISerialEventTarget> mStsThread;
+
+ protected:
+ // True if we should be actively transmitting or receiving data. Main thread
+ // only.
+ Watchable<bool> mActive;
+ Atomic<size_t> mLevel;
+ std::string mTransportId;
+ const RefPtr<MediaTransportHandler> mTransportHandler;
+
+ TransportLayer::State mRtpState = TransportLayer::TS_NONE;
+ TransportLayer::State mRtcpState = TransportLayer::TS_NONE;
+ bool mSignalsConnected = false;
+
+ // Only safe to access from STS thread.
+ int32_t mRtpPacketsSent;
+ int32_t mRtcpPacketsSent;
+ int32_t mRtpPacketsReceived;
+ int32_t mRtcpPacketsReceived;
+ int64_t mRtpBytesSent;
+ int64_t mRtpBytesReceived;
+
+ // Only safe to access from STS thread.
+ std::map<uint32_t, RtpCSRCStats> mCsrcStats;
+
+ // Written in c'tor. Read on STS and main thread.
+ const std::string mPc;
+
+ // String describing this MediaPipeline for logging purposes. Only safe to
+ // access from STS thread.
+ std::string mDescription;
+
+ // Written in c'tor, all following accesses are on the STS thread.
+ UniquePtr<MediaPipelineFilter> mFilter;
+ const UniquePtr<webrtc::RtpHeaderExtensionMap> mRtpHeaderExtensionMap;
+
+ RefPtr<PacketDumper> mPacketDumper;
+
+ MediaEventProducerExc<MediaPacket, webrtc::RTPHeader> mRtpReceiveEvent;
+ MediaEventProducerExc<MediaPacket> mSenderRtcpReceiveEvent;
+ MediaEventProducerExc<MediaPacket> mReceiverRtcpReceiveEvent;
+
+ MediaEventListener mRtpSendEventListener;
+ MediaEventListener mSenderRtcpSendEventListener;
+ MediaEventListener mReceiverRtcpSendEventListener;
+
+ private:
+ bool IsRtp(const unsigned char* aData, size_t aLen) const;
+ // Must be called on the STS thread. Must be called after Shutdown().
+ void DetachTransport_s();
+};
+
+// A specialization of pipeline for reading from an input device
+// and transmitting to the network.
+class MediaPipelineTransmit
+ : public MediaPipeline,
+ public dom::PrincipalChangeObserver<dom::MediaStreamTrack> {
+ public:
+ // Set aRtcpTransport to nullptr to use rtcp-mux
+ MediaPipelineTransmit(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread, bool aIsVideo,
+ RefPtr<MediaSessionConduit> aConduit);
+
+ void Shutdown() override;
+
+ bool Transmitting() const;
+
+ // written and used from MainThread
+ bool IsVideo() const override;
+
+ // When the principal of the domtrack changes, it calls through to here
+ // so that we can determine whether to enable track transmission.
+ // In cases where the peer isn't yet identified, we disable the pipeline (not
+ // the stream, that would potentially affect others), so that it sends
+ // black/silence. Once the peer is identified, re-enable those streams.
+ virtual void UpdateSinkIdentity(nsIPrincipal* aPrincipal,
+ const PeerIdentity* aSinkIdentity);
+
+ // for monitoring changes in track ownership
+ void PrincipalChanged(dom::MediaStreamTrack* aTrack) override;
+
+ // Override MediaPipeline::TransportReady_s.
+ void TransportReady_s() override;
+
+ // Replace a track with a different one.
+ nsresult SetTrack(RefPtr<dom::MediaStreamTrack> aDomTrack);
+
+ // Used to correlate stats
+ RefPtr<dom::MediaStreamTrack> GetTrack() const;
+
+ // For test use only. This allows a send track to be set without a
+ // corresponding dom track.
+ void SetSendTrackOverride(RefPtr<ProcessedMediaTrack> aSendTrack);
+
+ // Separate classes to allow ref counting
+ class PipelineListener;
+ class VideoFrameFeeder;
+
+ protected:
+ ~MediaPipelineTransmit();
+
+ // Updates mDescription (async) with information about the track we are
+ // transmitting.
+ std::string GenerateDescription() const;
+
+ // Sets up mSendPort and mSendTrack to feed mConduit if we are transmitting
+ // and have a dom track but no send track. Main thread only.
+ void UpdateSendState();
+
+ private:
+ WatchManager<MediaPipelineTransmit> mWatchManager;
+ const bool mIsVideo;
+ const RefPtr<PipelineListener> mListener;
+ RefPtr<AudioProxyThread> mAudioProcessing;
+ RefPtr<VideoFrameConverter> mConverter;
+ MediaEventListener mFrameListener;
+ Watchable<RefPtr<dom::MediaStreamTrack>> mDomTrack;
+ // Input port connecting mDomTrack's MediaTrack to mSendTrack.
+ RefPtr<MediaInputPort> mSendPort;
+ // The source track of the mSendTrack. Main thread only.
+ RefPtr<ProcessedMediaTrack> mSendPortSource;
+ // True if a parameter affecting mDescription has changed. To avoid updating
+ // the description unnecessarily. Main thread only.
+ bool mDescriptionInvalidated = true;
+ // Set true once we trigger the async removal of mSendTrack. Set false once
+ // the async removal is done. Main thread only.
+ bool mUnsettingSendTrack = false;
+ // MediaTrack that we send over the network. This allows changing mDomTrack.
+ // Because changing mSendTrack is async and can be racy (when changing from a
+ // track in one graph to a track in another graph), it is set very strictly.
+ // If mSendTrack is null it can be set by UpdateSendState().
+ // If it is non-null it can only be set to null, and only by the
+ // RemoveListener MozPromise handler, as seen in UpdateSendState.
+ RefPtr<ProcessedMediaTrack> mSendTrack;
+ // When this is set and we are active, this track will be used as mSendTrack.
+ // Allows unittests to insert a send track without requiring a dom track or a
+ // graph. Main thread only.
+ Watchable<RefPtr<ProcessedMediaTrack>> mSendTrackOverride;
+ // True when mSendTrack is set, not destroyed and mActive is true. mListener
+ // is attached to mSendTrack when this is true. Main thread only.
+ bool mTransmitting = false;
+};
+
+// A specialization of pipeline for reading from the network and
+// rendering media.
+class MediaPipelineReceive : public MediaPipeline {
+ public:
+ // Set aRtcpTransport to nullptr to use rtcp-mux
+ MediaPipelineReceive(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<MediaSessionConduit> aConduit);
+
+ protected:
+ ~MediaPipelineReceive();
+};
+
+// A specialization of pipeline for reading from the network and
+// rendering audio.
+class MediaPipelineReceiveAudio : public MediaPipelineReceive {
+ public:
+ MediaPipelineReceiveAudio(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<AudioSessionConduit> aConduit,
+ const RefPtr<dom::MediaStreamTrack>& aTrack,
+ const PrincipalHandle& aPrincipalHandle);
+
+ void Shutdown() override;
+
+ bool IsVideo() const override { return false; }
+
+ void MakePrincipalPrivate_s() override;
+
+ void OnRtpPacketReceived() override;
+
+ private:
+ void UpdateListener();
+
+ // Separate class to allow ref counting
+ class PipelineListener;
+
+ const RefPtr<PipelineListener> mListener;
+ WatchManager<MediaPipelineReceiveAudio> mWatchManager;
+};
+
+// A specialization of pipeline for reading from the network and
+// rendering video.
+class MediaPipelineReceiveVideo : public MediaPipelineReceive {
+ public:
+ MediaPipelineReceiveVideo(const std::string& aPc,
+ RefPtr<MediaTransportHandler> aTransportHandler,
+ RefPtr<AbstractThread> aCallThread,
+ RefPtr<nsISerialEventTarget> aStsThread,
+ RefPtr<VideoSessionConduit> aConduit,
+ const RefPtr<dom::MediaStreamTrack>& aTrack,
+ const PrincipalHandle& aPrincipalHandle);
+
+ void Shutdown() override;
+
+ bool IsVideo() const override { return true; }
+
+ void MakePrincipalPrivate_s() override;
+
+ void OnRtpPacketReceived() override;
+
+ private:
+ void UpdateListener();
+
+ class PipelineRenderer;
+ friend class PipelineRenderer;
+
+ // Separate class to allow ref counting
+ class PipelineListener;
+
+ const RefPtr<PipelineRenderer> mRenderer;
+ const RefPtr<PipelineListener> mListener;
+ WatchManager<MediaPipelineReceiveVideo> mWatchManager;
+};
+
+} // namespace mozilla
+#endif
diff --git a/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp b/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp
new file mode 100644
index 0000000000..1acb73e9f2
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/MediaPipelineFilter.cpp
@@ -0,0 +1,153 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: softtabstop=2:shiftwidth=2:expandtab
+ * */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: bcampen@mozilla.com
+
+#include "MediaPipelineFilter.h"
+
+#include "api/rtp_headers.h"
+#include "api/rtp_parameters.h"
+#include "mozilla/Logging.h"
+
+// defined in MediaPipeline.cpp
+extern mozilla::LazyLogModule gMediaPipelineLog;
+
+#define DEBUG_LOG(x) MOZ_LOG(gMediaPipelineLog, LogLevel::Debug, x)
+
+namespace mozilla {
+MediaPipelineFilter::MediaPipelineFilter(
+ const std::vector<webrtc::RtpExtension>& aExtMap)
+ : mExtMap(aExtMap) {}
+
+void MediaPipelineFilter::SetRemoteMediaStreamId(
+ const Maybe<std::string>& aMid) {
+ if (aMid != mRemoteMid) {
+ DEBUG_LOG(("MediaPipelineFilter added new remote RTP MID: '%s'.",
+ aMid.valueOr("").c_str()));
+ mRemoteMid = aMid;
+ mRemoteMidBindings.clear();
+ }
+}
+
+bool MediaPipelineFilter::Filter(const webrtc::RTPHeader& header) {
+ DEBUG_LOG(("MediaPipelineFilter inspecting seq# %u SSRC: %u",
+ header.sequenceNumber, header.ssrc));
+
+ auto fromStreamId = [](const std::string& aId) {
+ return Maybe<std::string>(aId.empty() ? Nothing() : Some(aId));
+ };
+
+ //
+ // MID Based Filtering
+ //
+
+ const auto mid = fromStreamId(header.extension.mid);
+
+ // Check to see if a bound SSRC is moved to a new MID
+ if (mRemoteMidBindings.count(header.ssrc) == 1 && mid && mRemoteMid != mid) {
+ mRemoteMidBindings.erase(header.ssrc);
+ }
+ // Bind an SSRC if a matching MID is found
+ if (mid && mRemoteMid == mid) {
+ DEBUG_LOG(("MediaPipelineFilter learned SSRC: %u for MID: '%s'",
+ header.ssrc, mRemoteMid.value().c_str()));
+ mRemoteMidBindings.insert(header.ssrc);
+ }
+ // Check for matching MID
+ if (!mRemoteMidBindings.empty()) {
+ MOZ_ASSERT(mRemoteMid != Nothing());
+ if (mRemoteMidBindings.count(header.ssrc) == 1) {
+ DEBUG_LOG(
+ ("MediaPipelineFilter SSRC: %u matched for MID: '%s'."
+ " passing packet",
+ header.ssrc, mRemoteMid.value().c_str()));
+ return true;
+ }
+ DEBUG_LOG(
+ ("MediaPipelineFilter SSRC: %u did not match bound SSRC(s) for"
+ " MID: '%s'. ignoring packet",
+ header.ssrc, mRemoteMid.value().c_str()));
+ for (const uint32_t ssrc : mRemoteMidBindings) {
+ DEBUG_LOG(("MID %s is associated with SSRC: %u",
+ mRemoteMid.value().c_str(), ssrc));
+ }
+ return false;
+ }
+
+ //
+ // RTP-STREAM-ID based filtering (for tests only)
+ //
+
+ //
+ // Remote SSRC based filtering
+ //
+
+ if (remote_ssrc_set_.count(header.ssrc)) {
+ DEBUG_LOG(
+ ("MediaPipelineFilter SSRC: %u matched remote SSRC set."
+ " passing packet",
+ header.ssrc));
+ return true;
+ }
+ DEBUG_LOG(
+ ("MediaPipelineFilter SSRC: %u did not match any of %zu"
+ " remote SSRCS.",
+ header.ssrc, remote_ssrc_set_.size()));
+
+ //
+ // PT, payload type, last ditch effort filtering
+ //
+
+ if (payload_type_set_.count(header.payloadType)) {
+ DEBUG_LOG(
+ ("MediaPipelineFilter payload-type: %u matched %zu"
+ " unique payload type. learning ssrc. passing packet",
+ header.ssrc, remote_ssrc_set_.size()));
+ // Actual match. We need to update the ssrc map so we can route rtcp
+ // sender reports correctly (these use a different payload-type field)
+ AddRemoteSSRC(header.ssrc);
+ return true;
+ }
+ DEBUG_LOG(
+ ("MediaPipelineFilter payload-type: %u did not match any of %zu"
+ " unique payload-types.",
+ header.payloadType, payload_type_set_.size()));
+ DEBUG_LOG(
+ ("MediaPipelineFilter packet failed to match any criteria."
+ " ignoring packet"));
+ return false;
+}
+
+void MediaPipelineFilter::AddRemoteSSRC(uint32_t ssrc) {
+ remote_ssrc_set_.insert(ssrc);
+}
+
+void MediaPipelineFilter::AddUniquePT(uint8_t payload_type) {
+ payload_type_set_.insert(payload_type);
+}
+
+void MediaPipelineFilter::Update(const MediaPipelineFilter& filter_update) {
+ // We will not stomp the remote_ssrc_set_ if the update has no ssrcs,
+ // because we don't want to unlearn any remote ssrcs unless the other end
+ // has explicitly given us a new set.
+ if (!filter_update.remote_ssrc_set_.empty()) {
+ remote_ssrc_set_ = filter_update.remote_ssrc_set_;
+ }
+ // We don't want to overwrite the learned binding unless we have changed MIDs
+ // or the update contains a MID binding.
+ if (!filter_update.mRemoteMidBindings.empty() ||
+ (filter_update.mRemoteMid && filter_update.mRemoteMid != mRemoteMid)) {
+ mRemoteMid = filter_update.mRemoteMid;
+ mRemoteMidBindings = filter_update.mRemoteMidBindings;
+ }
+ payload_type_set_ = filter_update.payload_type_set_;
+
+ // Use extmapping from new filter
+ mExtMap = filter_update.mExtMap;
+}
+
+} // end namespace mozilla
diff --git a/dom/media/webrtc/transportbridge/MediaPipelineFilter.h b/dom/media/webrtc/transportbridge/MediaPipelineFilter.h
new file mode 100644
index 0000000000..9b40bceda8
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/MediaPipelineFilter.h
@@ -0,0 +1,89 @@
+/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* vim: softtabstop=2:shiftwidth=2:expandtab
+ * */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: bcampen@mozilla.com
+
+#ifndef mediapipelinefilter_h__
+#define mediapipelinefilter_h__
+
+#include <cstddef>
+#include <stdint.h>
+#include <string>
+
+#include <set>
+#include <vector>
+
+#include "mozilla/Maybe.h"
+
+namespace webrtc {
+struct RTPHeader;
+struct RtpExtension;
+} // namespace webrtc
+
+namespace mozilla {
+
+// TODO @@NG update documentation after initial review
+
+// A class that handles the work of filtering RTP packets that arrive at a
+// MediaPipeline. This is primarily important for the use of BUNDLE (ie;
+// multiple m-lines share the same RTP stream). There are three ways that this
+// can work;
+//
+// 1) In our SDP, we include a media-level extmap parameter with a unique
+// integer of our choosing, with the hope that the other side will include
+// this value in a header in the first few RTP packets it sends us. This
+// allows us to perform correlation in cases where the other side has not
+// informed us of the ssrcs it will be sending (either because it did not
+// include them in its SDP, or their SDP has not arrived yet)
+// and also gives us the opportunity to learn SSRCs from packets so adorned.
+//
+// 2) If the remote endpoint includes SSRC media-level attributes in its SDP,
+// we can simply use this information to populate the filter. The only
+// shortcoming here is when RTP packets arrive before the answer does. See
+// above.
+//
+// 3) As a fallback, we can try to use payload type IDs to perform correlation,
+// but only when the type id is unique to this media section.
+// This too allows us to learn about SSRCs (mostly useful for filtering
+// sender reports later).
+class MediaPipelineFilter {
+ public:
+ MediaPipelineFilter() = default;
+ explicit MediaPipelineFilter(
+ const std::vector<webrtc::RtpExtension>& aExtMap);
+
+ // Checks whether this packet passes the filter, possibly updating the filter
+ // in the process (if the MID or payload types are used, they can teach
+ // the filter about ssrcs)
+ bool Filter(const webrtc::RTPHeader& header);
+
+ void AddRemoteSSRC(uint32_t ssrc);
+
+ void SetRemoteMediaStreamId(const Maybe<std::string>& aMid);
+
+ // When a payload type id is unique to our media section, add it here.
+ void AddUniquePT(uint8_t payload_type);
+
+ void Update(const MediaPipelineFilter& filter_update);
+
+ std::vector<webrtc::RtpExtension> GetExtmap() const { return mExtMap; }
+
+ private:
+ // The number of filters we manage here is quite small, so I am optimizing
+ // for readability.
+ std::set<uint32_t> remote_ssrc_set_;
+ std::set<uint8_t> payload_type_set_;
+ Maybe<std::string> mRemoteMid;
+ std::set<uint32_t> mRemoteMidBindings;
+ // RID extension can be set by tests and is sticky, the rest of
+ // the mapping is not.
+ std::vector<webrtc::RtpExtension> mExtMap;
+};
+
+} // end namespace mozilla
+
+#endif // mediapipelinefilter_h__
diff --git a/dom/media/webrtc/transportbridge/RtpLogger.cpp b/dom/media/webrtc/transportbridge/RtpLogger.cpp
new file mode 100644
index 0000000000..aac1fad197
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/RtpLogger.cpp
@@ -0,0 +1,67 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: nohlmeier@mozilla.com
+
+#include "RtpLogger.h"
+#include "mozilla/Logging.h"
+
+#include <ctime>
+#include <iomanip>
+#include <sstream>
+#ifdef _WIN32
+# include <time.h>
+# include <sys/timeb.h>
+#else
+# include <sys/time.h>
+#endif
+
+// Logging context
+using namespace mozilla;
+
+mozilla::LazyLogModule gRtpLoggerLog("RtpLogger");
+
+namespace mozilla {
+
+bool RtpLogger::IsPacketLoggingOn() {
+ return MOZ_LOG_TEST(gRtpLoggerLog, LogLevel::Debug);
+}
+
+void RtpLogger::LogPacket(const MediaPacket& packet, bool input,
+ std::string desc) {
+ if (MOZ_LOG_TEST(gRtpLoggerLog, LogLevel::Debug)) {
+ bool isRtp = (packet.type() == MediaPacket::RTP);
+ std::stringstream ss;
+ /* This creates text2pcap compatible format, e.g.:
+ * RTCP_PACKET O 10:36:26.864934 000000 80 c8 00 06 6d ...
+ */
+ ss << (input ? "I " : "O ");
+ std::time_t t = std::time(nullptr);
+ std::tm tm = *std::localtime(&t);
+ char buf[9];
+ if (0 < strftime(buf, sizeof(buf), "%H:%M:%S", &tm)) {
+ ss << buf;
+ }
+ ss << std::setfill('0');
+#ifdef _WIN32
+ struct timeb tb;
+ ftime(&tb);
+ ss << "." << (tb.millitm) << " ";
+#else
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ ss << "." << (tv.tv_usec) << " ";
+#endif
+ ss << " 000000";
+ ss << std::hex << std::setfill('0');
+ for (size_t i = 0; i < packet.len(); ++i) {
+ ss << " " << std::setw(2) << (int)packet.data()[i];
+ }
+ MOZ_LOG(gRtpLoggerLog, LogLevel::Debug,
+ ("%s%s%s", desc.c_str(), (isRtp ? " RTP_PACKET " : " RTCP_PACKET "),
+ ss.str().c_str()));
+ }
+}
+
+} // namespace mozilla
diff --git a/dom/media/webrtc/transportbridge/RtpLogger.h b/dom/media/webrtc/transportbridge/RtpLogger.h
new file mode 100644
index 0000000000..fcfaede6e2
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/RtpLogger.h
@@ -0,0 +1,28 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this file,
+ * You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Original author: nohlmeier@mozilla.com
+
+#ifndef rtplogger_h__
+#define rtplogger_h__
+
+#include "transport/mediapacket.h"
+
+namespace mozilla {
+
+/* This class logs RTP and RTCP packets in hex in a format compatible to
+ * text2pcap.
+ * Example to convert the MOZ log file into a PCAP file:
+ * egrep '(RTP_PACKET|RTCP_PACKET)' moz.log | \
+ * text2pcap -D -n -l 1 -i 17 -u 1234,1235 -t '%H:%M:%S.' - rtp.pcap
+ */
+class RtpLogger {
+ public:
+ static bool IsPacketLoggingOn();
+ static void LogPacket(const MediaPacket& packet, bool input,
+ std::string desc);
+};
+
+} // namespace mozilla
+#endif
diff --git a/dom/media/webrtc/transportbridge/moz.build b/dom/media/webrtc/transportbridge/moz.build
new file mode 100644
index 0000000000..290dd26a0a
--- /dev/null
+++ b/dom/media/webrtc/transportbridge/moz.build
@@ -0,0 +1,27 @@
+# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*-
+# vim: set filetype=python:
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+include("/dom/media/webrtc/third_party_build/webrtc.mozbuild")
+
+LOCAL_INCLUDES += [
+ "!/ipc/ipdl/_ipdlheaders",
+ "/dom/media",
+ "/dom/media/webrtc",
+ "/ipc/chromium/src",
+ "/media/libyuv/libyuv/include",
+ "/media/webrtc",
+ "/third_party/libsrtp/src/crypto/include",
+ "/third_party/libsrtp/src/include",
+ "/third_party/libwebrtc",
+ "/third_party/libwebrtc/third_party/abseil-cpp",
+]
+
+UNIFIED_SOURCES += [
+ "MediaPipeline.cpp",
+ "MediaPipelineFilter.cpp",
+ "RtpLogger.cpp",
+]
+
+FINAL_LIBRARY = "xul"