/* * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. * * Data Channel Benchmarking tool. * * Create a server using: ./data_channel_benchmark --server --port 12345 * Start the flow of data from the server to a client using: * ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196 * The throughput is reported on the server console. * * The negotiation does not require a 3rd party server and is done over a gRPC * transport. No TURN server is configured, so both peers need to be reachable * using STUN only. */ #include #include #include "absl/cleanup/cleanup.h" #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "rtc_base/event.h" #include "rtc_base/ssl_adapter.h" #include "rtc_base/thread.h" #include "rtc_tools/data_channel_benchmark/grpc_signaling.h" #include "rtc_tools/data_channel_benchmark/peer_connection_client.h" #include "system_wrappers/include/field_trial.h" ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)"); ABSL_FLAG(bool, server, false, "Server mode"); ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client"); ABSL_FLAG(std::string, address, "localhost", "Connect to server address"); ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)"); ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)"); ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size"); ABSL_FLAG(std::string, force_fieldtrials, "", "Field trials control experimental feature code which can be forced. " "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/" " will assign the group Enable to field trial WebRTC-FooFeature."); struct SetupMessage { size_t packet_size; size_t transfer_size; std::string ToString() { char buffer[64]; rtc::SimpleStringBuilder sb(buffer); sb << packet_size << "," << transfer_size; return sb.str(); } static SetupMessage FromString(absl::string_view sv) { SetupMessage result; auto parameters = rtc::split(sv, ','); std::from_chars(parameters[0].data(), parameters[0].data() + parameters[0].size(), result.packet_size, 10); std::from_chars(parameters[1].data(), parameters[1].data() + parameters[1].size(), result.transfer_size, 10); return result; } }; class DataChannelServerObserverImpl : public webrtc::DataChannelObserver { public: explicit DataChannelServerObserverImpl(webrtc::DataChannelInterface* dc, rtc::Thread* signaling_thread) : dc_(dc), signaling_thread_(signaling_thread) {} void OnStateChange() override { RTC_LOG(LS_INFO) << "Server state changed to " << dc_->state(); switch (dc_->state()) { case webrtc::DataChannelInterface::DataState::kOpen: break; case webrtc::DataChannelInterface::DataState::kClosed: closed_event_.Set(); break; default: break; } } void OnMessage(const webrtc::DataBuffer& buffer) override { if (!buffer.binary) { std::string setup_message(buffer.data.cdata(), buffer.data.size()); setup_ = SetupMessage::FromString(setup_message); remaining_data_ = setup_.transfer_size; setup_message_event_.Set(); } } void OnBufferedAmountChange(uint64_t sent_data_size) override { remaining_data_ -= sent_data_size; // Allow the transport buffer to be drained before starting again. if (buffer_ && dc_->buffered_amount() <= ok_to_resume_sending_threshold_) { total_queued_up_ += buffer_->size(); dc_->SendAsync(*buffer_, [this, buffer = buffer_](webrtc::RTCError err) { OnSendAsyncComplete(err, buffer); }); buffer_ = nullptr; } } bool IsOkToCallOnTheNetworkThread() override { return true; } bool WaitForClosedState() { return closed_event_.Wait(rtc::Event::kForever); } bool WaitForSetupMessage() { return setup_message_event_.Wait(rtc::Event::kForever); } void StartSending() { RTC_CHECK(remaining_data_) << "Error: no data to send"; std::string data(std::min(setup_.packet_size, remaining_data_), '0'); webrtc::DataBuffer* data_buffer = new webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data), true); total_queued_up_ = data_buffer->size(); dc_->SendAsync(*data_buffer, [this, data_buffer = data_buffer](webrtc::RTCError err) { OnSendAsyncComplete(err, data_buffer); }); } const struct SetupMessage& parameters() const { return setup_; } private: void OnSendAsyncComplete(webrtc::RTCError error, webrtc::DataBuffer* buffer) { total_queued_up_ -= buffer->size(); if (!error.ok()) { RTC_CHECK_EQ(error.type(), webrtc::RTCErrorType::RESOURCE_EXHAUSTED); RTC_CHECK(!buffer_); // Buffer saturated. Retry when OnBufferedAmountChange() detects we can. buffer_ = buffer; return; } signaling_thread_->PostTask([this, buffer = buffer, remaining_data = remaining_data_]() { fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n", (setup_.transfer_size - remaining_data), setup_.transfer_size, (100 - remaining_data * 100 / setup_.transfer_size)); if (!remaining_data) { RTC_CHECK(!total_queued_up_); // We're done. delete buffer; return; } if (remaining_data < buffer->data.size()) { buffer->data.SetSize(remaining_data); } total_queued_up_ += buffer->size(); dc_->SendAsync(*buffer, [this, buffer = buffer](webrtc::RTCError err) { OnSendAsyncComplete(err, buffer); }); }); } webrtc::DataChannelInterface* const dc_; rtc::Thread* const signaling_thread_; rtc::Event closed_event_; rtc::Event setup_message_event_; size_t remaining_data_ = 0u; size_t total_queued_up_ = 0u; struct SetupMessage setup_; webrtc::DataBuffer* buffer_ = nullptr; const uint64_t ok_to_resume_sending_threshold_ = webrtc::DataChannelInterface::MaxSendQueueSize() / 2; }; class DataChannelClientObserverImpl : public webrtc::DataChannelObserver { public: explicit DataChannelClientObserverImpl(webrtc::DataChannelInterface* dc, uint64_t bytes_received_threshold) : dc_(dc), bytes_received_threshold_(bytes_received_threshold) {} void OnStateChange() override { RTC_LOG(LS_INFO) << "Client state changed to " << dc_->state(); switch (dc_->state()) { case webrtc::DataChannelInterface::DataState::kOpen: open_event_.Set(); break; default: break; } } void OnMessage(const webrtc::DataBuffer& buffer) override { bytes_received_ += buffer.data.size(); if (bytes_received_ >= bytes_received_threshold_) { bytes_received_event_.Set(); } } void OnBufferedAmountChange(uint64_t sent_data_size) override {} bool IsOkToCallOnTheNetworkThread() override { return true; } bool WaitForOpenState() { return open_event_.Wait(rtc::Event::kForever); } // Wait until the received byte count reaches the desired value. bool WaitForBytesReceivedThreshold() { return bytes_received_event_.Wait(rtc::Event::kForever); } private: webrtc::DataChannelInterface* const dc_; rtc::Event open_event_; rtc::Event bytes_received_event_; const uint64_t bytes_received_threshold_; uint64_t bytes_received_ = 0u; }; int RunServer() { bool oneshot = absl::GetFlag(FLAGS_oneshot); uint16_t port = absl::GetFlag(FLAGS_port); auto signaling_thread = rtc::Thread::Create(); signaling_thread->Start(); { auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( signaling_thread.get()); auto grpc_server = webrtc::GrpcSignalingServerInterface::Create( [factory = rtc::scoped_refptr( factory), signaling_thread = signaling_thread.get()](webrtc::SignalingInterface* signaling) { webrtc::PeerConnectionClient client(factory.get(), signaling); client.StartPeerConnection(); auto peer_connection = client.peerConnection(); // Set up the data channel auto dc_or_error = peer_connection->CreateDataChannelOrError("benchmark", nullptr); RTC_CHECK(dc_or_error.ok()); auto data_channel = dc_or_error.MoveValue(); auto data_channel_observer = std::make_unique( data_channel.get(), signaling_thread); data_channel->RegisterObserver(data_channel_observer.get()); absl::Cleanup unregister_observer( [data_channel] { data_channel->UnregisterObserver(); }); // Wait for a first message from the remote peer. // It configures how much data should be sent and how big the packets // should be. // First message is "packet_size,transfer_size". data_channel_observer->WaitForSetupMessage(); // Wait for the sender and receiver peers to stabilize (send all ACKs) // This makes it easier to isolate the sending part when profiling. absl::SleepFor(absl::Seconds(1)); auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); data_channel_observer->StartSending(); // Receiver signals the data channel close event when it has received // all the data it requested. data_channel_observer->WaitForClosedState(); auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); auto duration_ms = (end_time - begin_time).ms(); double throughput = (data_channel_observer->parameters().transfer_size / 1024. / 1024.) / (duration_ms / 1000.); printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput); }, port, oneshot); grpc_server->Start(); printf("Server listening on port %d\n", grpc_server->SelectedPort()); grpc_server->Wait(); } signaling_thread->Stop(); return 0; } int RunClient() { uint16_t port = absl::GetFlag(FLAGS_port); std::string server_address = absl::GetFlag(FLAGS_address); size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024; size_t packet_size = absl::GetFlag(FLAGS_packet_size); auto signaling_thread = rtc::Thread::Create(); signaling_thread->Start(); { auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( signaling_thread.get()); auto grpc_client = webrtc::GrpcSignalingClientInterface::Create( server_address + ":" + std::to_string(port)); webrtc::PeerConnectionClient client(factory.get(), grpc_client->signaling_client()); std::unique_ptr observer; // Set up the callback to receive the data channel from the sender. rtc::scoped_refptr data_channel; rtc::Event got_data_channel; client.SetOnDataChannel( [&](rtc::scoped_refptr channel) { data_channel = std::move(channel); // DataChannel needs an observer to drain the read queue. observer = std::make_unique( data_channel.get(), transfer_size); data_channel->RegisterObserver(observer.get()); got_data_channel.Set(); }); // Connect to the server. if (!grpc_client->Start()) { fprintf(stderr, "Failed to connect to server\n"); return 1; } // Wait for the data channel to be received got_data_channel.Wait(rtc::Event::kForever); absl::Cleanup unregister_observer( [data_channel] { data_channel->UnregisterObserver(); }); // Send a configuration string to the server to tell it to send // 'packet_size' bytes packets and send a total of 'transfer_size' MB. observer->WaitForOpenState(); SetupMessage setup_message = { .packet_size = packet_size, .transfer_size = transfer_size, }; if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) { fprintf(stderr, "Failed to send parameter string\n"); return 1; } // Wait until we have received all the data observer->WaitForBytesReceivedThreshold(); // Close the data channel, signaling to the server we have received // all the requested data. data_channel->Close(); } signaling_thread->Stop(); return 0; } int main(int argc, char** argv) { rtc::InitializeSSL(); absl::ParseCommandLine(argc, argv); // Make sure that higher severity number means more logs by reversing the // rtc::LoggingSeverity values. auto logging_severity = std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose)); rtc::LogMessage::LogToDebug( static_cast(logging_severity)); bool is_server = absl::GetFlag(FLAGS_server); std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials); webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str()); return is_server ? RunServer() : RunClient(); }