/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"

// NOTE(review): the targets of the system includes and the template argument
// lists were missing from the text under review. They are reconstructed here
// from how the names are used in this file - confirm against the checked-in
// version before landing.
#include <grpcpp/grpcpp.h>

#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "api/jsep.h"
#include "api/jsep_ice_candidate.h"
#include "rtc_base/thread.h"
#include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"

namespace webrtc {
namespace {

using GrpcSignaling::IceCandidate;
using GrpcSignaling::PeerConnectionSignaling;
using GrpcSignaling::SessionDescription;
using GrpcSignaling::SignalingMessage;

// SignalingInterface implementation that serializes local ICE candidates and
// session descriptions into SignalingMessage protos and writes them to a
// stream of type `T`. `T` must provide `bool Write(const SignalingMessage&)`.
//
// Incoming messages are not read here: ProcessMessages() reads them from the
// stream and invokes the callbacks stored in this object.
template <class T>
class SessionData : public webrtc::SignalingInterface {
 public:
  SessionData() = default;
  explicit SessionData(T* stream) : stream_(stream) {}

  // Attaches the stream used by the Send* methods. The pointer is not owned.
  void SetStream(T* stream) { stream_ = stream; }

  // Serializes `candidate` together with its mid and m-line index and writes
  // it to the stream. Nothing is sent if serialization fails.
  void SendIceCandidate(const IceCandidateInterface* candidate) override {
    RTC_LOG(LS_INFO) << "SendIceCandidate";
    std::string serialized_candidate;
    if (!candidate->ToString(&serialized_candidate)) {
      RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
      return;
    }

    SignalingMessage message;
    IceCandidate* proto_candidate = message.mutable_candidate();
    proto_candidate->set_description(serialized_candidate);
    proto_candidate->set_mid(candidate->sdp_mid());
    proto_candidate->set_mline_index(candidate->sdp_mline_index());

    WriteMessage(message);
  }

  // Serializes `sdp` and writes it to the stream. Nothing is sent if
  // serialization fails. The proto type field is only set explicitly for
  // offers and answers; any other SdpType leaves it at its default value.
  void SendDescription(const SessionDescriptionInterface* sdp) override {
    RTC_LOG(LS_INFO) << "SendDescription";
    std::string serialized_sdp;
    if (!sdp->ToString(&serialized_sdp)) {
      RTC_LOG(LS_ERROR) << "Failed to serialize session description";
      return;
    }

    SignalingMessage message;
    SessionDescription* proto_description = message.mutable_description();
    if (sdp->GetType() == SdpType::kOffer) {
      proto_description->set_type(SessionDescription::OFFER);
    } else if (sdp->GetType() == SdpType::kAnswer) {
      proto_description->set_type(SessionDescription::ANSWER);
    }
    proto_description->set_content(serialized_sdp);

    WriteMessage(message);
  }

  // Stores the callback that ProcessMessages() invokes for every session
  // description read from the stream.
  void OnRemoteDescription(
      std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>
                             sdp)> callback) override {
    RTC_LOG(LS_INFO) << "OnRemoteDescription";
    remote_description_callback_ = std::move(callback);
  }

  // Stores the callback that ProcessMessages() invokes for every ICE
  // candidate read from the stream.
  void OnIceCandidate(
      std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>
                             candidate)> callback) override {
    RTC_LOG(LS_INFO) << "OnIceCandidate";
    ice_candidate_callback_ = std::move(callback);
  }

  // Not owned. Null until a stream is attached.
  T* stream_ = nullptr;

  // Public because ProcessMessages() invokes them directly.
  std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
      ice_candidate_callback_;
  std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
      remote_description_callback_;

 private:
  // Writes `message` to the attached stream. Logs and drops the message when
  // no stream is attached yet or when the stream rejects the write, rather
  // than dereferencing a null pointer or failing silently.
  void WriteMessage(const SignalingMessage& message) {
    if (stream_ == nullptr) {
      RTC_LOG(LS_ERROR) << "No stream attached, dropping signaling message";
      return;
    }
    if (!stream_->Write(message)) {
      RTC_LOG(LS_ERROR) << "Failed to write signaling message";
    }
  }
};

