summaryrefslogtreecommitdiffstats
path: root/third_party/libwebrtc/test/network/cross_traffic.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/libwebrtc/test/network/cross_traffic.cc')
-rw-r--r--third_party/libwebrtc/test/network/cross_traffic.cc322
1 files changed, 322 insertions, 0 deletions
diff --git a/third_party/libwebrtc/test/network/cross_traffic.cc b/third_party/libwebrtc/test/network/cross_traffic.cc
new file mode 100644
index 0000000000..0a817a2d39
--- /dev/null
+++ b/third_party/libwebrtc/test/network/cross_traffic.cc
@@ -0,0 +1,322 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/network/cross_traffic.h"
+
+#include <math.h>
+
+#include <utility>
+
+#include "absl/memory/memory.h"
+#include "absl/types/optional.h"
+#include "cross_traffic.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/numerics/safe_minmax.h"
+
+namespace webrtc {
+namespace test {
+
+RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config,
+ CrossTrafficRoute* traffic_route)
+ : config_(config),
+ traffic_route_(traffic_route),
+ random_(config_.random_seed) {
+ sequence_checker_.Detach();
+}
+RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default;
+
+void RandomWalkCrossTraffic::Process(Timestamp at_time) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ if (last_process_time_.IsMinusInfinity()) {
+ last_process_time_ = at_time;
+ }
+ TimeDelta delta = at_time - last_process_time_;
+ last_process_time_ = at_time;
+
+ if (at_time - last_update_time_ >= config_.update_interval) {
+ intensity_ += random_.Gaussian(config_.bias, config_.variance) *
+ sqrt((at_time - last_update_time_).seconds<double>());
+ intensity_ = rtc::SafeClamp(intensity_, 0.0, 1.0);
+ last_update_time_ = at_time;
+ }
+ pending_size_ += TrafficRate() * delta;
+
+ if (pending_size_ >= config_.min_packet_size &&
+ at_time >= last_send_time_ + config_.min_packet_interval) {
+ traffic_route_->SendPacket(pending_size_.bytes());
+ pending_size_ = DataSize::Zero();
+ last_send_time_ = at_time;
+ }
+}
+
+TimeDelta RandomWalkCrossTraffic::GetProcessInterval() const {
+ return config_.min_packet_interval;
+}
+
+DataRate RandomWalkCrossTraffic::TrafficRate() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ return config_.peak_rate * intensity_;
+}
+
+ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() {
+ return ColumnPrinter::Lambda(
+ "random_walk_cross_traffic_rate",
+ [this](rtc::SimpleStringBuilder& sb) {
+ sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
+ },
+ 32);
+}
+
+PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(
+ PulsedPeaksConfig config,
+ CrossTrafficRoute* traffic_route)
+ : config_(config), traffic_route_(traffic_route) {
+ sequence_checker_.Detach();
+}
+PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default;
+
+void PulsedPeaksCrossTraffic::Process(Timestamp at_time) {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ TimeDelta time_since_toggle = at_time - last_update_time_;
+ if (time_since_toggle.IsInfinite() ||
+ (sending_ && time_since_toggle >= config_.send_duration)) {
+ sending_ = false;
+ last_update_time_ = at_time;
+ } else if (!sending_ && time_since_toggle >= config_.hold_duration) {
+ sending_ = true;
+ last_update_time_ = at_time;
+ // Start sending period.
+ last_send_time_ = at_time;
+ }
+
+ if (sending_) {
+ DataSize pending_size = config_.peak_rate * (at_time - last_send_time_);
+
+ if (pending_size >= config_.min_packet_size &&
+ at_time >= last_send_time_ + config_.min_packet_interval) {
+ traffic_route_->SendPacket(pending_size.bytes());
+ last_send_time_ = at_time;
+ }
+ }
+}
+
+TimeDelta PulsedPeaksCrossTraffic::GetProcessInterval() const {
+ return config_.min_packet_interval;
+}
+
+DataRate PulsedPeaksCrossTraffic::TrafficRate() const {
+ RTC_DCHECK_RUN_ON(&sequence_checker_);
+ return sending_ ? config_.peak_rate : DataRate::Zero();
+}
+
+ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
+ return ColumnPrinter::Lambda(
+ "pulsed_peaks_cross_traffic_rate",
+ [this](rtc::SimpleStringBuilder& sb) {
+ sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
+ },
+ 32);
+}
+
+TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
+ TaskQueueBase* task_queue,
+ EmulatedRoute* send_route,
+ EmulatedRoute* ret_route)
+ : clock_(clock),
+ task_queue_(task_queue),
+ request_route_(send_route,
+ [this](TcpPacket packet, Timestamp) {
+ OnRequest(std::move(packet));
+ }),
+ response_route_(ret_route,
+ [this](TcpPacket packet, Timestamp arrival_time) {
+ OnResponse(std::move(packet), arrival_time);
+ }) {}
+
+void TcpMessageRouteImpl::SendMessage(size_t size,
+ std::function<void()> on_received) {
+ task_queue_->PostTask(
+ [this, size, handler = std::move(on_received)] {
+ // If we are currently sending a message we won't reset the connection,
+ // we'll act as if the messages are sent in the same TCP stream. This is
+ // intended to simulate recreation of a TCP session for each message
+ // in the typical case while avoiding the complexity overhead of
+ // maintaining multiple virtual TCP sessions in parallel.
+ if (pending_.empty() && in_flight_.empty()) {
+ cwnd_ = 10;
+ ssthresh_ = INFINITY;
+ }
+ int64_t data_left = static_cast<int64_t>(size);
+ int64_t kMaxPacketSize = 1200;
+ int64_t kMinPacketSize = 4;
+ Message message{std::move(handler)};
+ while (data_left > 0) {
+ int64_t packet_size = std::min(data_left, kMaxPacketSize);
+ int fragment_id = next_fragment_id_++;
+ pending_.push_back(MessageFragment{
+ fragment_id,
+ static_cast<size_t>(std::max(kMinPacketSize, packet_size))});
+ message.pending_fragment_ids.insert(fragment_id);
+ data_left -= packet_size;
+ }
+ messages_.emplace_back(message);
+ SendPackets(clock_->CurrentTime());
+ });
+}
+
+void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
+ for (auto it = messages_.begin(); it != messages_.end(); ++it) {
+ if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
+ it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
+ if (it->pending_fragment_ids.empty()) {
+ it->handler();
+ messages_.erase(it);
+ }
+ break;
+ }
+ }
+ const size_t kAckPacketSize = 20;
+ response_route_.SendPacket(kAckPacketSize, packet_info);
+}
+
+void TcpMessageRouteImpl::OnResponse(TcpPacket packet_info, Timestamp at_time) {
+ auto it = in_flight_.find(packet_info.sequence_number);
+ if (it != in_flight_.end()) {
+ last_rtt_ = at_time - packet_info.send_time;
+ in_flight_.erase(it);
+ }
+ auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
+ for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
+ lost_it = in_flight_.erase(lost_it)) {
+ pending_.push_front(lost_it->second.fragment);
+ }
+
+ if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
+ HandleLoss(at_time);
+ } else if (cwnd_ <= ssthresh_) {
+ cwnd_ += 1;
+ } else {
+ cwnd_ += 1.0f / cwnd_;
+ }
+ last_acked_seq_num_ =
+ std::max(packet_info.sequence_number, last_acked_seq_num_);
+ SendPackets(at_time);
+}
+
+void TcpMessageRouteImpl::HandleLoss(Timestamp at_time) {
+ if (at_time - last_reduction_time_ < last_rtt_)
+ return;
+ last_reduction_time_ = at_time;
+ ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
+ cwnd_ = ssthresh_;
+}
+
+void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
+ const TimeDelta kPacketTimeout = TimeDelta::Seconds(1);
+ int cwnd = std::ceil(cwnd_);
+ int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
+ while (packets_to_send-- > 0 && !pending_.empty()) {
+ auto seq_num = next_sequence_number_++;
+ TcpPacket send;
+ send.sequence_number = seq_num;
+ send.send_time = at_time;
+ send.fragment = pending_.front();
+ pending_.pop_front();
+ request_route_.SendPacket(send.fragment.size, send);
+ in_flight_.insert({seq_num, send});
+ task_queue_->PostDelayedTask(
+ [this, seq_num] {
+ HandlePacketTimeout(seq_num, clock_->CurrentTime());
+ },
+ kPacketTimeout);
+ }
+}
+
+void TcpMessageRouteImpl::HandlePacketTimeout(int seq_num, Timestamp at_time) {
+ auto lost = in_flight_.find(seq_num);
+ if (lost != in_flight_.end()) {
+ pending_.push_front(lost->second.fragment);
+ in_flight_.erase(lost);
+ HandleLoss(at_time);
+ SendPackets(at_time);
+ }
+}
+
+FakeTcpCrossTraffic::FakeTcpCrossTraffic(FakeTcpConfig config,
+ EmulatedRoute* send_route,
+ EmulatedRoute* ret_route)
+ : conf_(config), route_(this, send_route, ret_route) {}
+
+TimeDelta FakeTcpCrossTraffic::GetProcessInterval() const {
+ return conf_.process_interval;
+}
+
+void FakeTcpCrossTraffic::Process(Timestamp at_time) {
+ SendPackets(at_time);
+}
+
+void FakeTcpCrossTraffic::OnRequest(int sequence_number, Timestamp at_time) {
+ const size_t kAckPacketSize = 20;
+ route_.SendResponse(kAckPacketSize, sequence_number);
+}
+
+void FakeTcpCrossTraffic::OnResponse(int sequence_number, Timestamp at_time) {
+ ack_received_ = true;
+ auto it = in_flight_.find(sequence_number);
+ if (it != in_flight_.end()) {
+ last_rtt_ = at_time - in_flight_.at(sequence_number);
+ in_flight_.erase(sequence_number);
+ }
+ if (sequence_number - last_acked_seq_num_ > 1) {
+ HandleLoss(at_time);
+ } else if (cwnd_ <= ssthresh_) {
+ cwnd_ += 1;
+ } else {
+ cwnd_ += 1.0f / cwnd_;
+ }
+ last_acked_seq_num_ = std::max(sequence_number, last_acked_seq_num_);
+ SendPackets(at_time);
+}
+
+void FakeTcpCrossTraffic::HandleLoss(Timestamp at_time) {
+ if (at_time - last_reduction_time_ < last_rtt_)
+ return;
+ last_reduction_time_ = at_time;
+ ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
+ cwnd_ = ssthresh_;
+}
+
+void FakeTcpCrossTraffic::SendPackets(Timestamp at_time) {
+ int cwnd = std::ceil(cwnd_);
+ int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
+ bool timeouts = false;
+ for (auto it = in_flight_.begin(); it != in_flight_.end();) {
+ if (it->second < at_time - conf_.packet_timeout) {
+ it = in_flight_.erase(it);
+ timeouts = true;
+ } else {
+ ++it;
+ }
+ }
+ if (timeouts)
+ HandleLoss(at_time);
+ for (int i = 0; i < packets_to_send; ++i) {
+ if ((total_sent_ + conf_.packet_size) > conf_.send_limit) {
+ break;
+ }
+ in_flight_.insert({next_sequence_number_, at_time});
+ route_.SendRequest(conf_.packet_size.bytes<size_t>(),
+ next_sequence_number_++);
+ total_sent_ += conf_.packet_size;
+ }
+}
+
+} // namespace test
+} // namespace webrtc