/* * Copyright 2017 The WebRTC project authors. All Rights Reserved. * * Use of this source code is governed by a BSD-style license * that can be found in the LICENSE file in the root of the source * tree. An additional intellectual property rights grant can be found * in the file PATENTS. All contributing project authors may * be found in the AUTHORS file in the root of the source tree. */ #include "rtc_tools/network_tester/packet_sender.h" #include #include #include #include #include "absl/functional/any_invocable.h" #include "api/task_queue/pending_task_safety_flag.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/time_utils.h" #include "rtc_tools/network_tester/config_reader.h" #include "rtc_tools/network_tester/test_controller.h" namespace webrtc { namespace { absl::AnyInvocable SendPacketTask( PacketSender* packet_sender, rtc::scoped_refptr task_safety_flag, int64_t target_time_ms = rtc::TimeMillis()) { return [target_time_ms, packet_sender, task_safety_flag = std::move(task_safety_flag)]() mutable { if (task_safety_flag->alive() && packet_sender->IsSending()) { packet_sender->SendPacket(); target_time_ms += packet_sender->GetSendIntervalMs(); int64_t delay_ms = std::max(static_cast(0), target_time_ms - rtc::TimeMillis()); TaskQueueBase::Current()->PostDelayedTask( SendPacketTask(packet_sender, std::move(task_safety_flag), target_time_ms), TimeDelta::Millis(delay_ms)); } }; } absl::AnyInvocable UpdateTestSettingTask( PacketSender* packet_sender, std::unique_ptr config_reader, rtc::scoped_refptr task_safety_flag) { return [packet_sender, config_reader = std::move(config_reader), task_safety_flag = std::move(task_safety_flag)]() mutable { if (!task_safety_flag->alive()) { return; } if (absl::optional config = config_reader->GetNextConfig()) { packet_sender->UpdateTestSetting(config->packet_size, config->packet_send_interval_ms); TaskQueueBase::Current()->PostDelayedTask( UpdateTestSettingTask(packet_sender, std::move(config_reader), std::move(task_safety_flag)), TimeDelta::Millis(config->execution_time_ms)); } else { packet_sender->StopSending(); } }; } } // namespace PacketSender::PacketSender( TestController* test_controller, webrtc::TaskQueueBase* worker_queue, rtc::scoped_refptr task_safety_flag, const std::string& config_file_path) : packet_size_(0), send_interval_ms_(0), sequence_number_(0), sending_(false), config_file_path_(config_file_path), test_controller_(test_controller), worker_queue_(worker_queue), task_safety_flag_(task_safety_flag) {} PacketSender::~PacketSender() = default; void PacketSender::StartSending() { worker_queue_checker_.Detach(); worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = true; })); worker_queue_->PostTask(UpdateTestSettingTask( this, std::make_unique(config_file_path_), task_safety_flag_)); worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_)); } void PacketSender::StopSending() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); sending_ = false; test_controller_->OnTestDone(); } bool PacketSender::IsSending() const { RTC_DCHECK_RUN_ON(&worker_queue_checker_); return sending_; } void PacketSender::SendPacket() { RTC_DCHECK_RUN_ON(&worker_queue_checker_); NetworkTesterPacket packet; packet.set_type(NetworkTesterPacket::TEST_DATA); packet.set_sequence_number(sequence_number_++); packet.set_send_timestamp(rtc::TimeMicros()); test_controller_->SendData(packet, packet_size_); } int64_t PacketSender::GetSendIntervalMs() const { RTC_DCHECK_RUN_ON(&worker_queue_checker_); return send_interval_ms_; } void PacketSender::UpdateTestSetting(size_t packet_size, int64_t send_interval_ms) { RTC_DCHECK_RUN_ON(&worker_queue_checker_); send_interval_ms_ = send_interval_ms; packet_size_ = packet_size; } } // namespace webrtc