using ServerSessionData =
    SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
using ClientSessionData =
    SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;

// Reads messages of type `MessageType` from `stream` until Read() returns
// false, and forwards each one to the matching callback of `session`:
//  - candidate messages are parsed into a JsepIceCandidate; a candidate that
//    fails to parse is logged and skipped;
//  - description messages are turned into a session description, treating any
//    type other than OFFER as an answer.
// A message whose callback has not been registered is logged and dropped.
// Blocks the calling thread for as long as the stream stays readable.
template <class MessageType, class StreamReader, class Session>
void ProcessMessages(StreamReader* stream, Session* session) {
  MessageType message;

  while (stream->Read(&message)) {
    switch (message.Content_case()) {
      case SignalingMessage::ContentCase::kCandidate: {
        webrtc::SdpParseError error;
        auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
            message.candidate().mid(), message.candidate().mline_index());
        if (!jsep_candidate->Initialize(message.candidate().description(),
                                        &error)) {
          RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
                            << message.candidate().description() << "'";
          RTC_LOG(LS_ERROR)
              << "Error at line " << error.line << ":" << error.description;
          continue;
        }

        if (!session->ice_candidate_callback_) {
          // Calling an empty std::function would terminate the process.
          RTC_LOG(LS_ERROR)
              << "No ICE candidate callback registered, dropping candidate";
          break;
        }
        session->ice_candidate_callback_(std::move(jsep_candidate));
        break;
      }
      case SignalingMessage::ContentCase::kDescription: {
        if (!session->remote_description_callback_) {
          // Calling an empty std::function would terminate the process.
          RTC_LOG(LS_ERROR) << "No remote description callback registered, "
                               "dropping description";
          break;
        }

        const auto& description = message.description();
        const webrtc::SdpType type =
            description.type() == SessionDescription::OFFER
                ? webrtc::SdpType::kOffer
                : webrtc::SdpType::kAnswer;
        auto sdp =
            webrtc::CreateSessionDescription(type, description.content());
        session->remote_description_callback_(std::move(sdp));
        break;
      }
      default:
        RTC_DCHECK_NOTREACHED();
    }
  }
}

