diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/libwebrtc/rtc_tools/data_channel_benchmark | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/libwebrtc/rtc_tools/data_channel_benchmark')
8 files changed, 1194 insertions, 0 deletions
diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/BUILD.gn b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/BUILD.gn new file mode 100644 index 0000000000..040061b3e8 --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/BUILD.gn @@ -0,0 +1,63 @@ +# Copyright 2021 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +import("//third_party/grpc/grpc_library.gni") +import("../../webrtc.gni") + +grpc_library("signaling_grpc_proto") { + testonly = true + sources = [ "peer_connection_signaling.proto" ] +} + +rtc_library("signaling_interface") { + sources = [ "signaling_interface.h" ] + deps = [ "../../api:libjingle_peerconnection_api" ] +} + +rtc_library("grpc_signaling") { + testonly = true + sources = [ + "grpc_signaling.cc", + "grpc_signaling.h", + ] + deps = [ + ":signaling_grpc_proto", + ":signaling_interface", + "../../api:libjingle_peerconnection_api", + "../../rtc_base:threading", + "//third_party/grpc:grpc++", + ] + + defines = [ "GPR_FORBID_UNREACHABLE_CODE=0" ] +} + +rtc_executable("data_channel_benchmark") { + testonly = true + sources = [ + "data_channel_benchmark.cc", + "peer_connection_client.cc", + "peer_connection_client.h", + ] + deps = [ + ":grpc_signaling", + ":signaling_interface", + "../../api:create_peerconnection_factory", + "../../api:libjingle_peerconnection_api", + "../../api:rtc_error", + "../../api:scoped_refptr", + "../../api/audio_codecs:builtin_audio_decoder_factory", + "../../api/audio_codecs:builtin_audio_encoder_factory", + "../../api/video_codecs:builtin_video_decoder_factory", + "../../api/video_codecs:builtin_video_encoder_factory", + "../../rtc_base:logging", + "../../rtc_base:refcount", + "../../rtc_base:rtc_event", + "../../rtc_base:ssl", + "../../rtc_base:threading", + "../../system_wrappers:field_trial", + "//third_party/abseil-cpp/absl/cleanup:cleanup", + "//third_party/abseil-cpp/absl/flags:flag", + "//third_party/abseil-cpp/absl/flags:parse", + ] +} diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc new file mode 100644 index 0000000000..33776f37aa --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + * + * Data Channel Benchmarking tool. + * + * Create a server using: ./data_channel_benchmark --server --port 12345 + * Start the flow of data from the server to a client using: + * ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196 + * The throughput is reported on the server console. + * + * The negotiation does not require a 3rd party server and is done over a gRPC + * transport. No TURN server is configured, so both peers need to be reachable + * using STUN only. + */ +#include <inttypes.h> + +#include <charconv> + +#include "absl/cleanup/cleanup.h" +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "rtc_base/event.h" +#include "rtc_base/ssl_adapter.h" +#include "rtc_base/thread.h" +#include "rtc_tools/data_channel_benchmark/grpc_signaling.h" +#include "rtc_tools/data_channel_benchmark/peer_connection_client.h" +#include "system_wrappers/include/field_trial.h" + +ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)"); +ABSL_FLAG(bool, server, false, "Server mode"); +ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client"); +ABSL_FLAG(std::string, address, "localhost", "Connect to server address"); +ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)"); +ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)"); +ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size"); +ABSL_FLAG(std::string, + force_fieldtrials, + "", + "Field trials control experimental feature code which can be forced. " + "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/" + " will assign the group Enable to field trial WebRTC-FooFeature."); + +struct SetupMessage { + size_t packet_size; + size_t transfer_size; + + std::string ToString() { + char buffer[64]; + rtc::SimpleStringBuilder sb(buffer); + sb << packet_size << "," << transfer_size; + + return sb.str(); + } + + static SetupMessage FromString(absl::string_view sv) { + SetupMessage result; + auto parameters = rtc::split(sv, ','); + std::from_chars(parameters[0].data(), + parameters[0].data() + parameters[0].size(), + result.packet_size, 10); + std::from_chars(parameters[1].data(), + parameters[1].data() + parameters[1].size(), + result.transfer_size, 10); + return result; + } +}; + +class DataChannelObserverImpl : public webrtc::DataChannelObserver { + public: + explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc) + : dc_(dc), bytes_received_(0) {} + void OnStateChange() override { + RTC_LOG(LS_INFO) << "State changed to " << dc_->state(); + switch (dc_->state()) { + case webrtc::DataChannelInterface::DataState::kOpen: + open_event_.Set(); + break; + case webrtc::DataChannelInterface::DataState::kClosed: + closed_event_.Set(); + break; + default: + break; + } + } + void OnMessage(const webrtc::DataBuffer& buffer) override { + bytes_received_ += buffer.data.size(); + if (bytes_received_threshold_ && + bytes_received_ >= bytes_received_threshold_) { + bytes_received_event_.Set(); + } + + if (setup_message_.empty() && !buffer.binary) { + setup_message_.assign(buffer.data.cdata<char>(), buffer.data.size()); + setup_message_event_.Set(); + } + } + void OnBufferedAmountChange(uint64_t sent_data_size) override { + if (dc_->buffered_amount() < + webrtc::DataChannelInterface::MaxSendQueueSize() / 2) + low_buffered_threshold_event_.Set(); + else + low_buffered_threshold_event_.Reset(); + } + + bool WaitForOpenState() { + return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen || + open_event_.Wait(rtc::Event::kForever); + } + bool WaitForClosedState() { + return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed || + closed_event_.Wait(rtc::Event::kForever); + } + + // Set how many received bytes are required until + // WaitForBytesReceivedThreshold return true. + void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) { + bytes_received_threshold_ = bytes_received_threshold; + if (bytes_received_ >= bytes_received_threshold_) + bytes_received_event_.Set(); + } + // Wait until the received byte count reaches the desired value. + bool WaitForBytesReceivedThreshold() { + return (bytes_received_threshold_ && + bytes_received_ >= bytes_received_threshold_) || + bytes_received_event_.Wait(rtc::Event::kForever); + } + + bool WaitForLowbufferedThreshold() { + return low_buffered_threshold_event_.Wait(rtc::Event::kForever); + } + std::string SetupMessage() { return setup_message_; } + bool WaitForSetupMessage() { + return setup_message_event_.Wait(rtc::Event::kForever); + } + + private: + webrtc::DataChannelInterface* dc_; + rtc::Event open_event_; + rtc::Event closed_event_; + rtc::Event bytes_received_event_; + absl::optional<uint64_t> bytes_received_threshold_; + uint64_t bytes_received_; + rtc::Event low_buffered_threshold_event_; + std::string setup_message_; + rtc::Event setup_message_event_; +}; + +int RunServer() { + bool oneshot = absl::GetFlag(FLAGS_oneshot); + uint16_t port = absl::GetFlag(FLAGS_port); + + auto signaling_thread = rtc::Thread::Create(); + signaling_thread->Start(); + { + auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( + signaling_thread.get()); + + auto grpc_server = webrtc::GrpcSignalingServerInterface::Create( + [factory = rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>( + factory)](webrtc::SignalingInterface* signaling) { + webrtc::PeerConnectionClient client(factory.get(), signaling); + client.StartPeerConnection(); + auto peer_connection = client.peerConnection(); + + // Set up the data channel + auto dc_or_error = + peer_connection->CreateDataChannelOrError("benchmark", nullptr); + auto data_channel = dc_or_error.MoveValue(); + auto data_channel_observer = + std::make_unique<DataChannelObserverImpl>(data_channel.get()); + data_channel->RegisterObserver(data_channel_observer.get()); + absl::Cleanup unregister_observer( + [data_channel] { data_channel->UnregisterObserver(); }); + + // Wait for a first message from the remote peer. + // It configures how much data should be sent and how big the packets + // should be. + // First message is "packet_size,transfer_size". + data_channel_observer->WaitForSetupMessage(); + auto parameters = + SetupMessage::FromString(data_channel_observer->SetupMessage()); + + // Wait for the sender and receiver peers to stabilize (send all ACKs) + // This makes it easier to isolate the sending part when profiling. + absl::SleepFor(absl::Seconds(1)); + + std::string data(parameters.packet_size, '0'); + size_t remaining_data = parameters.transfer_size; + + auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); + + while (remaining_data) { + if (remaining_data < data.size()) + data.resize(remaining_data); + + rtc::CopyOnWriteBuffer buffer(data); + webrtc::DataBuffer data_buffer(buffer, true); + if (!data_channel->Send(data_buffer)) { + // If the send() call failed, the buffers are full. + // We wait until there's more room. + data_channel_observer->WaitForLowbufferedThreshold(); + continue; + } + remaining_data -= buffer.size(); + fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n", + (parameters.transfer_size - remaining_data), + parameters.transfer_size, + (100 - remaining_data * 100 / parameters.transfer_size)); + } + + // Receiver signals the data channel close event when it has received + // all the data it requested. + data_channel_observer->WaitForClosedState(); + + auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); + auto duration_ms = (end_time - begin_time).ms<size_t>(); + double throughput = (parameters.transfer_size / 1024. / 1024.) / + (duration_ms / 1000.); + printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput); + }, + port, oneshot); + grpc_server->Start(); + + printf("Server listening on port %d\n", grpc_server->SelectedPort()); + grpc_server->Wait(); + } + + signaling_thread->Quit(); + return 0; +} + +int RunClient() { + uint16_t port = absl::GetFlag(FLAGS_port); + std::string server_address = absl::GetFlag(FLAGS_address); + size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024; + size_t packet_size = absl::GetFlag(FLAGS_packet_size); + + auto signaling_thread = rtc::Thread::Create(); + signaling_thread->Start(); + { + auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( + signaling_thread.get()); + auto grpc_client = webrtc::GrpcSignalingClientInterface::Create( + server_address + ":" + std::to_string(port)); + webrtc::PeerConnectionClient client(factory.get(), + grpc_client->signaling_client()); + + // Set up the callback to receive the data channel from the sender. + rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel; + rtc::Event got_data_channel; + client.SetOnDataChannel( + [&data_channel, &got_data_channel]( + rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { + data_channel = channel; + got_data_channel.Set(); + }); + + // Connect to the server. + if (!grpc_client->Start()) { + fprintf(stderr, "Failed to connect to server\n"); + return 1; + } + + // Wait for the data channel to be received + got_data_channel.Wait(rtc::Event::kForever); + + // DataChannel needs an observer to start draining the read queue + DataChannelObserverImpl observer(data_channel.get()); + observer.SetBytesReceivedThreshold(transfer_size); + data_channel->RegisterObserver(&observer); + absl::Cleanup unregister_observer( + [data_channel] { data_channel->UnregisterObserver(); }); + + // Send a configuration string to the server to tell it to send + // 'packet_size' bytes packets and send a total of 'transfer_size' MB. + observer.WaitForOpenState(); + SetupMessage setup_message = { + .packet_size = packet_size, + .transfer_size = transfer_size, + }; + if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) { + fprintf(stderr, "Failed to send parameter string\n"); + return 1; + } + + // Wait until we have received all the data + observer.WaitForBytesReceivedThreshold(); + + // Close the data channel, signaling to the server we have received + // all the requested data. + data_channel->Close(); + } + + signaling_thread->Quit(); + + return 0; +} + +int main(int argc, char** argv) { + rtc::InitializeSSL(); + absl::ParseCommandLine(argc, argv); + + // Make sure that higher severity number means more logs by reversing the + // rtc::LoggingSeverity values. + auto logging_severity = + std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose)); + rtc::LogMessage::LogToDebug( + static_cast<rtc::LoggingSeverity>(logging_severity)); + + bool is_server = absl::GetFlag(FLAGS_server); + std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials); + + webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str()); + + return is_server ? RunServer() : RunClient(); +} diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.cc b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.cc new file mode 100644 index 0000000000..8db717fc71 --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.cc @@ -0,0 +1,267 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "rtc_tools/data_channel_benchmark/grpc_signaling.h" + +#include <grpc/support/log.h> +#include <grpcpp/grpcpp.h> + +#include <string> +#include <utility> + +#include "api/jsep.h" +#include "api/jsep_ice_candidate.h" +#include "rtc_base/thread.h" +#include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h" + +namespace webrtc { +namespace { + +using GrpcSignaling::IceCandidate; +using GrpcSignaling::PeerConnectionSignaling; +using GrpcSignaling::SessionDescription; +using GrpcSignaling::SignalingMessage; + +template <class T> +class SessionData : public webrtc::SignalingInterface { + public: + SessionData() {} + explicit SessionData(T* stream) : stream_(stream) {} + void SetStream(T* stream) { stream_ = stream; } + + void SendIceCandidate(const IceCandidateInterface* candidate) override { + RTC_LOG(LS_INFO) << "SendIceCandidate"; + std::string serialized_candidate; + if (!candidate->ToString(&serialized_candidate)) { + RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate"; + return; + } + + SignalingMessage message; + IceCandidate* proto_candidate = message.mutable_candidate(); + proto_candidate->set_description(serialized_candidate); + proto_candidate->set_mid(candidate->sdp_mid()); + proto_candidate->set_mline_index(candidate->sdp_mline_index()); + + stream_->Write(message); + } + + void SendDescription(const SessionDescriptionInterface* sdp) override { + RTC_LOG(LS_INFO) << "SendDescription"; + + std::string serialized_sdp; + sdp->ToString(&serialized_sdp); + + SignalingMessage message; + if (sdp->GetType() == SdpType::kOffer) + message.mutable_description()->set_type(SessionDescription::OFFER); + else if (sdp->GetType() == SdpType::kAnswer) + message.mutable_description()->set_type(SessionDescription::ANSWER); + message.mutable_description()->set_content(serialized_sdp); + + stream_->Write(message); + } + + void OnRemoteDescription( + std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)> + callback) override { + RTC_LOG(LS_INFO) << "OnRemoteDescription"; + remote_description_callback_ = callback; + } + + void OnIceCandidate( + std::function<void(std::unique_ptr<IceCandidateInterface> candidate)> + callback) override { + RTC_LOG(LS_INFO) << "OnIceCandidate"; + ice_candidate_callback_ = callback; + } + + T* stream_; + + std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)> + ice_candidate_callback_; + std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)> + remote_description_callback_; +}; + +using ServerSessionData = + SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>; +using ClientSessionData = + SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>; + +template <class MessageType, class StreamReader, class SessionData> +void ProcessMessages(StreamReader* stream, SessionData* session) { + MessageType message; + + while (stream->Read(&message)) { + switch (message.Content_case()) { + case SignalingMessage::ContentCase::kCandidate: { + webrtc::SdpParseError error; + auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>( + message.candidate().mid(), message.candidate().mline_index()); + if (!jsep_candidate->Initialize(message.candidate().description(), + &error)) { + RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '" + << message.candidate().description() << "'"; + RTC_LOG(LS_ERROR) + << "Error at line " << error.line << ":" << error.description; + continue; + } + + session->ice_candidate_callback_(std::move(jsep_candidate)); + break; + } + case SignalingMessage::ContentCase::kDescription: { + auto& description = message.description(); + auto content = description.content(); + + auto sdp = webrtc::CreateSessionDescription( + description.type() == SessionDescription::OFFER + ? webrtc::SdpType::kOffer + : webrtc::SdpType::kAnswer, + description.content()); + session->remote_description_callback_(std::move(sdp)); + break; + } + default: + RTC_DCHECK_NOTREACHED(); + } + } +} + +class GrpcNegotiationServer : public GrpcSignalingServerInterface, + public PeerConnectionSignaling::Service { + public: + GrpcNegotiationServer( + std::function<void(webrtc::SignalingInterface*)> callback, + int port, + bool oneshot) + : connect_callback_(std::move(callback)), + requested_port_(port), + oneshot_(oneshot) {} + ~GrpcNegotiationServer() override { + Stop(); + if (server_stop_thread_) + server_stop_thread_->Stop(); + } + + void Start() override { + std::string server_address = "[::]"; + + grpc::ServerBuilder builder; + builder.AddListeningPort( + server_address + ":" + std::to_string(requested_port_), + grpc::InsecureServerCredentials(), &selected_port_); + builder.RegisterService(this); + server_ = builder.BuildAndStart(); + } + + void Wait() override { server_->Wait(); } + + void Stop() override { server_->Shutdown(); } + + int SelectedPort() override { return selected_port_; } + + grpc::Status Connect( + grpc::ServerContext* context, + grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream) + override { + if (oneshot_) { + // Request the termination of the server early so we don't serve another + // client in parallel. + server_stop_thread_ = rtc::Thread::Create(); + server_stop_thread_->Start(); + server_stop_thread_->PostTask([this] { Stop(); }); + } + + ServerSessionData session(stream); + + auto reading_thread = rtc::Thread::Create(); + reading_thread->Start(); + reading_thread->PostTask([&session, &stream] { + ProcessMessages<SignalingMessage>(stream, &session); + }); + + connect_callback_(&session); + + reading_thread->Stop(); + + return grpc::Status::OK; + } + + private: + std::function<void(webrtc::SignalingInterface*)> connect_callback_; + int requested_port_; + int selected_port_; + bool oneshot_; + + std::unique_ptr<grpc::Server> server_; + std::unique_ptr<rtc::Thread> server_stop_thread_; +}; + +class GrpcNegotiationClient : public GrpcSignalingClientInterface { + public: + explicit GrpcNegotiationClient(const std::string& server) { + channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials()); + stub_ = PeerConnectionSignaling::NewStub(channel_); + } + + ~GrpcNegotiationClient() override { + context_.TryCancel(); + if (reading_thread_) + reading_thread_->Stop(); + } + + bool Start() override { + if (!channel_->WaitForConnected( + absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) { + return false; + } + + stream_ = stub_->Connect(&context_); + session_.SetStream(stream_.get()); + + reading_thread_ = rtc::Thread::Create(); + reading_thread_->Start(); + reading_thread_->PostTask([this] { + ProcessMessages<SignalingMessage>(stream_.get(), &session_); + }); + + return true; + } + + webrtc::SignalingInterface* signaling_client() override { return &session_; } + + private: + std::shared_ptr<grpc::Channel> channel_; + std::unique_ptr<PeerConnectionSignaling::Stub> stub_; + std::unique_ptr<rtc::Thread> reading_thread_; + grpc::ClientContext context_; + std::unique_ptr< + ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>> + stream_; + ClientSessionData session_; +}; +} // namespace + +std::unique_ptr<GrpcSignalingServerInterface> +GrpcSignalingServerInterface::Create( + std::function<void(webrtc::SignalingInterface*)> callback, + int port, + bool oneshot) { + return std::make_unique<GrpcNegotiationServer>(std::move(callback), port, + oneshot); +} + +std::unique_ptr<GrpcSignalingClientInterface> +GrpcSignalingClientInterface::Create(const std::string& server) { + return std::make_unique<GrpcNegotiationClient>(server); +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.h b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.h new file mode 100644 index 0000000000..15799d22b7 --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/grpc_signaling.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_ +#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_ + +#include <memory> +#include <string> + +#include "api/jsep.h" +#include "rtc_tools/data_channel_benchmark/signaling_interface.h" + +namespace webrtc { + +// This class defines a server enabling clients to perform a PeerConnection +// negotiation directly over gRPC. +// When a client connects, a callback is run to handle the request. +class GrpcSignalingServerInterface { + public: + virtual ~GrpcSignalingServerInterface() = default; + + // Start listening for connections. + virtual void Start() = 0; + + // Wait for the gRPC server to terminate. + virtual void Wait() = 0; + + // Stop the gRPC server instance. + virtual void Stop() = 0; + + // The port the server is listening on. + virtual int SelectedPort() = 0; + + // Create a gRPC server listening on |port| that will run |callback| on each + // request. If |oneshot| is true, it will terminate after serving one request. + static std::unique_ptr<GrpcSignalingServerInterface> Create( + std::function<void(webrtc::SignalingInterface*)> callback, + int port, + bool oneshot); +}; + +// This class defines a client that can connect to a server and perform a +// PeerConnection negotiation directly over gRPC. +class GrpcSignalingClientInterface { + public: + virtual ~GrpcSignalingClientInterface() = default; + + // Connect the client to the gRPC server. + virtual bool Start() = 0; + virtual webrtc::SignalingInterface* signaling_client() = 0; + + // Create a client to connnect to a server at |server_address|. + static std::unique_ptr<GrpcSignalingClientInterface> Create( + const std::string& server_address); +}; + +} // namespace webrtc +#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_ diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.cc b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.cc new file mode 100644 index 0000000000..cd02e7118a --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.cc @@ -0,0 +1,300 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "rtc_tools/data_channel_benchmark/peer_connection_client.h" + +#include <memory> +#include <string> +#include <utility> + +#include "api/audio_codecs/builtin_audio_decoder_factory.h" +#include "api/audio_codecs/builtin_audio_encoder_factory.h" +#include "api/create_peerconnection_factory.h" +#include "api/jsep.h" +#include "api/peer_connection_interface.h" +#include "api/rtc_error.h" +#include "api/scoped_refptr.h" +#include "api/set_remote_description_observer_interface.h" +#include "api/video_codecs/builtin_video_decoder_factory.h" +#include "api/video_codecs/builtin_video_encoder_factory.h" +#include "rtc_base/logging.h" +#include "rtc_base/thread.h" + +namespace { + +constexpr char kStunServer[] = "stun:stun.l.google.com:19302"; + +class SetLocalDescriptionObserverAdapter + : public webrtc::SetLocalDescriptionObserverInterface { + public: + using Callback = std::function<void(webrtc::RTCError)>; + static rtc::scoped_refptr<SetLocalDescriptionObserverAdapter> Create( + Callback callback) { + return rtc::scoped_refptr<SetLocalDescriptionObserverAdapter>( + new rtc::RefCountedObject<SetLocalDescriptionObserverAdapter>( + std::move(callback))); + } + + explicit SetLocalDescriptionObserverAdapter(Callback callback) + : callback_(std::move(callback)) {} + ~SetLocalDescriptionObserverAdapter() override = default; + + private: + void OnSetLocalDescriptionComplete(webrtc::RTCError error) override { + callback_(std::move(error)); + } + + Callback callback_; +}; + +class SetRemoteDescriptionObserverAdapter + : public webrtc::SetRemoteDescriptionObserverInterface { + public: + using Callback = std::function<void(webrtc::RTCError)>; + static rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter> Create( + Callback callback) { + return rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter>( + new rtc::RefCountedObject<SetRemoteDescriptionObserverAdapter>( + std::move(callback))); + } + + explicit SetRemoteDescriptionObserverAdapter(Callback callback) + : callback_(std::move(callback)) {} + ~SetRemoteDescriptionObserverAdapter() override = default; + + private: + void OnSetRemoteDescriptionComplete(webrtc::RTCError error) override { + callback_(std::move(error)); + } + + Callback callback_; +}; + +class CreateSessionDescriptionObserverAdapter + : public webrtc::CreateSessionDescriptionObserver { + public: + using Success = std::function<void(webrtc::SessionDescriptionInterface*)>; + using Failure = std::function<void(webrtc::RTCError)>; + + static rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter> Create( + Success success, + Failure failure) { + return rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter>( + new rtc::RefCountedObject<CreateSessionDescriptionObserverAdapter>( + std::move(success), std::move(failure))); + } + + CreateSessionDescriptionObserverAdapter(Success success, Failure failure) + : success_(std::move(success)), failure_(std::move(failure)) {} + ~CreateSessionDescriptionObserverAdapter() override = default; + + private: + void OnSuccess(webrtc::SessionDescriptionInterface* desc) override { + success_(desc); + } + + void OnFailure(webrtc::RTCError error) override { + failure_(std::move(error)); + } + + Success success_; + Failure failure_; +}; + +} // namespace + +namespace webrtc { + +PeerConnectionClient::PeerConnectionClient( + webrtc::PeerConnectionFactoryInterface* factory, + webrtc::SignalingInterface* signaling) + : signaling_(signaling) { + signaling_->OnIceCandidate( + [&](std::unique_ptr<webrtc::IceCandidateInterface> candidate) { + AddIceCandidate(std::move(candidate)); + }); + signaling_->OnRemoteDescription( + [&](std::unique_ptr<webrtc::SessionDescriptionInterface> sdp) { + SetRemoteDescription(std::move(sdp)); + }); + InitializePeerConnection(factory); +} + +PeerConnectionClient::~PeerConnectionClient() { + Disconnect(); +} + +rtc::scoped_refptr<PeerConnectionFactoryInterface> +PeerConnectionClient::CreateDefaultFactory(rtc::Thread* signaling_thread) { + auto factory = webrtc::CreatePeerConnectionFactory( + /*network_thread=*/nullptr, /*worker_thread=*/nullptr, + /*signaling_thread*/ signaling_thread, + /*default_adm=*/nullptr, webrtc::CreateBuiltinAudioEncoderFactory(), + webrtc::CreateBuiltinAudioDecoderFactory(), + webrtc::CreateBuiltinVideoEncoderFactory(), + webrtc::CreateBuiltinVideoDecoderFactory(), + /*audio_mixer=*/nullptr, /*audio_processing=*/nullptr); + + if (!factory) { + RTC_LOG(LS_ERROR) << "Failed to initialize PeerConnectionFactory"; + return nullptr; + } + + return factory; +} + +bool PeerConnectionClient::InitializePeerConnection( + webrtc::PeerConnectionFactoryInterface* factory) { + RTC_CHECK(factory) + << "Must call InitializeFactory before InitializePeerConnection"; + + webrtc::PeerConnectionInterface::RTCConfiguration config; + webrtc::PeerConnectionInterface::IceServer server; + server.urls.push_back(kStunServer); + config.servers.push_back(server); + config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan; + + webrtc::PeerConnectionDependencies dependencies(this); + auto result = + factory->CreatePeerConnectionOrError(config, std::move(dependencies)); + + if (!result.ok()) { + RTC_LOG(LS_ERROR) << "Failed to create PeerConnection: " + << result.error().message(); + DeletePeerConnection(); + return false; + } + peer_connection_ = result.MoveValue(); + RTC_LOG(LS_INFO) << "PeerConnection created successfully"; + return true; +} + +bool PeerConnectionClient::StartPeerConnection() { + RTC_LOG(LS_INFO) << "Creating offer"; + + peer_connection_->SetLocalDescription( + SetLocalDescriptionObserverAdapter::Create([this]( + webrtc::RTCError error) { + if (error.ok()) + signaling_->SendDescription(peer_connection_->local_description()); + })); + + return true; +} + +bool PeerConnectionClient::IsConnected() { + return peer_connection_->peer_connection_state() == + webrtc::PeerConnectionInterface::PeerConnectionState::kConnected; +} + +// Disconnect from the call. +void PeerConnectionClient::Disconnect() { + for (auto& data_channel : data_channels_) { + data_channel->Close(); + data_channel.release(); + } + data_channels_.clear(); + DeletePeerConnection(); +} + +// Delete the WebRTC PeerConnection. +void PeerConnectionClient::DeletePeerConnection() { + RTC_LOG(LS_INFO); + + if (peer_connection_) { + peer_connection_->Close(); + } + peer_connection_.release(); +} + +void PeerConnectionClient::OnIceConnectionChange( + webrtc::PeerConnectionInterface::IceConnectionState new_state) { + if (new_state == webrtc::PeerConnectionInterface::IceConnectionState:: + kIceConnectionCompleted) { + RTC_LOG(LS_INFO) << "State is updating to connected"; + } else if (new_state == webrtc::PeerConnectionInterface::IceConnectionState:: + kIceConnectionDisconnected) { + RTC_LOG(LS_INFO) << "Disconnecting from peer"; + Disconnect(); + } +} + +void PeerConnectionClient::OnIceGatheringChange( + webrtc::PeerConnectionInterface::IceGatheringState new_state) { + if (new_state == webrtc::PeerConnectionInterface::kIceGatheringComplete) { + RTC_LOG(LS_INFO) << "Client is ready to receive remote SDP"; + } +} + +void PeerConnectionClient::OnIceCandidate( + const webrtc::IceCandidateInterface* candidate) { + signaling_->SendIceCandidate(candidate); +} + +void PeerConnectionClient::OnDataChannel( + rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { + RTC_LOG(LS_INFO) << __FUNCTION__ << " remote datachannel created"; + if (on_data_channel_callback_) + on_data_channel_callback_(channel); + data_channels_.push_back(channel); +} + +void PeerConnectionClient::SetOnDataChannel( + std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)> + callback) { + on_data_channel_callback_ = callback; +} + +void PeerConnectionClient::OnNegotiationNeededEvent(uint32_t event_id) { + RTC_LOG(LS_INFO) << "OnNegotiationNeededEvent"; + + peer_connection_->SetLocalDescription( + SetLocalDescriptionObserverAdapter::Create([this]( + webrtc::RTCError error) { + if (error.ok()) + signaling_->SendDescription(peer_connection_->local_description()); + })); +} + +bool PeerConnectionClient::SetRemoteDescription( + std::unique_ptr<webrtc::SessionDescriptionInterface> desc) { + RTC_LOG(LS_INFO) << "SetRemoteDescription"; + auto type = desc->GetType(); + + peer_connection_->SetRemoteDescription( + std::move(desc), + SetRemoteDescriptionObserverAdapter::Create([&](webrtc::RTCError) { + RTC_LOG(LS_INFO) << "SetRemoteDescription done"; + + if (type == webrtc::SdpType::kOffer) { + // Got an offer from the remote, need to set an answer and send it. + peer_connection_->SetLocalDescription( + SetLocalDescriptionObserverAdapter::Create( + [this](webrtc::RTCError error) { + if (error.ok()) + signaling_->SendDescription( + peer_connection_->local_description()); + })); + } + })); + + return true; +} + +void PeerConnectionClient::AddIceCandidate( + std::unique_ptr<webrtc::IceCandidateInterface> candidate) { + RTC_LOG(LS_INFO) << "AddIceCandidate"; + + peer_connection_->AddIceCandidate( + std::move(candidate), [](const webrtc::RTCError& error) { + RTC_LOG(LS_INFO) << "Failed to add candidate: " << error.message(); + }); +} + +} // namespace webrtc diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.h b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.h new file mode 100644 index 0000000000..62b205c2ed --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_client.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_ +#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_ + +#include <stdint.h> + +#include <memory> +#include <string> +#include <vector> + +#include "api/jsep.h" +#include "api/peer_connection_interface.h" +#include "api/rtp_receiver_interface.h" +#include "api/scoped_refptr.h" +#include "api/set_local_description_observer_interface.h" +#include "rtc_base/logging.h" +#include "rtc_base/thread.h" +#include "rtc_tools/data_channel_benchmark/signaling_interface.h" + +namespace webrtc { + +// Handles all the details for creating a PeerConnection and negotiation using a +// SignalingInterface object. +class PeerConnectionClient : public webrtc::PeerConnectionObserver { + public: + explicit PeerConnectionClient(webrtc::PeerConnectionFactoryInterface* factory, + webrtc::SignalingInterface* signaling); + + ~PeerConnectionClient() override; + + PeerConnectionClient(const PeerConnectionClient&) = delete; + PeerConnectionClient& operator=(const PeerConnectionClient&) = delete; + + // Set the local description and send offer using the SignalingInterface, + // initiating the negotiation process. + bool StartPeerConnection(); + + // Whether the peer connection is connected to the remote peer. + bool IsConnected(); + + // Disconnect from the call. + void Disconnect(); + + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peerConnection() { + return peer_connection_; + } + + // Set a callback to run when a DataChannel is created by the remote peer. + void SetOnDataChannel( + std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)> + callback); + + std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>& + dataChannels() { + return data_channels_; + } + + // Creates a default PeerConnectionFactory object. + static rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> + CreateDefaultFactory(rtc::Thread* signaling_thread); + + private: + void AddIceCandidate( + std::unique_ptr<webrtc::IceCandidateInterface> candidate); + bool SetRemoteDescription( + std::unique_ptr<webrtc::SessionDescriptionInterface> desc); + + // Initialize the PeerConnection with a given PeerConnectionFactory. + bool InitializePeerConnection( + webrtc::PeerConnectionFactoryInterface* factory); + void DeletePeerConnection(); + + // PeerConnectionObserver implementation. + void OnSignalingChange( + webrtc::PeerConnectionInterface::SignalingState new_state) override { + RTC_LOG(LS_INFO) << __FUNCTION__ << " new state: " << new_state; + } + void OnDataChannel( + rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override; + void OnNegotiationNeededEvent(uint32_t event_id) override; + void OnIceConnectionChange( + webrtc::PeerConnectionInterface::IceConnectionState new_state) override; + void OnIceGatheringChange( + webrtc::PeerConnectionInterface::IceGatheringState new_state) override; + void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override; + void OnIceConnectionReceivingChange(bool receiving) override { + RTC_LOG(LS_INFO) << __FUNCTION__ << " receiving? " << receiving; + } + + rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_; + std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)> + on_data_channel_callback_; + std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>> data_channels_; + webrtc::SignalingInterface* signaling_; +}; + +} // namespace webrtc + +#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_ diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto new file mode 100644 index 0000000000..9bd0aae912 --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package webrtc.GrpcSignaling; + +service PeerConnectionSignaling { + rpc Connect(stream SignalingMessage) returns (stream SignalingMessage) {} +} + +message SignalingMessage { + oneof Content { + SessionDescription description = 1; + IceCandidate candidate = 2; + } +} + +message SessionDescription { + enum SessionDescriptionType { + OFFER = 0; + ANSWER = 1; + } + SessionDescriptionType type = 1; + string content = 2; +} + +message IceCandidate { + string mid = 1; + int32 mline_index = 2; + string description = 3; +}
\ No newline at end of file diff --git a/third_party/libwebrtc/rtc_tools/data_channel_benchmark/signaling_interface.h b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/signaling_interface.h new file mode 100644 index 0000000000..77c811acb3 --- /dev/null +++ b/third_party/libwebrtc/rtc_tools/data_channel_benchmark/signaling_interface.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_ +#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_ + +#include <memory> + +#include "api/jsep.h" + +namespace webrtc { +class SignalingInterface { + public: + virtual ~SignalingInterface() = default; + + // Send an ICE candidate over the transport. + virtual void SendIceCandidate( + const webrtc::IceCandidateInterface* candidate) = 0; + + // Send a local description over the transport. + virtual void SendDescription( + const webrtc::SessionDescriptionInterface* sdp) = 0; + + // Set a callback when receiving a description from the transport. + virtual void OnRemoteDescription( + std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface> + sdp)> callback) = 0; + + // Set a callback when receiving an ICE candidate from the transport. + virtual void OnIceCandidate( + std::function<void(std::unique_ptr<webrtc::IceCandidateInterface> + candidate)> callback) = 0; +}; +} // namespace webrtc + +#endif // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_ |