summaryrefslogtreecommitdiffstats
path: root/src/dmclock/test
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/dmclock/test
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/dmclock/test')
-rw-r--r--src/dmclock/test/CMakeLists.txt34
-rw-r--r--src/dmclock/test/dmcPrCtl.h56
-rw-r--r--src/dmclock/test/test_dmclock_client.cc307
-rw-r--r--src/dmclock/test/test_dmclock_server.cc1140
-rw-r--r--src/dmclock/test/test_test_client.cc138
5 files changed, 1675 insertions, 0 deletions
diff --git a/src/dmclock/test/CMakeLists.txt b/src/dmclock/test/CMakeLists.txt
new file mode 100644
index 00000000..52ab6bfe
--- /dev/null
+++ b/src/dmclock/test/CMakeLists.txt
@@ -0,0 +1,34 @@
+include(CheckIncludeFileCXX)
+check_include_file_cxx("sys/prctl.h" HAVE_SYS_PRCTL_H)
+
+set(support_srcs ../sim/src/test_dmclock.cc)
+set(test_srcs
+ test_test_client.cc
+ test_dmclock_server.cc
+ test_dmclock_client.cc
+ )
+
+set_source_files_properties(${core_srcs} ${test_srcs}
+ PROPERTIES
+ COMPILE_FLAGS "${local_flags}"
+ )
+
+add_executable(dmclock-tests ${test_srcs} ${support_srcs})
+if(HAVE_SYS_PRCTL_H)
+ target_compile_definitions(dmclock-tests PRIVATE "HAVE_SYS_PRCTL_H")
+endif()
+target_include_directories(dmclock-tests PRIVATE
+ ../sim/src ${CMAKE_CURRENT_BINARY_DIR})
+target_include_directories(dmclock-tests PRIVATE SYSTEM
+ "${GTEST_INCLUDE_DIRS}")
+
+target_link_libraries(dmclock-tests LINK_PRIVATE
+ dmclock
+ Threads::Threads
+ GTest::GTest
+ GTest::Main)
+
+add_test(NAME dmclock-tests
+ COMMAND $<TARGET_FILE:dmclock-tests>)
+add_test(NAME dmclock-data-struct-tests
+ COMMAND $<TARGET_FILE:dmclock-data-struct-tests>)
diff --git a/src/dmclock/test/dmcPrCtl.h b/src/dmclock/test/dmcPrCtl.h
new file mode 100644
index 00000000..e0b2216f
--- /dev/null
+++ b/src/dmclock/test/dmcPrCtl.h
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// essentially the same as ceph's PrCtl.h, copied into the dmclock library
+
+#ifdef HAVE_SYS_PRCTL_H
+#include <iostream>
+#include <sys/prctl.h>
+#include <errno.h>
+
+struct PrCtl {
+ int saved_state = -1;
+ int set_dumpable(int new_state) {
+ int r = prctl(PR_SET_DUMPABLE, new_state);
+ if (r) {
+ r = -errno;
+ std::cerr << "warning: unable to " << (new_state ? "set" : "unset")
+ << " dumpable flag: " << strerror(r)
+ << std::endl;
+ }
+ return r;
+ }
+ PrCtl(int new_state = 0) {
+ int r = prctl(PR_GET_DUMPABLE);
+ if (r == -1) {
+ r = errno;
+ std::cerr << "warning: unable to get dumpable flag: " << strerror(r)
+ << std::endl;
+ } else if (r != new_state) {
+ if (!set_dumpable(new_state)) {
+ saved_state = r;
+ }
+ }
+ }
+ ~PrCtl() {
+ if (saved_state < 0) {
+ return;
+ }
+ set_dumpable(saved_state);
+ }
+};
+#else
+struct PrCtl {};
+#endif
diff --git a/src/dmclock/test/test_dmclock_client.cc b/src/dmclock/test/test_dmclock_client.cc
new file mode 100644
index 00000000..58840457
--- /dev/null
+++ b/src/dmclock/test/test_dmclock_client.cc
@@ -0,0 +1,307 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include <chrono>
+#include <mutex>
+#include <functional>
+#include <iostream>
+
+
+#include "dmclock_client.h"
+#include "dmclock_util.h"
+#include "gtest/gtest.h"
+
+
+namespace dmc = crimson::dmclock;
+
+
+namespace crimson {
+ namespace dmclock {
+
+ /*
+ * Allows us to test the code provided with the mutex provided locked.
+ */
+ static void test_locked(std::mutex& mtx, std::function<void()> code) {
+ std::lock_guard<std::mutex> l(mtx);
+ code();
+ }
+
+
+ TEST(dmclock_client, server_erase) {
+ using ServerId = int;
+ // using ClientId = int;
+
+ ServerId server = 101;
+ // ClientId client = 3;
+
+ // dmc::PhaseType resp_params = dmc::PhaseType::reservation;
+
+ dmc::ServiceTracker<ServerId> st(std::chrono::seconds(2),
+ std::chrono::seconds(3));
+
+ auto lock_st = [&](std::function<void()> code) {
+ test_locked(st.data_mtx, code);
+ };
+
+ /* The timeline should be as follows:
+ *
+ * 0 seconds : request created
+ *
+ * 1 seconds : map is size 1
+ *
+ * 2 seconds : clean notes first mark; +2 is base for further calcs
+ *
+ * 4 seconds : clean does nothing except makes another mark
+ *
+ * 5 seconds : when we're secheduled to erase (+2 + 3)
+ *
+ * 5 seconds : since the clean job hasn't run yet, map still size 1
+ *
+ * 6 seconds : clean erases server
+ *
+ * 7 seconds : verified server is gone (map size 0)
+ */
+
+ lock_st([&] () {
+ EXPECT_EQ(0u, st.server_map.size()) <<
+ "server map initially has size 0";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // call for side effects
+ (void) st.get_req_params(server);
+
+ lock_st([&] () {
+ EXPECT_EQ(1u, st.server_map.size()) <<
+ "server map has size 1 after first request";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(4));
+
+ lock_st([&] () {
+ EXPECT_EQ(1u, st.server_map.size()) <<
+ "server map has size 1 just before erase";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ lock_st([&] () {
+ EXPECT_EQ(0u, st.server_map.size()) <<
+ "server map has size 0 just after erase";
+ });
+ } // TEST
+
+
+ TEST(dmclock_client, delta_rho_values_borrowing_tracker) {
+ using ServerId = int;
+ // using ClientId = int;
+
+ ServerId server1 = 101;
+ ServerId server2 = 7;
+ // ClientId client = 3;
+
+ // RespParams<ServerId> resp_params(server, dmc::PhaseType::reservation);
+
+ dmc::ServiceTracker<ServerId,dmc::BorrowingTracker> st(std::chrono::seconds(2),
+ std::chrono::seconds(3));
+ auto rp1 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp1.delta) <<
+ "delta should be 1 with no intervening responses by" <<
+ "other servers";
+ EXPECT_EQ(1u, rp1.rho) <<
+ "rho should be 1 with no intervening reservation responses by" <<
+ "other servers";
+
+ auto rp2 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp2.delta) <<
+ "delta should be 1 with no intervening responses by" <<
+ "other servers";
+ EXPECT_EQ(1u, rp2.rho) <<
+ "rho should be 1 with no intervening reservation responses by" <<
+ "other servers";
+
+ // RESPONSE
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+
+ auto rp3 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp3.delta) <<
+ "delta should be 1 with no intervening responses by" <<
+ "other servers";
+ EXPECT_EQ(1u, rp3.rho) <<
+ "rho should be 1 with no intervening reservation responses by" <<
+ "other servers";
+
+ // RESPONSE
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+
+ auto rp4 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp4.delta) <<
+ "delta should be 2 with one intervening priority response by " <<
+ "another server";
+ EXPECT_EQ(1u, rp4.rho) <<
+ "rho should be 1 with one intervening priority responses by " <<
+ "another server";
+
+ auto rp5 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp5.delta) <<
+ "delta should be 1 with no intervening responses by" <<
+ "other servers";
+ EXPECT_EQ(1u, rp5.rho) <<
+ "rho should be 1 with no intervening reservation responses by" <<
+ "other servers";
+
+ // RESPONSE
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+
+ auto rp6 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp6.delta) <<
+ "delta should be 2 with one intervening reservation response by " <<
+ "another server";
+ EXPECT_EQ(1u, rp6.rho) <<
+ "rho should be 2 with one intervening reservation responses by " <<
+ "another server";
+
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+
+ auto rp7 = st.get_req_params(server1);
+
+ EXPECT_EQ(5u, rp7.delta) <<
+ "delta should be 5 with four intervening responses by " <<
+ "another server";
+ EXPECT_EQ(1u, rp7.rho) <<
+ "rho should be 1 with two intervening reservation responses by " <<
+ "another server";
+
+ auto rp7b = st.get_req_params(server2);
+
+ EXPECT_EQ(9u, rp7b.delta) <<
+ "delta should be 9 with three intervening responses by " <<
+ "another server";
+ EXPECT_EQ(4u, rp7b.rho) <<
+ "rho should be 4 with one intervening reservation responses by " <<
+ "another server";
+
+ auto rp8 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp8.delta) <<
+ "delta should be 1 with no intervening responses by " <<
+ "another server";
+ EXPECT_EQ(1u, rp8.rho) <<
+ "rho should be 1 with no intervening reservation responses by " <<
+ "another server";
+
+ auto rp8b = st.get_req_params(server2);
+ EXPECT_EQ(1u, rp8b.delta) <<
+ "delta should be 1 with no intervening responses by " <<
+ "another server";
+ EXPECT_EQ(1u, rp8b.rho) <<
+ "rho should be 1 with no intervening reservation responses by " <<
+ "another server";
+ } // TEST
+
+
+ // NB: the BorrowingTracker has not been fully tested and the
+ // expected values below have not yet been compared with the
+ // theoretically correct values.
+ TEST(dmclock_client, delta_rho_values_orig_tracker) {
+ using ServerId = int;
+
+ ServerId server1 = 101;
+ ServerId server2 = 7;
+
+ dmc::ServiceTracker<ServerId,OrigTracker>
+ st(std::chrono::seconds(2), std::chrono::seconds(3));
+
+ auto rp1 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp1.delta);
+ EXPECT_EQ(1u, rp1.rho);
+
+ auto rp2 = st.get_req_params(server1);
+
+ EXPECT_EQ(0u, rp2.delta);
+ EXPECT_EQ(0u, rp2.rho);
+
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+
+ auto rp3 = st.get_req_params(server1);
+
+ EXPECT_EQ(0u, rp3.delta);
+ EXPECT_EQ(0u, rp3.rho);
+
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+
+ auto rp4 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp4.delta);
+ EXPECT_EQ(0u, rp4.rho);
+
+ auto rp5 = st.get_req_params(server1);
+
+ EXPECT_EQ(0u, rp5.delta);
+ EXPECT_EQ(0u, rp5.rho);
+
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+
+ auto rp6 = st.get_req_params(server1);
+
+ EXPECT_EQ(1u, rp6.delta);
+ EXPECT_EQ(1u, rp6.rho);
+
+ // auto rp6_b = st.get_req_params(server2);
+
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::reservation, 1u);
+ st.track_resp(server1, dmc::PhaseType::priority, 1u);
+ st.track_resp(server2, dmc::PhaseType::priority, 1u);
+
+ auto rp7 = st.get_req_params(server1);
+
+ EXPECT_EQ(4u, rp7.delta);
+ EXPECT_EQ(2u, rp7.rho);
+
+ auto rp7b = st.get_req_params(server2);
+
+ EXPECT_EQ(3u, rp7b.delta);
+ EXPECT_EQ(1u, rp7b.rho);
+
+ auto rp8 = st.get_req_params(server1);
+
+ EXPECT_EQ(0u, rp8.delta);
+ EXPECT_EQ(0u, rp8.rho);
+
+ auto rp8b = st.get_req_params(server2);
+ EXPECT_EQ(0u, rp8b.delta);
+ EXPECT_EQ(0u, rp8b.rho);
+ } // TEST
+
+ } // namespace dmclock
+} // namespace crimson
diff --git a/src/dmclock/test/test_dmclock_server.cc b/src/dmclock/test/test_dmclock_server.cc
new file mode 100644
index 00000000..03b2cf93
--- /dev/null
+++ b/src/dmclock/test/test_dmclock_server.cc
@@ -0,0 +1,1140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include <memory>
+#include <chrono>
+#include <iostream>
+#include <list>
+#include <vector>
+
+
+#include "dmclock_server.h"
+#include "dmclock_util.h"
+#include "gtest/gtest.h"
+
+// process control to prevent core dumps during gtest death tests
+#include "dmcPrCtl.h"
+
+
+namespace dmc = crimson::dmclock;
+
+
+// we need a request object; an empty one will do
+struct Request {
+};
+
+
+namespace crimson {
+ namespace dmclock {
+
+ /*
+ * Allows us to test the code provided with the mutex provided locked.
+ */
+ static void test_locked(std::mutex& mtx, std::function<void()> code) {
+ std::unique_lock<std::mutex> l(mtx);
+ code();
+ }
+
+
+ TEST(dmclock_server, bad_tag_deathtest) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,true>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 18;
+
+ double reservation = 0.0;
+ double weight = 0.0;
+
+ dmc::ClientInfo ci1(reservation, weight, 0.0);
+ dmc::ClientInfo ci2(reservation, weight, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &ci1;
+ else if (client2 == c) return &ci2;
+ else {
+ ADD_FAILURE() << "got request from neither of two clients";
+ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+ ReqParams req_params(1,1);
+
+ // Disable coredumps
+ PrCtl unset_dumpable;
+
+ EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client1, req_params),
+ "Assertion.*reservation.*max_tag.*"
+ "proportion.*max_tag") <<
+ "we should fail if a client tries to generate a reservation tag "
+ "where reservation and proportion are both 0";
+
+
+ EXPECT_DEATH_IF_SUPPORTED(pq->add_request(Request{}, client2, req_params),
+ "Assertion.*reservation.*max_tag.*"
+ "proportion.*max_tag") <<
+ "we should fail if a client tries to generate a reservation tag "
+ "where reservation and proportion are both 0";
+
+ EXPECT_DEATH_IF_SUPPORTED(Queue(client_info_f, AtLimit::Reject),
+ "Assertion.*Reject.*Delayed") <<
+ "we should fail if a client tries to construct a queue with both "
+ "DelayedTagCalc and AtLimit::Reject";
+ }
+
+
+ TEST(dmclock_server, client_idle_erase) {
+ using ClientId = int;
+ using Queue = dmc::PushPriorityQueue<ClientId,Request>;
+ ClientId client = 17;
+ double reservation = 100.0;
+
+ dmc::ClientInfo ci(reservation, 1.0, 0.0);
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &ci;
+ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [] (const ClientId& c,
+ std::unique_ptr<Request> req,
+ dmc::PhaseType phase,
+ uint64_t req_cost) {
+ // empty; do nothing
+ };
+
+ Queue pq(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ std::chrono::seconds(3),
+ std::chrono::seconds(5),
+ std::chrono::seconds(2),
+ AtLimit::Wait);
+
+ auto lock_pq = [&](std::function<void()> code) {
+ test_locked(pq.data_mtx, code);
+ };
+
+
+ /* The timeline should be as follows:
+ *
+ * 0 seconds : request created
+ *
+ * 1 seconds : map is size 1, idle is false
+ *
+ * 2 seconds : clean notes first mark; +2 is base for further calcs
+ *
+ * 4 seconds : clean does nothing except makes another mark
+ *
+ * 5 seconds : when we're secheduled to idle (+2 + 3)
+ *
+ * 6 seconds : clean idles client
+ *
+ * 7 seconds : when we're secheduled to erase (+2 + 5)
+ *
+ * 7 seconds : verified client is idle
+ *
+ * 8 seconds : clean erases client info
+ *
+ * 9 seconds : verified client is erased
+ */
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map initially has size 0";
+ });
+
+ Request req;
+ dmc::ReqParams req_params(1, 1);
+ EXPECT_EQ(0, pq.add_request_time(req, client, req_params, dmc::get_time()));
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ lock_pq([&] () {
+ EXPECT_EQ(1u, pq.client_map.size()) <<
+ "client map has 1 after 1 client";
+ EXPECT_FALSE(pq.client_map.at(client)->idle) <<
+ "initially client map entry shows not idle.";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(6));
+
+ lock_pq([&] () {
+ EXPECT_TRUE(pq.client_map.at(client)->idle) <<
+ "after idle age client map entry shows idle.";
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map loses its entry after erase age";
+ });
+ } // TEST
+
+
+ TEST(dmclock_server, delayed_tag_calc) {
+ using ClientId = int;
+ constexpr ClientId client1 = 17;
+
+ using DelayedQueue = PullPriorityQueue<ClientId, Request, true>;
+ using ImmediateQueue = PullPriorityQueue<ClientId, Request, false>;
+
+ ClientInfo info(0.0, 1.0, 1.0);
+ auto client_info_f = [&] (ClientId c) -> const ClientInfo* {
+ return &info;
+ };
+
+ Time t{1};
+ {
+ DelayedQueue queue(client_info_f);
+
+ queue.add_request_time({}, client1, {0,0}, t);
+ queue.add_request_time({}, client1, {0,0}, t + 1);
+ queue.add_request_time({}, client1, {10,10}, t + 2);
+
+ auto pr1 = queue.pull_request(t);
+ ASSERT_TRUE(pr1.is_retn());
+ auto pr2 = queue.pull_request(t + 1);
+ // ReqParams{10,10} from request #3 pushes request #2 over limit by 10s
+ ASSERT_TRUE(pr2.is_future());
+ EXPECT_DOUBLE_EQ(t + 11, pr2.getTime());
+ }
+ {
+ ImmediateQueue queue(client_info_f);
+
+ queue.add_request_time({}, client1, {0,0}, t);
+ queue.add_request_time({}, client1, {0,0}, t + 1);
+ queue.add_request_time({}, client1, {10,10}, t + 2);
+
+ auto pr1 = queue.pull_request(t);
+ ASSERT_TRUE(pr1.is_retn());
+ auto pr2 = queue.pull_request(t + 1);
+ // ReqParams{10,10} from request #3 has no effect on request #2
+ ASSERT_TRUE(pr2.is_retn());
+ auto pr3 = queue.pull_request(t + 2);
+ ASSERT_TRUE(pr3.is_future());
+ EXPECT_DOUBLE_EQ(t + 12, pr3.getTime());
+ }
+ }
+
+#if 0
+ TEST(dmclock_server, reservation_timing) {
+ using ClientId = int;
+ // NB? PUSH OR PULL
+ using Queue = std::unique_ptr<dmc::PriorityQueue<ClientId,Request>>;
+ using std::chrono::steady_clock;
+
+ int client = 17;
+
+ std::vector<dmc::Time> times;
+ std::mutex times_mtx;
+ using Guard = std::lock_guard<decltype(times_mtx)>;
+
+ // reservation every second
+ dmc::ClientInfo ci(1.0, 0.0, 0.0);
+ Queue pq;
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &ci;
+ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [&] (const ClientId& c,
+ std::unique_ptr<Request> req,
+ dmc::PhaseType phase) {
+ {
+ Guard g(times_mtx);
+ times.emplace_back(dmc::get_time());
+ }
+ std::thread complete([&](){ pq->request_completed(); });
+ complete.detach();
+ };
+
+ // NB? PUSH OR PULL
+ pq = Queue(new dmc::PriorityQueue<ClientId,Request>(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ false));
+
+ Request req;
+ ReqParams<ClientId> req_params(client, 1, 1);
+
+ for (int i = 0; i < 5; ++i) {
+ pq->add_request_time(req, req_params, dmc::get_time());
+ }
+
+ {
+ Guard g(times_mtx);
+ std::this_thread::sleep_for(std::chrono::milliseconds(5500));
+ EXPECT_EQ(5, times.size()) <<
+ "after 5.5 seconds, we should have 5 requests times at 1 second apart";
+ }
+ } // TEST
+#endif
+
+
+ TEST(dmclock_server, remove_by_req_filter) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info1;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ EXPECT_EQ(0, pq.add_request(MyReq(1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(11), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(0), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(13), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(13), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(98), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(44), client1, req_params));
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(9u, pq.request_count());
+
+ pq.remove_by_req_filter([](MyReqRef&& r) -> bool {return 1 == r->id % 2;});
+
+ EXPECT_EQ(5u, pq.request_count());
+
+ std::list<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (MyReqRef&& r) -> bool {
+ if (0 == r->id % 2) {
+ capture.push_front(*r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(5u, capture.size());
+ int total = 0;
+ for (auto i : capture) {
+ total += i.id;
+ }
+ EXPECT_EQ(146, total) << " sum of captured items should be 146";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_req_filter_ordering_forwards_visit) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+
+ ClientId client1 = 17;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info1;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ EXPECT_EQ(0, pq.add_request(MyReq(1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(3), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(4), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(5), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(6), client1, req_params));
+
+ EXPECT_EQ(1u, pq.client_count());
+ EXPECT_EQ(6u, pq.request_count());
+
+ // remove odd ids in forward order and append to end
+
+ std::vector<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (MyReqRef&& r) -> bool {
+ if (1 == r->id % 2) {
+ capture.push_back(*r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ false);
+
+ EXPECT_EQ(3u, pq.request_count());
+ EXPECT_EQ(3u, capture.size());
+ EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
+ EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
+ EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
+
+ // remove even ids in reverse order but insert at front so comes
+ // out forwards
+
+ std::vector<MyReq> capture2;
+ pq.remove_by_req_filter(
+ [&capture2] (MyReqRef&& r) -> bool {
+ if (0 == r->id % 2) {
+ capture2.insert(capture2.begin(), *r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ false);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(3u, capture2.size());
+ EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
+ EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
+ EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_req_filter_ordering_backwards_visit) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+
+ ClientId client1 = 17;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info1;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ EXPECT_EQ(0, pq.add_request(MyReq(1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(3), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(4), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(5), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(6), client1, req_params));
+
+ EXPECT_EQ(1u, pq.client_count());
+ EXPECT_EQ(6u, pq.request_count());
+
+ // now remove odd ids in forward order
+
+ std::vector<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (MyReqRef&& r) -> bool {
+ if (1 == r->id % 2) {
+ capture.insert(capture.begin(), *r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(3u, pq.request_count());
+ EXPECT_EQ(3u, capture.size());
+ EXPECT_EQ(1, capture[0].id) << "items should come out in forward order";
+ EXPECT_EQ(3, capture[1].id) << "items should come out in forward order";
+ EXPECT_EQ(5, capture[2].id) << "items should come out in forward order";
+
+ // now remove even ids in reverse order
+
+ std::vector<MyReq> capture2;
+ pq.remove_by_req_filter(
+ [&capture2] (MyReqRef&& r) -> bool {
+ if (0 == r->id % 2) {
+ capture2.push_back(*r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(3u, capture2.size());
+ EXPECT_EQ(6, capture2[0].id) << "items should come out in reverse order";
+ EXPECT_EQ(4, capture2[1].id) << "items should come out in reverse order";
+ EXPECT_EQ(2, capture2[2].id) << "items should come out in reverse order";
+ } // TEST
+
+
+ TEST(dmclock_server, remove_by_client) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info1;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ EXPECT_EQ(0, pq.add_request(MyReq(1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(11), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(0), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(13), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(2), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(13), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(98), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(44), client1, req_params));
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(9u, pq.request_count());
+
+ std::list<MyReq> removed;
+
+ pq.remove_by_client(client1,
+ true,
+ [&removed] (MyReqRef&& r) {
+ removed.push_front(*r);
+ });
+
+ EXPECT_EQ(3u, removed.size());
+ EXPECT_EQ(1, removed.front().id);
+ removed.pop_front();
+ EXPECT_EQ(11, removed.front().id);
+ removed.pop_front();
+ EXPECT_EQ(44, removed.front().id);
+ removed.pop_front();
+
+ EXPECT_EQ(6u, pq.request_count());
+
+ Queue::PullReq pr = pq.pull_request();
+ EXPECT_TRUE(pr.is_retn());
+ EXPECT_EQ(2, pr.get_retn().request->id);
+
+ pr = pq.pull_request();
+ EXPECT_TRUE(pr.is_retn());
+ EXPECT_EQ(0, pr.get_retn().request->id);
+
+ pq.remove_by_client(client2);
+ EXPECT_EQ(0u, pq.request_count()) <<
+ "after second client removed, none left";
+ } // TEST
+
+
+ TEST(dmclock_server_pull, pull_weight) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 1.0, 0.0);
+ dmc::ClientInfo info2(0.0, 2.0, 0.0);
+
+ QueueRef pq;
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &info1;
+ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "two-thirds of request should have come from second client";
+ }
+
+
+ TEST(dmclock_server_pull, pull_reservation) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ ClientId client2 = 8;
+
+ dmc::ClientInfo info1(2.0, 0.0, 0.0);
+ dmc::ClientInfo info2(1.0, 0.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &info1;
+ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto old_time = dmc::get_time() - 100.0;
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, old_time));
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client2, req_params, old_time));
+ old_time += 0.001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::reservation, retn.phase);
+ }
+
+ EXPECT_EQ(4, c1_count) <<
+ "two-thirds of request should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "one-third of request should have come from second client";
+ } // dmclock_server_pull.pull_reservation
+
+
+ TEST(dmclock_server_pull, update_client_info) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,false>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ dmc::ClientInfo info1(0.0, 100.0, 0.0);
+ dmc::ClientInfo info2(0.0, 200.0, 0.0);
+
+ QueueRef pq;
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &info1;
+ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ info1 = dmc::ClientInfo(0.0, 200.0, 0.0);
+ pq->update_client_info(17);
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 6; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(3, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(3, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
+ TEST(dmclock_server_pull, dynamic_cli_info_f) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request,true,true>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 17;
+ ClientId client2 = 98;
+
+ std::vector<dmc::ClientInfo> info1;
+ std::vector<dmc::ClientInfo> info2;
+
+ info1.push_back(dmc::ClientInfo(0.0, 100.0, 0.0));
+ info1.push_back(dmc::ClientInfo(0.0, 150.0, 0.0));
+
+ info2.push_back(dmc::ClientInfo(0.0, 200.0, 0.0));
+ info2.push_back(dmc::ClientInfo(0.0, 50.0, 0.0));
+
+ uint cli_info_group = 0;
+
+ QueueRef pq;
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &info1[cli_info_group];
+ else if (client2 == c) return &info2[cli_info_group];
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return nullptr;
+ }
+ };
+
+ pq = QueueRef(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
+
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < 10; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (i > 5) continue;
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2, c1_count) <<
+ "before: one-third of request should have come from first client";
+ EXPECT_EQ(4, c2_count) <<
+ "before: two-thirds of request should have come from second client";
+
+ std::chrono::seconds dura(1);
+ std::this_thread::sleep_for(dura);
+
+ cli_info_group = 1;
+
+ now = dmc::get_time();
+
+ for (int i = 0; i < 6; ++i) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
+
+ c1_count = 0;
+ c2_count = 0;
+ for (int i = 0; i < 8; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) ++c1_count;
+ else if (client2 == retn.client) ++c2_count;
+ else ADD_FAILURE() << "got request from neither of two clients";
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(6, c1_count) <<
+ "after: one-third of request should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "after: two-thirds of request should have come from second client";
+ }
+
+
+ // This test shows what happens when a request can be ready (under
+ // limit) but not schedulable since proportion tag is 0. We expect
+ // to get some future and none responses.
+ TEST(dmclock_server_pull, ready_and_under_limit) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ ClientId client2 = 8;
+
+ dmc::ClientInfo info1(1.0, 0.0, 0.0);
+ dmc::ClientInfo info2(1.0, 0.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) return &info1;
+ else if (client2 == c) return &info2;
+ else {
+ ADD_FAILURE() << "client info looked up for non-existant client";
+ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(0, 0);
+
+ // make sure all times are well before now
+ auto start_time = dmc::get_time() - 100.0;
+
+ // add six requests; for same client reservations spaced one apart
+ for (int i = 0; i < 3; ++i) {
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, start_time));
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client2, req_params, start_time));
+ }
+
+ Queue::PullReq pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 0.5);
+ EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
+ "too soon for next reservation";
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 1.5);
+ EXPECT_EQ(Queue::NextReqType::future, pr.type) <<
+ "too soon for next reservation";
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ pr = pq->pull_request(start_time + 2.5);
+ EXPECT_EQ(Queue::NextReqType::none, pr.type) << "no more requests left";
+ }
+
+
+ TEST(dmclock_server_pull, pull_none) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ dmc::ClientInfo info(1.0, 1.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+
+ // Request req;
+ ReqParams req_params(1,1);
+
+ auto now = dmc::get_time();
+
+ Queue::PullReq pr = pq->pull_request(now + 100);
+
+ EXPECT_EQ(Queue::NextReqType::none, pr.type);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(1.0, 0.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, now + 100));
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::future, pr.type);
+
+ Time when = boost::get<Time>(pr.data);
+ EXPECT_EQ(now + 100, when);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future_limit_break_weight) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(0.0, 1.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Allow));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, now + 100));
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ EXPECT_EQ(client1, retn.client);
+ }
+
+
+ TEST(dmclock_server_pull, pull_future_limit_break_reservation) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ // ClientId client2 = 8;
+
+ dmc::ClientInfo info(1.0, 0.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Allow));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are well before now
+ auto now = dmc::get_time();
+
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, now + 100));
+ Queue::PullReq pr = pq->pull_request(now);
+
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ EXPECT_EQ(client1, retn.client);
+ }
+
+
+ TEST(dmclock_server_pull, pull_reject_at_limit) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId, Request, false>;
+ using MyReqRef = typename Queue::RequestRef;
+
+ ClientId client1 = 52;
+ ClientId client2 = 53;
+
+ dmc::ClientInfo info(0.0, 1.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ Queue pq(client_info_f, AtLimit::Reject);
+
+ {
+ // success at 1 request per second
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{1}));
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{2}));
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{3}));
+ // request too soon
+ EXPECT_EQ(EAGAIN, pq.add_request_time({}, client1, {}, Time{3.9}));
+ // previous rejected request counts against limit
+ EXPECT_EQ(EAGAIN, pq.add_request_time({}, client1, {}, Time{4}));
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{6}));
+ }
+ {
+ auto r1 = MyReqRef{new Request};
+ ASSERT_EQ(0, pq.add_request(std::move(r1), client2, {}, Time{1}));
+ EXPECT_EQ(nullptr, r1); // add_request takes r1 on success
+ auto r2 = MyReqRef{new Request};
+ ASSERT_EQ(EAGAIN, pq.add_request(std::move(r2), client2, {}, Time{1}));
+ EXPECT_NE(nullptr, r2); // add_request does not take r2 on failure
+ }
+ }
+
+
+ TEST(dmclock_server_pull, pull_reject_threshold) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId, Request, false>;
+
+ ClientId client1 = 52;
+
+ dmc::ClientInfo info(0.0, 1.0, 1.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ // allow up to 3 seconds worth of limit before rejecting
+ Queue pq(client_info_f, RejectThreshold{3.0});
+
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{1})); // at limit=1
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{1})); // 1 over
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{1})); // 2 over
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{1})); // 3 over
+ EXPECT_EQ(EAGAIN, pq.add_request_time({}, client1, {}, Time{1})); // reject
+ EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{3})); // 3 over
+ }
+
+ } // namespace dmclock
+} // namespace crimson
diff --git a/src/dmclock/test/test_test_client.cc b/src/dmclock/test/test_test_client.cc
new file mode 100644
index 00000000..11cbd74b
--- /dev/null
+++ b/src/dmclock/test/test_test_client.cc
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include <atomic>
+#include <thread>
+#include <chrono>
+#include <iostream>
+
+#include "gtest/gtest.h"
+
+#include "sim_recs.h"
+#include "sim_client.h"
+
+#include "test_dmclock.h"
+
+
+using namespace std::placeholders;
+
+namespace dmc = crimson::dmclock;
+namespace test = crimson::test_dmc;
+namespace sim = crimson::qos_simulation;
+
+using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
+
+static TimePoint now() { return std::chrono::system_clock::now(); }
+
+
+TEST(test_client, full_bore_timing) {
+ std::atomic_ulong count(0);
+
+ ServerId server_id = 3;
+
+ sim::TestResponse resp(0);
+ dmc::PhaseType resp_params = dmc::PhaseType::priority;
+ test::DmcClient* client;
+ const sim::Cost request_cost = 1u;
+
+ auto start = now();
+ client =
+ new test::DmcClient(ClientId(0),
+ [&] (const ServerId& server,
+ const sim::TestRequest& req,
+ const ClientId& client_id,
+ const dmc::ReqParams& req_params) {
+ ++count;
+ client->receive_response(resp, client_id, resp_params, request_cost);
+ },
+ [&] (const uint64_t seed) -> ServerId& {
+ return server_id;
+ },
+ test::dmc_client_accumulate_f,
+ 1000, // ops to run
+ 100, // iops goal
+ 5); // outstanding ops allowed
+ client->wait_until_done();
+ auto end = now();
+ EXPECT_EQ(1000u, count) << "didn't get right number of ops";
+
+ int milliseconds = (end - start) / std::chrono::milliseconds(1);
+ EXPECT_LT(10000, milliseconds) << "timing too fast to be correct";
+ EXPECT_GT(12000, milliseconds) << "timing suspiciously slow";
+
+ delete client;
+}
+
+
+TEST(test_client, paused_timing) {
+ std::atomic_ulong count(0);
+ std::atomic_ulong unresponded_count(0);
+ std::atomic_bool auto_respond(false);
+
+ ClientId my_client_id = 0;
+ ServerId server_id = 3;
+
+ sim::TestResponse resp(0);
+ dmc::PhaseType resp_params = dmc::PhaseType::priority;
+ const uint64_t request_cost = 1u;
+ test::DmcClient* client;
+
+ auto start = now();
+ client =
+ new test::DmcClient(my_client_id,
+ [&] (const ServerId& server,
+ const sim::TestRequest& req,
+ const ClientId& client_id,
+ const dmc::ReqParams& req_params) {
+ ++count;
+ if (auto_respond.load()) {
+ client->receive_response(resp, client_id, resp_params, request_cost);
+ } else {
+ ++unresponded_count;
+ }
+ },
+ [&] (const uint64_t seed) -> ServerId& {
+ return server_id;
+ },
+ test::dmc_client_accumulate_f,
+
+ 1000, // ops to run
+ 100, // iops goal
+ 50); // outstanding ops allowed
+ std::thread t([&]() {
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ EXPECT_EQ(50u, unresponded_count.load()) <<
+ "should have 50 unresponded calls";
+ auto_respond = true;
+ // respond to those 50 calls
+ for(int i = 0; i < 50; ++i) {
+ client->receive_response(resp, my_client_id, resp_params, 1);
+ --unresponded_count;
+ }
+ });
+
+ client->wait_until_done();
+ auto end = now();
+ int milliseconds = (end - start) / std::chrono::milliseconds(1);
+
+ // the 50 outstanding ops allowed means the first half-second of
+ // requests get responded to during the 5 second pause. So we have
+ // to adjust our expectations by a half-second.
+ EXPECT_LT(15000 - 500, milliseconds) << "timing too fast to be correct";
+ EXPECT_GT(17000 - 500, milliseconds) << "timing suspiciously slow";
+ t.join();
+
+ delete client;
+}