// gRPC server side of the signaling channel. Each incoming Connect() call is
// wrapped in a ServerSessionData and handed to the callback supplied at
// construction; the call stays open until that callback returns and the
// reader thread has been stopped.
class GrpcNegotiationServer : public GrpcSignalingServerInterface,
                              public PeerConnectionSignaling::Service {
 public:
  // `callback` is run once per client connection, `port` is the port to
  // listen on, and `oneshot` requests a server shutdown as soon as the first
  // client connects.
  GrpcNegotiationServer(
      std::function<void(webrtc::SignalingInterface*)> callback,
      int port,
      bool oneshot)
      : connect_callback_(std::move(callback)),
        requested_port_(port),
        oneshot_(oneshot) {}

  ~GrpcNegotiationServer() override {
    Stop();
    if (server_stop_thread_)
      server_stop_thread_->Stop();
  }

  // Builds and starts the server on "[::]:<requested port>" with insecure
  // credentials. The port actually bound is reported by SelectedPort().
  void Start() override {
    std::string server_address = "[::]";

    grpc::ServerBuilder builder;
    builder.AddListeningPort(
        server_address + ":" + std::to_string(requested_port_),
        grpc::InsecureServerCredentials(), &selected_port_);
    builder.RegisterService(this);
    server_ = builder.BuildAndStart();
    if (!server_) {
      RTC_LOG(LS_ERROR) << "Failed to start the gRPC signaling server";
    }
  }

  // Blocks until the server has shut down. Returns immediately if the server
  // was never started.
  void Wait() override {
    if (server_)
      server_->Wait();
  }

  // Shuts the server down. Safe to call when the server was never started,
  // which matters because the destructor always calls it.
  void Stop() override {
    if (server_)
      server_->Shutdown();
  }

  // Port filled in by AddListeningPort(); 0 until Start() has been called.
  int SelectedPort() override { return selected_port_; }

  // Handles one client: starts a thread that reads the client's messages into
  // the session callbacks, runs the connect callback, then stops the reader
  // thread before completing the call.
  grpc::Status Connect(
      grpc::ServerContext* context,
      grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
      override {
    if (oneshot_) {
      // Request the termination of the server early so we don't serve another
      // client in parallel.
      // NOTE(review): the shutdown is posted to a separate thread, presumably
      // because Shutdown() would otherwise wait on this in-flight call -
      // confirm against the gRPC documentation.
      server_stop_thread_ = rtc::Thread::Create();
      server_stop_thread_->Start();
      server_stop_thread_->PostTask([this] { Stop(); });
    }

    ServerSessionData session(stream);

    auto reading_thread = rtc::Thread::Create();
    reading_thread->Start();
    // `session` lives on this stack frame and outlives the task because the
    // reader thread is stopped before this function returns.
    reading_thread->PostTask([&session, stream] {
      ProcessMessages<SignalingMessage>(stream, &session);
    });

    connect_callback_(&session);

    reading_thread->Stop();

    return grpc::Status::OK;
  }

 private:
  std::function<void(webrtc::SignalingInterface*)> connect_callback_;
  int requested_port_;
  int selected_port_ = 0;
  bool oneshot_;

  std::unique_ptr<grpc::Server> server_;
  std::unique_ptr<rtc::Thread> server_stop_thread_;
};

// gRPC client side of the signaling channel. Owns the channel, the stub, the
// bidirectional stream, and the thread that reads incoming messages into the
// session callbacks.
class GrpcNegotiationClient : public GrpcSignalingClientInterface {
 public:
  // Creates an insecure channel to `server`; no connection attempt is awaited
  // until Start().
  explicit GrpcNegotiationClient(const std::string& server) {
    channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
    stub_ = PeerConnectionSignaling::NewStub(channel_);
  }

  ~GrpcNegotiationClient() override {
    // Cancel the call before stopping the reader thread.
    context_.TryCancel();
    if (reading_thread_)
      reading_thread_->Stop();
  }

  // Waits up to 3 seconds for the channel to connect. On success opens the
  // stream, attaches it to the session, starts the reader thread and returns
  // true; returns false if the channel did not connect in time.
  bool Start() override {
    if (!channel_->WaitForConnected(
            absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
      return false;
    }

    stream_ = stub_->Connect(&context_);
    session_.SetStream(stream_.get());

    reading_thread_ = rtc::Thread::Create();
    reading_thread_->Start();
    reading_thread_->PostTask([this] {
      ProcessMessages<SignalingMessage>(stream_.get(), &session_);
    });

    return true;
  }

  // Returns the session used to send and receive signaling messages. The
  // session has no stream attached until Start() succeeds.
  webrtc::SignalingInterface* signaling_client() override { return &session_; }

 private:
  std::shared_ptr<grpc::Channel> channel_;
  std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
  std::unique_ptr<rtc::Thread> reading_thread_;
  grpc::ClientContext context_;
  std::unique_ptr<
      ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
      stream_;
  ClientSessionData session_;
};
}  // namespace

// Creates the gRPC-backed signaling server. See GrpcNegotiationServer for the
// meaning of `callback`, `port` and `oneshot`.
std::unique_ptr<GrpcSignalingServerInterface>
GrpcSignalingServerInterface::Create(
    std::function<void(webrtc::SignalingInterface*)> callback,
    int port,
    bool oneshot) {
  return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
                                                 oneshot);
}

// Creates the gRPC-backed signaling client targeting `server`.
std::unique_ptr<GrpcSignalingClientInterface>
GrpcSignalingClientInterface::Create(const std::string& server) {
  return std::make_unique<GrpcNegotiationClient>(server);
}

}  // namespace webrtc