summaryrefslogtreecommitdiffstats
path: root/src/dmclock/sim
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/dmclock/sim
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/dmclock/sim/CMakeLists.txt11
-rw-r--r--src/dmclock/sim/dmc_sim_100th.conf32
-rw-r--r--src/dmclock/sim/dmc_sim_example.conf56
-rw-r--r--src/dmclock/sim/src/CMakeLists.txt38
-rw-r--r--src/dmclock/sim/src/ConfUtils.cc574
-rw-r--r--src/dmclock/sim/src/ConfUtils.h83
-rw-r--r--src/dmclock/sim/src/config.cc184
-rw-r--r--src/dmclock/sim/src/config.h158
-rw-r--r--src/dmclock/sim/src/sim_client.h340
-rw-r--r--src/dmclock/sim/src/sim_recs.h131
-rw-r--r--src/dmclock/sim/src/sim_server.h245
-rw-r--r--src/dmclock/sim/src/simulate.h448
-rw-r--r--src/dmclock/sim/src/ssched/ssched_client.h51
-rw-r--r--src/dmclock/sim/src/ssched/ssched_recs.h44
-rw-r--r--src/dmclock/sim/src/ssched/ssched_server.h194
-rw-r--r--src/dmclock/sim/src/str_list.cc106
-rw-r--r--src/dmclock/sim/src/str_list.h109
-rw-r--r--src/dmclock/sim/src/test_dmclock.cc55
-rw-r--r--src/dmclock/sim/src/test_dmclock.h64
-rw-r--r--src/dmclock/sim/src/test_dmclock_main.cc342
-rw-r--r--src/dmclock/sim/src/test_ssched.cc40
-rw-r--r--src/dmclock/sim/src/test_ssched.h64
-rw-r--r--src/dmclock/sim/src/test_ssched_main.cc199
23 files changed, 3568 insertions, 0 deletions
diff --git a/src/dmclock/sim/CMakeLists.txt b/src/dmclock/sim/CMakeLists.txt
new file mode 100644
index 000000000..ca537ce6f
--- /dev/null
+++ b/src/dmclock/sim/CMakeLists.txt
@@ -0,0 +1,11 @@
+set(K_WAY_HEAP "" CACHE STRING "K_WAY_HEAP")
+
+if(K_WAY_HEAP)
+ if(K_WAY_HEAP LESS 2)
+ message(FATAL_ERROR "K_WAY_HEAP value should be at least 2")
+ else()
+ set(CMAKE_CXX_SIM_FLAGS "-DK_WAY_HEAP=${K_WAY_HEAP}")
+ endif()
+endif()
+
+add_subdirectory(src)
diff --git a/src/dmclock/sim/dmc_sim_100th.conf b/src/dmclock/sim/dmc_sim_100th.conf
new file mode 100644
index 000000000..17d004354
--- /dev/null
+++ b/src/dmclock/sim/dmc_sim_100th.conf
@@ -0,0 +1,32 @@
+[global]
+server_groups = 1
+client_groups = 2
+server_random_selection = true
+server_soft_limit = true
+
+[client.0]
+client_count = 99
+client_wait = 0
+client_total_ops = 1000
+client_server_select_range = 10
+client_iops_goal = 50
+client_outstanding_ops = 100
+client_reservation = 20.0
+client_limit = 60.0
+client_weight = 1.0
+
+[client.1]
+client_count = 1
+client_wait = 10
+client_total_ops = 1000
+client_server_select_range = 10
+client_iops_goal = 50
+client_outstanding_ops = 100
+client_reservation = 20.0
+client_limit = 60.0
+client_weight = 1.0
+
+[server.0]
+server_count = 100
+server_iops = 40
+server_threads = 1
diff --git a/src/dmclock/sim/dmc_sim_example.conf b/src/dmclock/sim/dmc_sim_example.conf
new file mode 100644
index 000000000..e98b870eb
--- /dev/null
+++ b/src/dmclock/sim/dmc_sim_example.conf
@@ -0,0 +1,56 @@
+[global]
+server_groups = 1
+client_groups = 4
+server_random_selection = false
+server_soft_limit = false
+
+[client.0]
+client_count = 1
+client_wait = 0
+client_total_ops = 2000
+client_server_select_range = 1
+client_iops_goal = 200
+client_outstanding_ops = 32
+client_reservation = 0.0
+client_limit = 0.0
+client_weight = 1.0
+
+[client.1]
+client_count = 1
+client_wait = 5
+client_total_ops = 2000
+client_server_select_range = 1
+client_iops_goal = 200
+client_outstanding_ops = 32
+client_reservation = 0.0
+client_limit = 40.0
+client_weight = 1.0
+
+[client.2]
+client_count = 1
+client_wait = 10
+client_total_ops = 2000
+client_server_select_range = 1
+client_iops_goal = 200
+client_outstanding_ops = 32
+client_reservation = 0.0
+client_limit = 50.0
+client_weight = 2.0
+client_req_cost = 1
+
+[client.3]
+client_count = 1
+client_wait = 10
+client_total_ops = 2000
+client_server_select_range = 1
+client_iops_goal = 200
+client_outstanding_ops = 32
+client_reservation = 0.0
+client_limit = 50.0
+client_weight = 2.0
+client_req_cost = 3
+
+[server.0]
+server_count = 1
+server_iops = 160
+server_threads = 1
diff --git a/src/dmclock/sim/src/CMakeLists.txt b/src/dmclock/sim/src/CMakeLists.txt
new file mode 100644
index 000000000..dab011ed0
--- /dev/null
+++ b/src/dmclock/sim/src/CMakeLists.txt
@@ -0,0 +1,38 @@
+set(local_flags "-Wall ${CMAKE_CXX_SIM_FLAGS}")
+
+set(ssched_sim_srcs test_ssched.cc test_ssched_main.cc)
+set(dmc_sim_srcs test_dmclock.cc test_dmclock_main.cc)
+set(config_srcs config.cc str_list.cc ConfUtils.cc)
+
+set_source_files_properties(${ssched_sim_srcs} ${dmc_sim_srcs} ${dmc_srcs} ${config_srcs}
+ PROPERTIES
+ COMPILE_FLAGS "${local_flags}"
+ )
+
+if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
+ set(warnings_off " -Wno-unused-variable -Wno-unused-function")
+elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
+ set(warnings_off " -Wno-unused-but-set-variable -Wno-unused-function")
+endif()
+
+# append warning flags to certain source files
+set_property(
+ SOURCE ${ssched_sim_srcs} ${dmc_sim_srcs} ${config_srcs}
+ APPEND_STRING
+ PROPERTY COMPILE_FLAGS "${warnings_off}"
+ )
+
+add_executable(ssched_sim EXCLUDE_FROM_ALL ${ssched_sim_srcs})
+target_include_directories(ssched_sim PRIVATE ssched) # ssched code
+add_executable(dmc_sim EXCLUDE_FROM_ALL ${dmc_sim_srcs} ${config_srcs})
+
+set_target_properties(ssched_sim dmc_sim
+ PROPERTIES
+ RUNTIME_OUTPUT_DIRECTORY ..)
+
+add_dependencies(dmc_sim dmclock)
+
+target_link_libraries(ssched_sim LINK_PRIVATE Threads::Threads)
+target_link_libraries(dmc_sim LINK_PRIVATE dmclock)
+
+add_custom_target(dmclock-sims DEPENDS ssched_sim dmc_sim)
diff --git a/src/dmclock/sim/src/ConfUtils.cc b/src/dmclock/sim/src/ConfUtils.cc
new file mode 100644
index 000000000..a05f7dc42
--- /dev/null
+++ b/src/dmclock/sim/src/ConfUtils.cc
@@ -0,0 +1,574 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <algorithm>
+#include <errno.h>
+#include <list>
+#include <map>
+#include <sstream>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <string>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <iostream>
+
+#include <assert.h>
+#include "ConfUtils.h"
+
+using std::cerr;
+using std::ostringstream;
+using std::pair;
+using std::string;
+
+#define MAX_CONFIG_FILE_SZ 0x40000000
+
+////////////////////////////// ConfLine //////////////////////////////
+ConfLine::
+ConfLine(const std::string &key_, const std::string &val_,
+ const std::string &newsection_, const std::string &comment_, int line_no_)
+ : key(key_), val(val_), newsection(newsection_)
+{
+ // If you want to implement writable ConfFile support, you'll need to save
+ // the comment and line_no arguments here.
+}
+
+bool ConfLine::
+operator<(const ConfLine &rhs) const
+{
+ // We only compare keys.
+ // If you have more than one line with the same key in a given section, the
+ // last one wins.
+ if (key < rhs.key)
+ return true;
+ else
+ return false;
+}
+
+std::ostream &operator<<(std::ostream& oss, const ConfLine &l)
+{
+ oss << "ConfLine(key = '" << l.key << "', val='"
+ << l.val << "', newsection='" << l.newsection << "')";
+ return oss;
+}
+///////////////////////// ConfFile //////////////////////////
+ConfFile::
+ConfFile()
+{
+}
+
+ConfFile::
+~ConfFile()
+{
+}
+
+void ConfFile::
+clear()
+{
+ sections.clear();
+}
+
+/* We load the whole file into memory and then parse it. Although this is not
+ * the optimal approach, it does mean that most of this code can be shared with
+ * the bufferlist loading function. Since bufferlists are always in-memory, the
+ * load_from_buffer interface works well for them.
+ * In general, configuration files should be a few kilobytes at maximum, so
+ * loading the whole configuration into memory shouldn't be a problem.
+ */
+int ConfFile::
+parse_file(const std::string &fname, std::deque<std::string> *errors,
+ std::ostream *warnings)
+{
+ clear();
+
+ int ret = 0;
+ size_t sz;
+ char *buf = NULL;
+ char buf2[128];
+ FILE *fp = fopen(fname.c_str(), "r");
+ if (!fp) {
+ ret = -errno;
+ return ret;
+ }
+
+ struct stat st_buf;
+ if (fstat(fileno(fp), &st_buf)) {
+ ret = -errno;
+ ostringstream oss;
+ oss << "read_conf: failed to fstat '" << fname << "': " << strerror_r(ret, buf2, sizeof(buf2));
+ errors->push_back(oss.str());
+ goto done;
+ }
+
+ if (st_buf.st_size > MAX_CONFIG_FILE_SZ) {
+ ostringstream oss;
+ oss << "read_conf: config file '" << fname << "' is " << st_buf.st_size
+ << " bytes, but the maximum is " << MAX_CONFIG_FILE_SZ;
+ errors->push_back(oss.str());
+ ret = -EINVAL;
+ goto done;
+ }
+
+ sz = (size_t)st_buf.st_size;
+ buf = (char*)malloc(sz);
+ if (!buf) {
+ ret = -ENOMEM;
+ goto done;
+ }
+
+ if (fread(buf, 1, sz, fp) != sz) {
+ if (ferror(fp)) {
+ ret = -errno;
+ ostringstream oss;
+ oss << "read_conf: fread error while reading '" << fname << "': "
+ << strerror_r(ret, buf2, sizeof(buf2));
+ errors->push_back(oss.str());
+ goto done;
+ }
+ else {
+ ostringstream oss;
+ oss << "read_conf: unexpected EOF while reading '" << fname << "': "
+ << "possible concurrent modification?";
+ errors->push_back(oss.str());
+ ret = -EIO;
+ goto done;
+ }
+ }
+
+ load_from_buffer(buf, sz, errors, warnings);
+ ret = 0;
+
+done:
+ free(buf);
+ fclose(fp);
+ return ret;
+}
+
+int ConfFile::
+read(const std::string &section, const std::string &key, std::string &val) const
+{
+ string k(normalize_key_name(key));
+
+ const_section_iter_t s = sections.find(section);
+ if (s == sections.end())
+ return -ENOENT;
+ ConfLine exemplar(k, "", "", "", 0);
+ ConfSection::const_line_iter_t l = s->second.lines.find(exemplar);
+ if (l == s->second.lines.end())
+ return -ENOENT;
+ val = l->val;
+ return 0;
+}
+
+ConfFile::const_section_iter_t ConfFile::
+sections_begin() const
+{
+ return sections.begin();
+}
+
+ConfFile::const_section_iter_t ConfFile::
+sections_end() const
+{
+ return sections.end();
+}
+
+void ConfFile::
+trim_whitespace(std::string &str, bool strip_internal)
+{
+ // strip preceding
+ const char *in = str.c_str();
+ while (true) {
+ char c = *in;
+ if ((!c) || (!isspace(c)))
+ break;
+ ++in;
+ }
+ char output[strlen(in) + 1];
+ strcpy(output, in);
+
+ // strip trailing
+ char *o = output + strlen(output);
+ while (true) {
+ if (o == output)
+ break;
+ --o;
+ if (!isspace(*o)) {
+ ++o;
+ *o = '\0';
+ break;
+ }
+ }
+
+ if (!strip_internal) {
+ str.assign(output);
+ return;
+ }
+
+ // strip internal
+ char output2[strlen(output) + 1];
+ char *out2 = output2;
+ bool prev_was_space = false;
+ for (char *u = output; *u; ++u) {
+ char c = *u;
+ if (isspace(c)) {
+ if (!prev_was_space)
+ *out2++ = c;
+ prev_was_space = true;
+ }
+ else {
+ *out2++ = c;
+ prev_was_space = false;
+ }
+ }
+ *out2++ = '\0';
+ str.assign(output2);
+}
+
+/* Normalize a key name.
+ *
+ * Normalized key names have no leading or trailing whitespace, and all
+ * whitespace is stored as underscores. The main reason for selecting this
+ * normal form is so that in common/config.cc, we can use a macro to stringify
+ * the field names of md_config_t and get a key in normal form.
+ */
+std::string ConfFile::
+normalize_key_name(const std::string &key)
+{
+ string k(key);
+ ConfFile::trim_whitespace(k, true);
+ std::replace(k.begin(), k.end(), ' ', '_');
+ return k;
+}
+
+std::ostream &operator<<(std::ostream &oss, const ConfFile &cf)
+{
+ for (ConfFile::const_section_iter_t s = cf.sections_begin();
+ s != cf.sections_end(); ++s) {
+ oss << "[" << s->first << "]\n";
+ for (ConfSection::const_line_iter_t l = s->second.lines.begin();
+ l != s->second.lines.end(); ++l) {
+ if (!l->key.empty()) {
+ oss << "\t" << l->key << " = \"" << l->val << "\"\n";
+ }
+ }
+ }
+ return oss;
+}
+
+void ConfFile::
+load_from_buffer(const char *buf, size_t sz, std::deque<std::string> *errors,
+ std::ostream *warnings)
+{
+ errors->clear();
+
+ section_iter_t::value_type vt("global", ConfSection());
+ pair < section_iter_t, bool > vr(sections.insert(vt));
+ assert(vr.second);
+ section_iter_t cur_section = vr.first;
+ std::string acc;
+
+ const char *b = buf;
+ int line_no = 0;
+ size_t line_len = -1;
+ size_t rem = sz;
+ while (1) {
+ b += line_len + 1;
+ rem -= line_len + 1;
+ if (rem == 0)
+ break;
+ line_no++;
+
+ // look for the next newline
+ const char *end = (const char*)memchr(b, '\n', rem);
+ if (!end) {
+ ostringstream oss;
+ oss << "read_conf: ignoring line " << line_no << " because it doesn't "
+ << "end with a newline! Please end the config file with a newline.";
+ errors->push_back(oss.str());
+ break;
+ }
+
+ // find length of line, and search for NULLs
+ line_len = 0;
+ bool found_null = false;
+ for (const char *tmp = b; tmp != end; ++tmp) {
+ line_len++;
+ if (*tmp == '\0') {
+ found_null = true;
+ }
+ }
+
+ if (found_null) {
+ ostringstream oss;
+ oss << "read_conf: ignoring line " << line_no << " because it has "
+ << "an embedded null.";
+ errors->push_back(oss.str());
+ acc.clear();
+ continue;
+ }
+
+ if ((line_len >= 1) && (b[line_len-1] == '\\')) {
+ // A backslash at the end of a line serves as a line continuation marker.
+ // Combine the next line with this one.
+ // Remove the backslash itself from the text.
+ acc.append(b, line_len - 1);
+ continue;
+ }
+
+ acc.append(b, line_len);
+
+ //cerr << "acc = '" << acc << "'" << std::endl;
+ ConfLine *cline = process_line(line_no, acc.c_str(), errors);
+ acc.clear();
+ if (!cline)
+ continue;
+ const std::string &csection(cline->newsection);
+ if (!csection.empty()) {
+ std::map <std::string, ConfSection>::value_type nt(csection, ConfSection());
+ pair < section_iter_t, bool > nr(sections.insert(nt));
+ cur_section = nr.first;
+ }
+ else {
+ if (cur_section->second.lines.count(*cline)) {
+ // replace an existing key/line in this section, so that
+ // [mysection]
+ // foo = 1
+ // foo = 2
+ // will result in foo = 2.
+ cur_section->second.lines.erase(*cline);
+ if (cline->key.length() && warnings)
+ *warnings << "warning: line " << line_no << ": '" << cline->key << "' in section '"
+ << cur_section->first << "' redefined " << std::endl;
+ }
+ // add line to current section
+ //std::cerr << "cur_section = " << cur_section->first << ", " << *cline << std::endl;
+ cur_section->second.lines.insert(*cline);
+ }
+ delete cline;
+ }
+
+ if (!acc.empty()) {
+ ostringstream oss;
+ oss << "read_conf: don't end with lines that end in backslashes!";
+ errors->push_back(oss.str());
+ }
+}
+
+/*
+ * A simple state-machine based parser.
+ * This probably could/should be rewritten with something like boost::spirit
+ * or yacc if the grammar ever gets more complex.
+ */
+ConfLine* ConfFile::
+process_line(int line_no, const char *line, std::deque<std::string> *errors)
+{
+ enum acceptor_state_t {
+ ACCEPT_INIT,
+ ACCEPT_SECTION_NAME,
+ ACCEPT_KEY,
+ ACCEPT_VAL_START,
+ ACCEPT_UNQUOTED_VAL,
+ ACCEPT_QUOTED_VAL,
+ ACCEPT_COMMENT_START,
+ ACCEPT_COMMENT_TEXT,
+ };
+ const char *l = line;
+ acceptor_state_t state = ACCEPT_INIT;
+ string key, val, newsection, comment;
+ bool escaping = false;
+ while (true) {
+ char c = *l++;
+ switch (state) {
+ case ACCEPT_INIT:
+ if (c == '\0')
+ return NULL; // blank line. Not an error, but not interesting either.
+ else if (c == '[')
+ state = ACCEPT_SECTION_NAME;
+ else if ((c == '#') || (c == ';'))
+ state = ACCEPT_COMMENT_TEXT;
+ else if (c == ']') {
+ ostringstream oss;
+ oss << "unexpected right bracket at char " << (l - line)
+ << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ else if (isspace(c)) {
+ // ignore whitespace here
+ }
+ else {
+ // try to accept this character as a key
+ state = ACCEPT_KEY;
+ --l;
+ }
+ break;
+ case ACCEPT_SECTION_NAME:
+ if (c == '\0') {
+ ostringstream oss;
+ oss << "error parsing new section name: expected right bracket "
+ << "at char " << (l - line) << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ else if ((c == ']') && (!escaping)) {
+ trim_whitespace(newsection, true);
+ if (newsection.empty()) {
+ ostringstream oss;
+ oss << "error parsing new section name: no section name found? "
+ << "at char " << (l - line) << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ state = ACCEPT_COMMENT_START;
+ }
+ else if (((c == '#') || (c == ';')) && (!escaping)) {
+ ostringstream oss;
+ oss << "unexpected comment marker while parsing new section name, at "
+ << "char " << (l - line) << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ else if ((c == '\\') && (!escaping)) {
+ escaping = true;
+ }
+ else {
+ escaping = false;
+ newsection += c;
+ }
+ break;
+ case ACCEPT_KEY:
+ if ((((c == '#') || (c == ';')) && (!escaping)) || (c == '\0')) {
+ ostringstream oss;
+ if (c == '\0') {
+ oss << "end of key=val line " << line_no
+ << " reached, no \"=val\" found...missing =?";
+ } else {
+ oss << "unexpected character while parsing putative key value, "
+ << "at char " << (l - line) << ", line " << line_no;
+ }
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ else if ((c == '=') && (!escaping)) {
+ key = normalize_key_name(key);
+ if (key.empty()) {
+ ostringstream oss;
+ oss << "error parsing key name: no key name found? "
+ << "at char " << (l - line) << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ state = ACCEPT_VAL_START;
+ }
+ else if ((c == '\\') && (!escaping)) {
+ escaping = true;
+ }
+ else {
+ escaping = false;
+ key += c;
+ }
+ break;
+ case ACCEPT_VAL_START:
+ if (c == '\0')
+ return new ConfLine(key, val, newsection, comment, line_no);
+ else if ((c == '#') || (c == ';'))
+ state = ACCEPT_COMMENT_TEXT;
+ else if (c == '"')
+ state = ACCEPT_QUOTED_VAL;
+ else if (isspace(c)) {
+ // ignore whitespace
+ }
+ else {
+ // try to accept character as a val
+ state = ACCEPT_UNQUOTED_VAL;
+ --l;
+ }
+ break;
+ case ACCEPT_UNQUOTED_VAL:
+ if (c == '\0') {
+ if (escaping) {
+ ostringstream oss;
+ oss << "error parsing value name: unterminated escape sequence "
+ << "at char " << (l - line) << ", line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ trim_whitespace(val, false);
+ return new ConfLine(key, val, newsection, comment, line_no);
+ }
+ else if (((c == '#') || (c == ';')) && (!escaping)) {
+ trim_whitespace(val, false);
+ state = ACCEPT_COMMENT_TEXT;
+ }
+ else if ((c == '\\') && (!escaping)) {
+ escaping = true;
+ }
+ else {
+ escaping = false;
+ val += c;
+ }
+ break;
+ case ACCEPT_QUOTED_VAL:
+ if (c == '\0') {
+ ostringstream oss;
+ oss << "found opening quote for value, but not the closing quote. "
+ << "line " << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ else if ((c == '"') && (!escaping)) {
+ state = ACCEPT_COMMENT_START;
+ }
+ else if ((c == '\\') && (!escaping)) {
+ escaping = true;
+ }
+ else {
+ escaping = false;
+ // Add anything, including whitespace.
+ val += c;
+ }
+ break;
+ case ACCEPT_COMMENT_START:
+ if (c == '\0') {
+ return new ConfLine(key, val, newsection, comment, line_no);
+ }
+ else if ((c == '#') || (c == ';')) {
+ state = ACCEPT_COMMENT_TEXT;
+ }
+ else if (isspace(c)) {
+ // ignore whitespace
+ }
+ else {
+ ostringstream oss;
+ oss << "unexpected character at char " << (l - line) << " of line "
+ << line_no;
+ errors->push_back(oss.str());
+ return NULL;
+ }
+ break;
+ case ACCEPT_COMMENT_TEXT:
+ if (c == '\0')
+ return new ConfLine(key, val, newsection, comment, line_no);
+ else
+ comment += c;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ assert(c != '\0'); // We better not go past the end of the input string.
+ }
+}
diff --git a/src/dmclock/sim/src/ConfUtils.h b/src/dmclock/sim/src/ConfUtils.h
new file mode 100644
index 000000000..3db1d1e14
--- /dev/null
+++ b/src/dmclock/sim/src/ConfUtils.h
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_CONFUTILS_H
+#define CEPH_CONFUTILS_H
+
+#include <deque>
+#include <map>
+#include <set>
+#include <string>
+
+/*
+ * Ceph configuration file support.
+ *
+ * This class loads an INI-style configuration from a file or bufferlist, and
+ * holds it in memory. In general, an INI configuration file is composed of
+ * sections, which contain key/value pairs. You can put comments on the end of
+ * lines by using either a hash mark (#) or the semicolon (;).
+ *
+ * You can get information out of ConfFile by calling get_key or by examining
+ * individual sections.
+ *
+ * This class could be extended to support modifying configuration files and
+ * writing them back out without too much difficulty. Currently, this is not
+ * implemented, and the file is read-only.
+ */
+class ConfLine {
+public:
+ ConfLine(const std::string &key_, const std::string &val_,
+ const std::string &newsection_, const std::string &comment_, int line_no_);
+ bool operator<(const ConfLine &rhs) const;
+ friend std::ostream &operator<<(std::ostream& oss, const ConfLine &l);
+
+ std::string key, val, newsection;
+};
+
+class ConfSection {
+public:
+ typedef std::set <ConfLine>::const_iterator const_line_iter_t;
+
+ std::set <ConfLine> lines;
+};
+
+class ConfFile {
+public:
+ typedef std::map <std::string, ConfSection>::iterator section_iter_t;
+ typedef std::map <std::string, ConfSection>::const_iterator const_section_iter_t;
+
+ ConfFile();
+ ~ConfFile();
+ void clear();
+ int parse_file(const std::string &fname, std::deque<std::string> *errors, std::ostream *warnings);
+ int read(const std::string &section, const std::string &key,
+ std::string &val) const;
+
+ const_section_iter_t sections_begin() const;
+ const_section_iter_t sections_end() const;
+
+ static void trim_whitespace(std::string &str, bool strip_internal);
+ static std::string normalize_key_name(const std::string &key);
+ friend std::ostream &operator<<(std::ostream &oss, const ConfFile &cf);
+
+private:
+ void load_from_buffer(const char *buf, size_t sz,
+ std::deque<std::string> *errors, std::ostream *warnings);
+ static ConfLine* process_line(int line_no, const char *line,
+ std::deque<std::string> *errors);
+
+ std::map <std::string, ConfSection> sections;
+};
+
+#endif
diff --git a/src/dmclock/sim/src/config.cc b/src/dmclock/sim/src/config.cc
new file mode 100644
index 000000000..dae2903e1
--- /dev/null
+++ b/src/dmclock/sim/src/config.cc
@@ -0,0 +1,184 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include <unistd.h>
+#include <string.h>
+#include <stdarg.h>
+
+#include <iostream>
+#include <vector>
+#include <list>
+
+#include "config.h"
+#include "str_list.h"
+
+
+static void dashes_to_underscores(const char *input, char *output) {
+ char c = 0;
+ char *o = output;
+ const char *i = input;
+ // first two characters are copied as-is
+ *o = *i++;
+ if (*o++ == '\0')
+ return;
+ *o = *i++;
+ if (*o++ == '\0')
+ return;
+ for (; ((c = *i)); ++i) {
+ if (c == '=') {
+ strcpy(o, i);
+ return;
+ }
+ if (c == '-')
+ *o++ = '_';
+ else
+ *o++ = c;
+ }
+ *o++ = '\0';
+}
+
+static int va_ceph_argparse_witharg(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, std::string *ret,
+ std::ostream &oss, va_list ap) {
+ const char *first = *i;
+ char tmp[strlen(first)+1];
+ dashes_to_underscores(first, tmp);
+ first = tmp;
+
+ // does this argument match any of the possibilities?
+ while (1) {
+ const char *a = va_arg(ap, char*);
+ if (a == NULL)
+ return 0;
+ int strlen_a = strlen(a);
+ char a2[strlen_a+1];
+ dashes_to_underscores(a, a2);
+ if (strncmp(a2, first, strlen(a2)) == 0) {
+ if (first[strlen_a] == '=') {
+ *ret = first + strlen_a + 1;
+ i = args.erase(i);
+ return 1;
+ }
+ else if (first[strlen_a] == '\0') {
+ // find second part (or not)
+ if (i+1 == args.end()) {
+ oss << "Option " << *i << " requires an argument." << std::endl;
+ i = args.erase(i);
+ return -EINVAL;
+ }
+ i = args.erase(i);
+ *ret = *i;
+ i = args.erase(i);
+ return 1;
+ }
+ }
+ }
+}
+
+bool crimson::qos_simulation::ceph_argparse_witharg(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, std::string *ret, ...) {
+ int r;
+ va_list ap;
+ va_start(ap, ret);
+ r = va_ceph_argparse_witharg(args, i, ret, std::cerr, ap);
+ va_end(ap);
+ if (r < 0)
+ _exit(1);
+ return r != 0;
+}
+
+void crimson::qos_simulation::ceph_argparse_early_args(std::vector<const char*>& args, std::string *conf_file_list) {
+ std::string val;
+
+ std::vector<const char *> orig_args = args;
+
+ for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_witharg(args, i, &val, "--conf", "-c", (char*)NULL)) {
+ *conf_file_list = val;
+ }
+ else {
+ // ignore
+ ++i;
+ }
+ }
+ return;
+}
+
+static bool stobool(const std::string & v) {
+ return !v.empty () &&
+ (strcasecmp (v.c_str (), "true") == 0 ||
+ atoi (v.c_str ()) != 0);
+}
+
+int crimson::qos_simulation::parse_config_file(const std::string &fname, sim_config_t &g_conf) {
+ ConfFile cf;
+ std::deque<std::string> err;
+ std::ostringstream warn;
+ int ret = cf.parse_file(fname.c_str(), &err, &warn);
+ if (ret) {
+ // error
+ return ret;
+ }
+
+ std::string val;
+ if (!cf.read("global", "server_groups", val))
+ g_conf.server_groups = std::stoul(val);
+ if (!cf.read("global", "client_groups", val))
+ g_conf.client_groups = std::stoul(val);
+ if (!cf.read("global", "server_random_selection", val))
+ g_conf.server_random_selection = stobool(val);
+ if (!cf.read("global", "server_soft_limit", val))
+ g_conf.server_soft_limit = stobool(val);
+ if (!cf.read("global", "anticipation_timeout", val))
+ g_conf.anticipation_timeout = stod(val);
+
+ for (unsigned i = 0; i < g_conf.server_groups; i++) {
+ srv_group_t st;
+ std::string section = "server." + std::to_string(i);
+ if (!cf.read(section, "server_count", val))
+ st.server_count = std::stoul(val);
+ if (!cf.read(section, "server_iops", val))
+ st.server_iops = std::stoul(val);
+ if (!cf.read(section, "server_threads", val))
+ st.server_threads = std::stoul(val);
+ g_conf.srv_group.push_back(st);
+ }
+
+ for (unsigned i = 0; i < g_conf.client_groups; i++) {
+ cli_group_t ct;
+ std::string section = "client." + std::to_string(i);
+ if (!cf.read(section, "client_count", val))
+ ct.client_count = std::stoul(val);
+ if (!cf.read(section, "client_wait", val))
+ ct.client_wait = std::chrono::seconds(std::stoul(val));
+ if (!cf.read(section, "client_total_ops", val))
+ ct.client_total_ops = std::stoul(val);
+ if (!cf.read(section, "client_server_select_range", val))
+ ct.client_server_select_range = std::stoul(val);
+ if (!cf.read(section, "client_iops_goal", val))
+ ct.client_iops_goal = std::stoul(val);
+ if (!cf.read(section, "client_outstanding_ops", val))
+ ct.client_outstanding_ops = std::stoul(val);
+ if (!cf.read(section, "client_reservation", val))
+ ct.client_reservation = std::stod(val);
+ if (!cf.read(section, "client_limit", val))
+ ct.client_limit = std::stod(val);
+ if (!cf.read(section, "client_weight", val))
+ ct.client_weight = std::stod(val);
+ if (!cf.read(section, "client_req_cost", val))
+ ct.client_req_cost = std::stoul(val);
+ g_conf.cli_group.push_back(ct);
+ }
+
+ return 0;
+}
diff --git a/src/dmclock/sim/src/config.h b/src/dmclock/sim/src/config.h
new file mode 100644
index 000000000..d61ca45a8
--- /dev/null
+++ b/src/dmclock/sim/src/config.h
@@ -0,0 +1,158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <string.h>
+
+#include <chrono>
+#include <vector>
+#include <sstream>
+#include <iomanip>
+
+#include "ConfUtils.h"
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ struct cli_group_t {
+ unsigned client_count;
+ std::chrono::seconds client_wait;
+ unsigned client_total_ops;
+ unsigned client_server_select_range;
+ unsigned client_iops_goal;
+ unsigned client_outstanding_ops;
+ double client_reservation;
+ double client_limit;
+ double client_weight;
+ Cost client_req_cost;
+
+ cli_group_t(unsigned _client_count = 100,
+ unsigned _client_wait = 0,
+ unsigned _client_total_ops = 1000,
+ unsigned _client_server_select_range = 10,
+ unsigned _client_iops_goal = 50,
+ unsigned _client_outstanding_ops = 100,
+ double _client_reservation = 20.0,
+ double _client_limit = 60.0,
+ double _client_weight = 1.0,
+ Cost _client_req_cost = 1u) :
+ client_count(_client_count),
+ client_wait(std::chrono::seconds(_client_wait)),
+ client_total_ops(_client_total_ops),
+ client_server_select_range(_client_server_select_range),
+ client_iops_goal(_client_iops_goal),
+ client_outstanding_ops(_client_outstanding_ops),
+ client_reservation(_client_reservation),
+ client_limit(_client_limit),
+ client_weight(_client_weight),
+ client_req_cost(_client_req_cost)
+ {
+ // empty
+ }
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const cli_group_t& cli_group) {
+ out <<
+ "client_count = " << cli_group.client_count << "\n" <<
+ "client_wait = " << cli_group.client_wait.count() << "\n" <<
+ "client_total_ops = " << cli_group.client_total_ops << "\n" <<
+ "client_server_select_range = " << cli_group.client_server_select_range << "\n" <<
+ "client_iops_goal = " << cli_group.client_iops_goal << "\n" <<
+ "client_outstanding_ops = " << cli_group.client_outstanding_ops << "\n" <<
+ std::fixed << std::setprecision(1) <<
+ "client_reservation = " << cli_group.client_reservation << "\n" <<
+ "client_limit = " << cli_group.client_limit << "\n" <<
+ "client_weight = " << cli_group.client_weight << "\n" <<
+ "client_req_cost = " << cli_group.client_req_cost;
+ return out;
+ }
+ }; // class cli_group_t
+
+
+ struct srv_group_t {
+ unsigned server_count;
+ unsigned server_iops;
+ unsigned server_threads;
+
+ srv_group_t(unsigned _server_count = 100,
+ unsigned _server_iops = 40,
+ unsigned _server_threads = 1) :
+ server_count(_server_count),
+ server_iops(_server_iops),
+ server_threads(_server_threads)
+ {
+ // empty
+ }
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const srv_group_t& srv_group) {
+ out <<
+ "server_count = " << srv_group.server_count << "\n" <<
+ "server_iops = " << srv_group.server_iops << "\n" <<
+ "server_threads = " << srv_group.server_threads;
+ return out;
+ }
+ }; // class srv_group_t
+
+
+ struct sim_config_t {
+ unsigned server_groups;
+ unsigned client_groups;
+ bool server_random_selection;
+ bool server_soft_limit;
+ double anticipation_timeout;
+
+ std::vector<cli_group_t> cli_group;
+ std::vector<srv_group_t> srv_group;
+
+ sim_config_t(unsigned _server_groups = 1,
+ unsigned _client_groups = 1,
+ bool _server_random_selection = false,
+ bool _server_soft_limit = true,
+ double _anticipation_timeout = 0.0) :
+ server_groups(_server_groups),
+ client_groups(_client_groups),
+ server_random_selection(_server_random_selection),
+ server_soft_limit(_server_soft_limit),
+ anticipation_timeout(_anticipation_timeout)
+ {
+ srv_group.reserve(server_groups);
+ cli_group.reserve(client_groups);
+ }
+
+ friend std::ostream& operator<<(std::ostream& out,
+ const sim_config_t& sim_config) {
+ out <<
+ "server_groups = " << sim_config.server_groups << "\n" <<
+ "client_groups = " << sim_config.client_groups << "\n" <<
+ "server_random_selection = " << sim_config.server_random_selection << "\n" <<
+ "server_soft_limit = " << sim_config.server_soft_limit << "\n" <<
+ std::fixed << std::setprecision(3) <<
+ "anticipation_timeout = " << sim_config.anticipation_timeout;
+ return out;
+ }
+ }; // class sim_config_t
+
+
+ bool ceph_argparse_witharg(std::vector<const char*> &args,
+ std::vector<const char*>::iterator &i, std::string *ret, ...);
+ void ceph_argparse_early_args(std::vector<const char*>& args, std::string *conf_file_list);
+ int parse_config_file(const std::string &fname, sim_config_t &g_conf);
+
+ }; // namespace qos_simulation
+}; // namespace crimson
diff --git a/src/dmclock/sim/src/sim_client.h b/src/dmclock/sim/src/sim_client.h
new file mode 100644
index 000000000..182cdb803
--- /dev/null
+++ b/src/dmclock/sim/src/sim_client.h
@@ -0,0 +1,340 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <chrono>
+#include <vector>
+#include <deque>
+#include <iostream>
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ struct req_op_t {};
+ struct wait_op_t {};
+ constexpr struct req_op_t req_op {};
+ constexpr struct wait_op_t wait_op {};
+
+
+ enum class CliOp { req, wait };
+ struct CliInst {
+ CliOp op;
+ union {
+ std::chrono::milliseconds wait_time;
+ struct {
+ uint32_t count;
+ std::chrono::microseconds time_bw_reqs;
+ uint16_t max_outstanding;
+ } req_params;
+ } args;
+
+ // D is a duration type
+ template<typename D>
+ CliInst(wait_op_t, D duration) :
+ op(CliOp::wait)
+ {
+ args.wait_time =
+ std::chrono::duration_cast<std::chrono::milliseconds>(duration);
+ }
+
+ CliInst(req_op_t,
+ uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
+ op(CliOp::req)
+ {
+ args.req_params.count = count;
+ args.req_params.max_outstanding = max_outstanding;
+ uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
+ args.req_params.time_bw_reqs = std::chrono::microseconds(us);
+ }
+ };
+
+
+ using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
+
+
+ template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
+ class SimulatedClient {
+ public:
+
+ struct InternalStats {
+ std::mutex mtx;
+ std::chrono::nanoseconds track_resp_time;
+ std::chrono::nanoseconds get_req_params_time;
+ uint32_t track_resp_count;
+ uint32_t get_req_params_count;
+
+ InternalStats() :
+ track_resp_time(0),
+ get_req_params_time(0),
+ track_resp_count(0),
+ get_req_params_count(0)
+ {
+ // empty
+ }
+ };
+
+ using SubmitFunc =
+ std::function<void(const ServerId&,
+ TestRequest&&,
+ const ClientId&,
+ const ReqPm&)>;
+
+ using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
+
+ typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+
+ static TimePoint now() { return std::chrono::steady_clock::now(); }
+
+ protected:
+
+ struct RespQueueItem {
+ TestResponse response;
+ ServerId server_id;
+ RespPm resp_params;
+ Cost request_cost;
+ };
+
+ const ClientId id;
+ const SubmitFunc submit_f;
+ const ServerSelectFunc server_select_f;
+ const ClientAccumFunc accum_f;
+
+ std::vector<CliInst> instructions;
+
+ SvcTrk service_tracker;
+
+ // TODO: use lock rather than atomic???
+ std::atomic_ulong outstanding_ops;
+ std::atomic_bool requests_complete;
+
+ std::deque<RespQueueItem> resp_queue;
+
+ std::mutex mtx_req;
+ std::condition_variable cv_req;
+
+ std::mutex mtx_resp;
+ std::condition_variable cv_resp;
+
+ using RespGuard = std::lock_guard<decltype(mtx_resp)>;
+ using Lock = std::unique_lock<std::mutex>;
+
+ // data collection
+
+ std::vector<TimePoint> op_times;
+ Accum accumulator;
+ InternalStats internal_stats;
+
+ std::thread thd_req;
+ std::thread thd_resp;
+
+ public:
+
+ SimulatedClient(ClientId _id,
+ const SubmitFunc& _submit_f,
+ const ServerSelectFunc& _server_select_f,
+ const ClientAccumFunc& _accum_f,
+ const std::vector<CliInst>& _instrs) :
+ id(_id),
+ submit_f(_submit_f),
+ server_select_f(_server_select_f),
+ accum_f(_accum_f),
+ instructions(_instrs),
+ service_tracker(),
+ outstanding_ops(0),
+ requests_complete(false)
+ {
+ size_t op_count = 0;
+ for (auto i : instructions) {
+ if (CliOp::req == i.op) {
+ op_count += i.args.req_params.count;
+ }
+ }
+ op_times.reserve(op_count);
+
+ thd_resp = std::thread(&SimulatedClient::run_resp, this);
+ thd_req = std::thread(&SimulatedClient::run_req, this);
+ }
+
+
+ SimulatedClient(ClientId _id,
+ const SubmitFunc& _submit_f,
+ const ServerSelectFunc& _server_select_f,
+ const ClientAccumFunc& _accum_f,
+ uint16_t _ops_to_run,
+ double _iops_goal,
+ uint16_t _outstanding_ops_allowed) :
+ SimulatedClient(_id,
+ _submit_f, _server_select_f, _accum_f,
+ {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
+ {
+ // empty
+ }
+
+
+ SimulatedClient(const SimulatedClient&) = delete;
+ SimulatedClient(SimulatedClient&&) = delete;
+ SimulatedClient& operator=(const SimulatedClient&) = delete;
+ SimulatedClient& operator=(SimulatedClient&&) = delete;
+
+ virtual ~SimulatedClient() {
+ wait_until_done();
+ }
+
+ void receive_response(const TestResponse& resp,
+ const ServerId& server_id,
+ const RespPm& resp_params,
+ const Cost request_cost) {
+ RespGuard g(mtx_resp);
+ resp_queue.push_back(
+ RespQueueItem{ resp, server_id, resp_params, request_cost });
+ cv_resp.notify_one();
+ }
+
+ const std::vector<TimePoint>& get_op_times() const { return op_times; }
+
+ void wait_until_done() {
+ if (thd_req.joinable()) thd_req.join();
+ if (thd_resp.joinable()) thd_resp.join();
+ }
+
+ const Accum& get_accumulator() const { return accumulator; }
+
+ const InternalStats& get_internal_stats() const { return internal_stats; }
+
+ protected:
+
+ void run_req() {
+ size_t ops_count = 0;
+ for (auto i : instructions) {
+ if (CliOp::wait == i.op) {
+ std::this_thread::sleep_for(i.args.wait_time);
+ } else if (CliOp::req == i.op) {
+ Lock l(mtx_req);
+ for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
+ while (outstanding_ops >= i.args.req_params.max_outstanding) {
+ cv_req.wait(l);
+ }
+
+ l.unlock();
+ auto now = std::chrono::steady_clock::now();
+ const ServerId& server = server_select_f(o);
+
+ ReqPm rp =
+ time_stats_w_return<decltype(internal_stats.get_req_params_time),
+ ReqPm>(internal_stats.mtx,
+ internal_stats.get_req_params_time,
+ [&]() -> ReqPm {
+ return service_tracker.get_req_params(server);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.get_req_params_count);
+
+ submit_f(server,
+ TestRequest{server, static_cast<uint32_t>(o), 12},
+ id, rp);
+ ++outstanding_ops;
+ l.lock(); // lock for return to top of loop
+
+ auto delay_time = now + i.args.req_params.time_bw_reqs;
+ while (std::chrono::steady_clock::now() < delay_time) {
+ cv_req.wait_until(l, delay_time);
+ } // while
+ } // for
+ ops_count += i.args.req_params.count;
+ } else {
+ assert(false);
+ }
+ } // for loop
+
+ requests_complete = true;
+
+ // all requests made, thread ends
+ }
+
+
+ void run_resp() {
+ std::chrono::milliseconds delay(1000);
+ int op = 0;
+
+ Lock l(mtx_resp);
+
+ // since the following code would otherwise be repeated (except for
+ // the call to notify_one) in the two loops below; let's avoid
+ // repetition and define it once.
+ const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
+ if (!resp_queue.empty()) {
+ RespQueueItem item = resp_queue.front();
+ resp_queue.pop_front();
+
+ l.unlock();
+
+ // data collection
+
+ op_times.push_back(now());
+ accum_f(accumulator, item.resp_params);
+
+ // processing
+
+#if 0 // not needed
+ TestResponse& resp = item.response;
+#endif
+
+ time_stats(internal_stats.mtx,
+ internal_stats.track_resp_time,
+ [&](){
+ service_tracker.track_resp(item.server_id, item.resp_params, item.request_cost);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.track_resp_count);
+
+ --outstanding_ops;
+ if (notify_req_cv) {
+ cv_req.notify_one();
+ }
+
+ l.lock();
+ }
+ };
+
+ while(!requests_complete.load()) {
+ while(resp_queue.empty() && !requests_complete.load()) {
+ cv_resp.wait_for(l, delay);
+ }
+ proc_resp(true);
+ }
+
+ while(outstanding_ops.load() > 0) {
+ while(resp_queue.empty() && outstanding_ops.load() > 0) {
+ cv_resp.wait_for(l, delay);
+ }
+ proc_resp(false); // don't call notify_one as all requests are complete
+ }
+
+ // all responses received, thread ends
+ }
+ }; // class SimulatedClient
+
+
+ }; // namespace qos_simulation
+}; // namespace crimson
diff --git a/src/dmclock/sim/src/sim_recs.h b/src/dmclock/sim/src/sim_recs.h
new file mode 100644
index 000000000..010630072
--- /dev/null
+++ b/src/dmclock/sim/src/sim_recs.h
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <signal.h>
+
+#include <sys/time.h>
+
+#include <cmath>
+#include <limits>
+#include <string>
+#include <mutex>
+#include <iostream>
+#include <functional>
+
+
+using ClientId = unsigned;
+using ServerId = unsigned;
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ using Cost = uint32_t;
+
+ inline void debugger() {
+ raise(SIGCONT);
+ }
+
+ template<typename T>
+ void time_stats(std::mutex& mtx,
+ T& time_accumulate,
+ std::function<void()> code) {
+ auto t1 = std::chrono::steady_clock::now();
+ code();
+ auto t2 = std::chrono::steady_clock::now();
+ auto duration = t2 - t1;
+ auto cast_duration = std::chrono::duration_cast<T>(duration);
+ std::lock_guard<std::mutex> lock(mtx);
+ time_accumulate += cast_duration;
+ }
+
+ // unfortunately it's hard for the compiler to infer the types,
+ // and therefore when called the template params might have to be
+ // explicit
+ template<typename T, typename R>
+ R time_stats_w_return(std::mutex& mtx,
+ T& time_accumulate,
+ std::function<R()> code) {
+ auto t1 = std::chrono::steady_clock::now();
+ R result = code();
+ auto t2 = std::chrono::steady_clock::now();
+ auto duration = t2 - t1;
+ auto cast_duration = std::chrono::duration_cast<T>(duration);
+ std::lock_guard<std::mutex> lock(mtx);
+ time_accumulate += cast_duration;
+ return result;
+ }
+
+ template<typename T>
+ void count_stats(std::mutex& mtx,
+ T& counter) {
+ std::lock_guard<std::mutex> lock(mtx);
+ ++counter;
+ }
+
+ struct TestRequest {
+ ServerId server; // allows debugging
+ uint32_t epoch;
+ uint32_t op;
+
+ TestRequest(ServerId _server,
+ uint32_t _epoch,
+ uint32_t _op) :
+ server(_server),
+ epoch(_epoch),
+ op(_op)
+ {
+ // empty
+ }
+
+ TestRequest(const TestRequest& r) :
+ TestRequest(r.server, r.epoch, r.op)
+ {
+ // empty
+ }
+ }; // struct TestRequest
+
+
+ struct TestResponse {
+ uint32_t epoch;
+
+ explicit TestResponse(uint32_t _epoch) :
+ epoch(_epoch)
+ {
+ // empty
+ }
+
+ TestResponse(const TestResponse& r) :
+ epoch(r.epoch)
+ {
+ // empty
+ }
+
+ friend std::ostream& operator<<(std::ostream& out, const TestResponse& resp) {
+ out << "{ ";
+ out << "epoch:" << resp.epoch;
+ out << " }";
+ return out;
+ }
+ }; // class TestResponse
+
+ }; // namespace qos_simulation
+}; // namespace crimson
diff --git a/src/dmclock/sim/src/sim_server.h b/src/dmclock/sim/src/sim_server.h
new file mode 100644
index 000000000..743c6e79a
--- /dev/null
+++ b/src/dmclock/sim/src/sim_server.h
@@ -0,0 +1,245 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <chrono>
+#include <deque>
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ template<typename Q, typename ReqPm, typename RespPm, typename Accum>
+ class SimulatedServer {
+
+ struct QueueItem {
+ ClientId client;
+ std::unique_ptr<TestRequest> request;
+ RespPm additional;
+ Cost request_cost;
+
+ QueueItem(const ClientId& _client,
+ std::unique_ptr<TestRequest>&& _request,
+ const RespPm& _additional,
+ const Cost _request_cost) :
+ client(_client),
+ request(std::move(_request)),
+ additional(_additional),
+ request_cost(_request_cost)
+ {
+ // empty
+ }
+ }; // QueueItem
+
+ public:
+
+ struct InternalStats {
+ std::mutex mtx;
+ std::chrono::nanoseconds add_request_time;
+ std::chrono::nanoseconds request_complete_time;
+ uint32_t add_request_count;
+ uint32_t request_complete_count;
+
+ InternalStats() :
+ add_request_time(0),
+ request_complete_time(0),
+ add_request_count(0),
+ request_complete_count(0)
+ {
+ // empty
+ }
+ };
+
+ using ClientRespFunc = std::function<void(ClientId,
+ const TestResponse&,
+ const ServerId&,
+ const RespPm&,
+ const Cost)>;
+
+ using ServerAccumFunc = std::function<void(Accum& accumulator,
+ const RespPm& additional)>;
+
+ protected:
+
+ const ServerId id;
+ Q* priority_queue;
+ ClientRespFunc client_resp_f;
+ int iops;
+ size_t thread_pool_size;
+
+ bool finishing;
+ std::chrono::microseconds op_time;
+
+ std::mutex inner_queue_mtx;
+ std::condition_variable inner_queue_cv;
+ std::deque<QueueItem> inner_queue;
+
+ std::thread* threads;
+
+ using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>;
+ using Lock = std::unique_lock<std::mutex>;
+
+ // data collection
+
+ ServerAccumFunc accum_f;
+ Accum accumulator;
+
+ InternalStats internal_stats;
+
+ public:
+
+ using CanHandleRequestFunc = std::function<bool(void)>;
+ using HandleRequestFunc =
+ std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&, uint64_t)>;
+ using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>;
+
+
+ SimulatedServer(ServerId _id,
+ int _iops,
+ size_t _thread_pool_size,
+ const ClientRespFunc& _client_resp_f,
+ const ServerAccumFunc& _accum_f,
+ CreateQueueF _create_queue_f) :
+ id(_id),
+ priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread,
+ this),
+ std::bind(&SimulatedServer::inner_post,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3,
+ std::placeholders::_4))),
+ client_resp_f(_client_resp_f),
+ iops(_iops),
+ thread_pool_size(_thread_pool_size),
+ finishing(false),
+ accum_f(_accum_f)
+ {
+ op_time =
+ std::chrono::microseconds((int) (0.5 +
+ thread_pool_size * 1000000.0 / iops));
+ std::chrono::milliseconds finishing_check_period(1000);
+ threads = new std::thread[thread_pool_size];
+ for (size_t i = 0; i < thread_pool_size; ++i) {
+ threads[i] = std::thread(&SimulatedServer::run, this, finishing_check_period);
+ }
+ }
+
+ virtual ~SimulatedServer() {
+ Lock l(inner_queue_mtx);
+ finishing = true;
+ inner_queue_cv.notify_all();
+ l.unlock();
+
+ for (size_t i = 0; i < thread_pool_size; ++i) {
+ threads[i].join();
+ }
+
+ delete[] threads;
+
+ delete priority_queue;
+ }
+
+ void post(TestRequest&& request,
+ const ClientId& client_id,
+ const ReqPm& req_params,
+ const Cost request_cost)
+ {
+ time_stats(internal_stats.mtx,
+ internal_stats.add_request_time,
+ [&](){
+ priority_queue->add_request(std::move(request),
+ client_id,
+ req_params,
+ request_cost);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.add_request_count);
+ }
+
+ bool has_avail_thread() {
+ InnerQGuard g(inner_queue_mtx);
+ return inner_queue.size() <= thread_pool_size;
+ }
+
+ const Accum& get_accumulator() const { return accumulator; }
+ const Q& get_priority_queue() const { return *priority_queue; }
+ const InternalStats& get_internal_stats() const { return internal_stats; }
+
+ protected:
+
+ void inner_post(const ClientId& client,
+ std::unique_ptr<TestRequest> request,
+ const RespPm& additional,
+ const Cost request_cost) {
+ Lock l(inner_queue_mtx);
+ assert(!finishing);
+ accum_f(accumulator, additional);
+ inner_queue.emplace_back(QueueItem(client,
+ std::move(request),
+ additional,
+ request_cost));
+ inner_queue_cv.notify_one();
+ }
+
+ void run(std::chrono::milliseconds check_period) {
+ Lock l(inner_queue_mtx);
+ while(true) {
+ while(inner_queue.empty() && !finishing) {
+ inner_queue_cv.wait_for(l, check_period);
+ }
+ if (!inner_queue.empty()) {
+ auto& front = inner_queue.front();
+ auto client = front.client;
+ auto req = std::move(front.request);
+ auto additional = front.additional;
+ auto request_cost = front.request_cost;
+ inner_queue.pop_front();
+
+ l.unlock();
+
+ // simulation operation by sleeping; then call function to
+ // notify server of completion
+ std::this_thread::sleep_for(op_time * request_cost);
+
+ // TODO: rather than assuming this constructor exists, perhaps
+ // pass in a function that does this mapping?
+ client_resp_f(client, TestResponse{req->epoch}, id, additional, request_cost);
+
+ time_stats(internal_stats.mtx,
+ internal_stats.request_complete_time,
+ [&](){
+ priority_queue->request_completed();
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.request_complete_count);
+
+ l.lock(); // in prep for next iteration of loop
+ } else {
+ break;
+ }
+ }
+ }
+ }; // class SimulatedServer
+
+ }; // namespace qos_simulation
+}; // namespace crimson
diff --git a/src/dmclock/sim/src/simulate.h b/src/dmclock/sim/src/simulate.h
new file mode 100644
index 000000000..44a09ca31
--- /dev/null
+++ b/src/dmclock/sim/src/simulate.h
@@ -0,0 +1,448 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <assert.h>
+
+#include <memory>
+#include <chrono>
+#include <map>
+#include <random>
+#include <iostream>
+#include <iomanip>
+#include <string>
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ template<typename ServerId, typename ClientId, typename TS, typename TC>
+ class Simulation {
+
+ public:
+
+ using TimePoint = std::chrono::time_point<std::chrono::steady_clock>;
+
+ protected:
+
+ using ClientMap = std::map<ClientId,TC*>;
+ using ServerMap = std::map<ServerId,TS*>;
+
+ unsigned server_count = 0;
+ unsigned client_count = 0;
+
+ ServerMap servers;
+ ClientMap clients;
+ std::vector<ServerId> server_ids;
+
+ TimePoint early_time;
+ TimePoint servers_created_time;
+ TimePoint clients_created_time;
+ TimePoint clients_finished_time;
+ TimePoint late_time;
+
+ std::default_random_engine prng;
+
+ bool has_run = false;
+
+
+ public:
+
+ double fmt_tp(const TimePoint& t) {
+ auto c = t.time_since_epoch().count();
+ return uint64_t(c / 1000000.0 + 0.5) % 100000 / 1000.0;
+ }
+
+ TimePoint now() {
+ return std::chrono::steady_clock::now();
+ }
+
+ using ClientBasedServerSelectFunc =
+ std::function<const ServerId&(uint64_t, uint16_t)>;
+
+ using ClientFilter = std::function<bool(const ClientId&)>;
+
+ using ServerFilter = std::function<bool(const ServerId&)>;
+
+ using ServerDataOutF =
+ std::function<void(std::ostream& out,
+ Simulation* sim, ServerFilter,
+ int header_w, int data_w, int data_prec)>;
+
+ using ClientDataOutF =
+ std::function<void(std::ostream& out,
+ Simulation* sim, ClientFilter,
+ int header_w, int data_w, int data_prec)>;
+
+ Simulation() :
+ early_time(now()),
+ prng(std::chrono::system_clock::now().time_since_epoch().count())
+ {
+ // empty
+ }
+
+ ~Simulation() {
+ for (auto c : clients) {
+ TC* cp = c.second;
+ delete cp;
+ }
+
+ for (auto s : servers) {
+ delete s.second;
+ }
+ }
+
+ unsigned get_client_count() const { return client_count; }
+ unsigned get_server_count() const { return server_count; }
+ TC& get_client(ClientId id) { return *clients[id]; }
+ TS& get_server(ServerId id) { return *servers[id]; }
+ const ServerId& get_server_id(std::size_t index) const {
+ return server_ids[index];
+ }
+
+
+ void add_servers(unsigned count,
+ std::function<TS*(ServerId)> create_server_f) {
+ unsigned i = server_count;
+
+ // increment server_count before creating servers since they
+ // will start running immediately and may use the server_count
+ // value; NB: this could still be an issue if servers are
+ // added with multiple add_servers calls; consider using a
+ // separate start function after all servers (and clients?)
+ // have been added
+ server_count += count;
+
+ for (; i < server_count; ++i) {
+ server_ids.push_back(i);
+ servers[i] = create_server_f(i);
+ }
+
+ servers_created_time = now();
+ }
+
+
+ void add_clients(unsigned count,
+ std::function<TC*(ClientId)> create_client_f) {
+ unsigned i = client_count;
+
+ // increment client_count before creating clients since they
+ // will start running immediately and may use the client_count
+ // value (e.g., in the server selection function); NB: this could
+ // still be an issue if clients are added with multiple
+ // add_clients calls; consider using a separate start function
+ // after all clients have been added
+ client_count += count;
+
+ for (; i < client_count; ++i) {
+ clients[i] = create_client_f(i);
+ }
+
+ clients_created_time = now();
+ }
+
+
+ void run() {
+ assert(server_count > 0);
+ assert(client_count > 0);
+
+ std::cout << "simulation started" << std::endl;
+
+ // clients are now running; wait for all to finish
+
+ for (auto const &i : clients) {
+ i.second->wait_until_done();
+ }
+
+ late_time = clients_finished_time = now();
+
+ std::cout << "simulation completed in " <<
+ std::chrono::duration_cast<std::chrono::milliseconds>(clients_finished_time - servers_created_time).count() <<
+ " millisecs" << std::endl;
+
+ has_run = true;
+ } // run
+
+
+ void display_stats(std::ostream& out,
+ ServerDataOutF server_out_f, ClientDataOutF client_out_f,
+ ServerFilter server_filter =
+ [] (const ServerId&) { return true; },
+ ClientFilter client_filter =
+ [] (const ClientId&) { return true; },
+ int head_w = 12, int data_w = 7, int data_prec = 2) {
+ assert(has_run);
+
+ // skip first 2 secondsd of data
+ const std::chrono::seconds skip_amount(0);
+ // calculate in groups of 5 seconds
+ const std::chrono::seconds measure_unit(2);
+ // unit to output reports in
+ const std::chrono::seconds report_unit(1);
+
+ // compute and display stats
+
+ TimePoint earliest_start = late_time;
+ TimePoint latest_start = early_time;
+ TimePoint earliest_finish = late_time;
+ TimePoint latest_finish = early_time;
+
+ for (auto const &c : clients) {
+ auto start = c.second->get_op_times().front();
+ auto end = c.second->get_op_times().back();
+
+ if (start < earliest_start) { earliest_start = start; }
+ if (start > latest_start) { latest_start = start; }
+ if (end < earliest_finish) { earliest_finish = end; }
+ if (end > latest_finish) { latest_finish = end; }
+ }
+
+ double ops_factor =
+ std::chrono::duration_cast<std::chrono::duration<double>>(measure_unit) /
+ std::chrono::duration_cast<std::chrono::duration<double>>(report_unit);
+
+ const auto start_edge = clients_created_time + skip_amount;
+
+ std::map<ClientId,std::vector<double>> ops_data;
+
+ for (auto const &c : clients) {
+ auto it = c.second->get_op_times().begin();
+ const auto end = c.second->get_op_times().end();
+ while (it != end && *it < start_edge) { ++it; }
+
+ for (auto time_edge = start_edge + measure_unit;
+ time_edge <= latest_finish + measure_unit;
+ time_edge += measure_unit) {
+ int count = 0;
+ for (; it != end && *it < time_edge; ++count, ++it) { /* empty */ }
+ double ops_per_second = double(count) / ops_factor;
+ ops_data[c.first].push_back(ops_per_second);
+ }
+ }
+
+ out << "==== Client Data ====" << std::endl;
+
+ out << std::setw(head_w) << "client:";
+ for (auto const &c : clients) {
+ if (!client_filter(c.first)) continue;
+ out << " " << std::setw(data_w) << c.first;
+ }
+ out << std::setw(data_w) << "total" << std::endl;
+
+ {
+ bool has_data;
+ size_t i = 0;
+ do {
+ std::string line_header = "t_" + std::to_string(i) + ":";
+ out << std::setw(head_w) << line_header;
+ has_data = false;
+ double total = 0.0;
+ for (auto const &c : clients) {
+ double data = 0.0;
+ if (i < ops_data[c.first].size()) {
+ data = ops_data[c.first][i];
+ has_data = true;
+ }
+ total += data;
+
+ if (!client_filter(c.first)) continue;
+
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << data;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total << std::endl;
+ ++i;
+ } while(has_data);
+ }
+
+ client_out_f(out, this, client_filter, head_w, data_w, data_prec);
+
+ display_client_internal_stats<std::chrono::nanoseconds>(out,
+ "nanoseconds");
+
+ out << std::endl << "==== Server Data ====" << std::endl;
+
+ out << std::setw(head_w) << "server:";
+ for (auto const &s : servers) {
+ if (!server_filter(s.first)) continue;
+ out << " " << std::setw(data_w) << s.first;
+ }
+ out << " " << std::setw(data_w) << "total" << std::endl;
+
+ server_out_f(out, this, server_filter, head_w, data_w, data_prec);
+
+ display_server_internal_stats<std::chrono::nanoseconds>(out,
+ "nanoseconds");
+
+ // clean up clients then servers
+
+ for (auto i = clients.begin(); i != clients.end(); ++i) {
+ delete i->second;
+ i->second = nullptr;
+ }
+
+ for (auto i = servers.begin(); i != servers.end(); ++i) {
+ delete i->second;
+ i->second = nullptr;
+ }
+ } // display_stats
+
+
+ template<typename T>
+ void display_server_internal_stats(std::ostream& out,
+ const std::string& time_unit) {
+ T add_request_time(0);
+ T request_complete_time(0);
+ uint32_t add_request_count = 0;
+ uint32_t request_complete_count = 0;
+
+ for (unsigned i = 0; i < get_server_count(); ++i) {
+ const auto& server = get_server(i);
+ const auto& is = server.get_internal_stats();
+ add_request_time +=
+ std::chrono::duration_cast<T>(is.add_request_time);
+ request_complete_time +=
+ std::chrono::duration_cast<T>(is.request_complete_time);
+ add_request_count += is.add_request_count;
+ request_complete_count += is.request_complete_count;
+ }
+
+ double add_request_time_per_unit =
+ double(add_request_time.count()) / add_request_count ;
+ out << "total time to add requests: " <<
+ std::fixed << add_request_time.count() << " " << time_unit <<
+ ";" << std::endl <<
+ " count: " << add_request_count << ";" << std::endl <<
+ " average: " << add_request_time_per_unit <<
+ " " << time_unit << " per request/response" << std::endl;
+
+ double request_complete_time_unit =
+ double(request_complete_time.count()) / request_complete_count ;
+ out << "total time to note requests complete: " << std::fixed <<
+ request_complete_time.count() << " " << time_unit << ";" <<
+ std::endl <<
+ " count: " << request_complete_count << ";" << std::endl <<
+ " average: " << request_complete_time_unit <<
+ " " << time_unit << " per request/response" << std::endl;
+
+ out << std::endl;
+
+ assert(add_request_count == request_complete_count);
+ out << "server timing for QOS algorithm: " <<
+ add_request_time_per_unit + request_complete_time_unit <<
+ " " << time_unit << " per request/response" << std::endl;
+ }
+
+
+ template<typename T>
+ void display_client_internal_stats(std::ostream& out,
+ const std::string& time_unit) {
+ T track_resp_time(0);
+ T get_req_params_time(0);
+ uint32_t track_resp_count = 0;
+ uint32_t get_req_params_count = 0;
+
+ for (unsigned i = 0; i < get_client_count(); ++i) {
+ const auto& client = get_client(i);
+ const auto& is = client.get_internal_stats();
+ track_resp_time +=
+ std::chrono::duration_cast<T>(is.track_resp_time);
+ get_req_params_time +=
+ std::chrono::duration_cast<T>(is.get_req_params_time);
+ track_resp_count += is.track_resp_count;
+ get_req_params_count += is.get_req_params_count;
+ }
+
+ double track_resp_time_unit =
+ double(track_resp_time.count()) / track_resp_count;
+ out << "total time to track responses: " <<
+ std::fixed << track_resp_time.count() << " " << time_unit << ";" <<
+ std::endl <<
+ " count: " << track_resp_count << ";" << std::endl <<
+ " average: " << track_resp_time_unit << " " << time_unit <<
+ " per request/response" << std::endl;
+
+ double get_req_params_time_unit =
+ double(get_req_params_time.count()) / get_req_params_count;
+ out << "total time to get request parameters: " <<
+ std::fixed << get_req_params_time.count() << " " << time_unit <<
+ ";" << std::endl <<
+ " count: " << get_req_params_count << ";" << std::endl <<
+ " average: " << get_req_params_time_unit << " " << time_unit <<
+ " per request/response" << std::endl;
+
+ out << std::endl;
+
+ assert(track_resp_count == get_req_params_count);
+ out << "client timing for QOS algorithm: " <<
+ track_resp_time_unit + get_req_params_time_unit << " " <<
+ time_unit << " per request/response" << std::endl;
+ }
+
+
+ // **** server selection functions ****
+
+
+ const ServerId& server_select_alternate(uint64_t seed,
+ uint16_t client_idx) {
+ size_t index = (client_idx + seed) % server_count;
+ return server_ids[index];
+ }
+
+
+ // returns a lambda using the range specified as servers_per (client)
+ ClientBasedServerSelectFunc
+ make_server_select_alt_range(uint16_t servers_per) {
+ return [servers_per,this](uint64_t seed, uint16_t client_idx)
+ -> const ServerId& {
+ double factor = double(server_count) / client_count;
+ size_t offset = seed % servers_per;
+ size_t index = (size_t(0.5 + client_idx * factor) + offset) % server_count;
+ return server_ids[index];
+ };
+ }
+
+
+ // function to choose a server randomly
+ const ServerId& server_select_random(uint64_t seed, uint16_t client_idx) {
+ size_t index = prng() % server_count;
+ return server_ids[index];
+ }
+
+
+ // function to choose a server randomly
+ ClientBasedServerSelectFunc
+ make_server_select_ran_range(uint16_t servers_per) {
+ return [servers_per,this](uint64_t seed, uint16_t client_idx)
+ -> const ServerId& {
+ double factor = double(server_count) / client_count;
+ size_t offset = prng() % servers_per;
+ size_t index = (size_t(0.5 + client_idx * factor) + offset) % server_count;
+ return server_ids[index];
+ };
+ }
+
+
+ // function to always choose the first server
+ const ServerId& server_select_0(uint64_t seed, uint16_t client_idx) {
+ return server_ids[0];
+ }
+ }; // class Simulation
+
+ }; // namespace qos_simulation
+}; // namespace crimson
diff --git a/src/dmclock/sim/src/ssched/ssched_client.h b/src/dmclock/sim/src/ssched/ssched_client.h
new file mode 100644
index 000000000..6764a09a8
--- /dev/null
+++ b/src/dmclock/sim/src/ssched/ssched_client.h
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+#include "ssched_recs.h"
+
+
+namespace crimson {
+ namespace simple_scheduler {
+
+ // S is server identifier type
+ template<typename S>
+ class ServiceTracker {
+
+ public:
+
+ // we have to start the counters at 1, as 0 is used in the
+ // cleaning process
+ ServiceTracker()
+ {
+ // empty
+ }
+
+ void track_resp(const S& server_id,
+ const NullData& ignore,
+ uint64_t request_cost) {
+ // empty
+ }
+
+ /*
+ * Returns the ReqParams for the given server.
+ */
+ ReqParams get_req_params(const S& server) {
+ return ReqParams();
+ } // get_req_params
+ }; // class ServiceTracker
+ } // namespace simple_scheduler
+} // namespace crimson
diff --git a/src/dmclock/sim/src/ssched/ssched_recs.h b/src/dmclock/sim/src/ssched/ssched_recs.h
new file mode 100644
index 000000000..935e678c1
--- /dev/null
+++ b/src/dmclock/sim/src/ssched/ssched_recs.h
@@ -0,0 +1,44 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+
+#include <ostream>
+#include <assert.h>
+
+
+namespace crimson {
+ namespace simple_scheduler {
+
+ // since we send no additional data out
+ // NOTE: Change name to RespParams? Is it used elsewhere?
+ struct NullData {
+ friend std::ostream& operator<<(std::ostream& out, const NullData& n) {
+ out << "NullData{ EMPTY }";
+ return out;
+ }
+ }; // struct NullData
+
+
+ struct ReqParams {
+ friend std::ostream& operator<<(std::ostream& out, const ReqParams& rp) {
+ out << "ReqParams{ EMPTY }";
+ return out;
+ }
+ };
+
+ }
+}
diff --git a/src/dmclock/sim/src/ssched/ssched_server.h b/src/dmclock/sim/src/ssched/ssched_server.h
new file mode 100644
index 000000000..c4e057a88
--- /dev/null
+++ b/src/dmclock/sim/src/ssched/ssched_server.h
@@ -0,0 +1,194 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <deque>
+#include <functional>
+
+#include "boost/variant.hpp"
+
+#include "ssched_recs.h"
+
+#ifdef PROFILE
+#include "profile.h"
+#endif
+
+namespace crimson {
+
+ namespace simple_scheduler {
+
+ template<typename C, typename R, typename Time>
+ class SimpleQueue {
+
+ public:
+
+ using RequestRef = std::unique_ptr<R>;
+
+ // a function to see whether the server can handle another request
+ using CanHandleRequestFunc = std::function<bool(void)>;
+
+ // a function to submit a request to the server; the second
+ // parameter is a callback when it's completed
+ using HandleRequestFunc =
+ std::function<void(const C&,RequestRef,NullData,uint64_t)>;
+
+ struct PullReq {
+ enum class Type { returning, none };
+
+ struct Retn {
+ C client;
+ RequestRef request;
+ };
+
+ Type type;
+ boost::variant<Retn> data;
+ };
+
+ protected:
+
+ enum class Mechanism { push, pull };
+
+ struct QRequest {
+ C client;
+ RequestRef request;
+ };
+
+ bool finishing = false;
+ Mechanism mechanism;
+
+ CanHandleRequestFunc can_handle_f;
+ HandleRequestFunc handle_f;
+
+ mutable std::mutex queue_mtx;
+ using DataGuard = std::lock_guard<decltype(queue_mtx)>;
+
+ std::deque<QRequest> queue;
+
+#ifdef PROFILE
+ public:
+ ProfileTimer<std::chrono::nanoseconds> pull_request_timer;
+ ProfileTimer<std::chrono::nanoseconds> add_request_timer;
+ ProfileTimer<std::chrono::nanoseconds> request_complete_timer;
+ protected:
+#endif
+
+ public:
+
+ // push full constructor
+ SimpleQueue(CanHandleRequestFunc _can_handle_f,
+ HandleRequestFunc _handle_f) :
+ mechanism(Mechanism::push),
+ can_handle_f(_can_handle_f),
+ handle_f(_handle_f)
+ {
+ // empty
+ }
+
+ SimpleQueue() :
+ mechanism(Mechanism::pull)
+ {
+ // empty
+ }
+
+ ~SimpleQueue() {
+ finishing = true;
+ }
+
+ void add_request(R&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ uint64_t request_cost) {
+ add_request(RequestRef(new R(std::move(request))),
+ client_id, req_params, request_cost);
+ }
+
+ void add_request(RequestRef&& request,
+ const C& client_id,
+ const ReqParams& req_params,
+ uint64_t request_cost) {
+ DataGuard g(queue_mtx);
+
+#ifdef PROFILE
+ add_request_timer.start();
+#endif
+ queue.emplace_back(QRequest{client_id, std::move(request)});
+
+ if (Mechanism::push == mechanism) {
+ schedule_request();
+ }
+
+#ifdef PROFILE
+ add_request_timer.stop();
+#endif
+ } // add_request
+
+ void request_completed() {
+ assert(Mechanism::push == mechanism);
+ DataGuard g(queue_mtx);
+
+#ifdef PROFILE
+ request_complete_timer.start();
+#endif
+ schedule_request();
+
+#ifdef PROFILE
+ request_complete_timer.stop();
+#endif
+ } // request_completed
+
+ PullReq pull_request() {
+ assert(Mechanism::pull == mechanism);
+ PullReq result;
+ DataGuard g(queue_mtx);
+
+#ifdef PROFILE
+ pull_request_timer.start();
+#endif
+
+ if (queue.empty()) {
+ result.type = PullReq::Type::none;
+ } else {
+ auto front = queue.front();
+ result.type = PullReq::Type::returning;
+ result.data =
+ typename PullReq::Retn{front.client, std::move(front.request)};
+ queue.pop();
+ }
+
+#ifdef PROFILE
+ pull_request_timer.stop();
+#endif
+
+ return result;
+ }
+
+ protected:
+
+ // queue_mtx should be held when called; should only be called
+ // when mechanism is push
+ void schedule_request() {
+ if (!queue.empty() && can_handle_f()) {
+ auto& front = queue.front();
+ static NullData null_data;
+ handle_f(front.client, std::move(front.request), null_data, 1u);
+ queue.pop_front();
+ }
+ }
+ };
+ };
+};
diff --git a/src/dmclock/sim/src/str_list.cc b/src/dmclock/sim/src/str_list.cc
new file mode 100644
index 000000000..22109e008
--- /dev/null
+++ b/src/dmclock/sim/src/str_list.cc
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2009-2010 Dreamhost
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "str_list.h"
+
+using std::string;
+using std::vector;
+using std::set;
+using std::list;
+
+static bool get_next_token(const string &s, size_t& pos, const char *delims, string& token)
+{
+ int start = s.find_first_not_of(delims, pos);
+ int end;
+
+ if (start < 0){
+ pos = s.size();
+ return false;
+ }
+
+ end = s.find_first_of(delims, start);
+ if (end >= 0)
+ pos = end + 1;
+ else {
+ pos = end = s.size();
+ }
+
+ token = s.substr(start, end - start);
+ return true;
+}
+
+void get_str_list(const string& str, const char *delims, list<string>& str_list)
+{
+ size_t pos = 0;
+ string token;
+
+ str_list.clear();
+
+ while (pos < str.size()) {
+ if (get_next_token(str, pos, delims, token)) {
+ if (token.size() > 0) {
+ str_list.push_back(token);
+ }
+ }
+ }
+}
+
+void get_str_list(const string& str, list<string>& str_list)
+{
+ const char *delims = ";,= \t";
+ return get_str_list(str, delims, str_list);
+}
+
+void get_str_vec(const string& str, const char *delims, vector<string>& str_vec)
+{
+ size_t pos = 0;
+ string token;
+ str_vec.clear();
+
+ while (pos < str.size()) {
+ if (get_next_token(str, pos, delims, token)) {
+ if (token.size() > 0) {
+ str_vec.push_back(token);
+ }
+ }
+ }
+}
+
+void get_str_vec(const string& str, vector<string>& str_vec)
+{
+ const char *delims = ";,= \t";
+ return get_str_vec(str, delims, str_vec);
+}
+
+void get_str_set(const string& str, const char *delims, set<string>& str_set)
+{
+ size_t pos = 0;
+ string token;
+
+ str_set.clear();
+
+ while (pos < str.size()) {
+ if (get_next_token(str, pos, delims, token)) {
+ if (token.size() > 0) {
+ str_set.insert(token);
+ }
+ }
+ }
+}
+
+void get_str_set(const string& str, set<string>& str_set)
+{
+ const char *delims = ";,= \t";
+ return get_str_set(str, delims, str_set);
+}
diff --git a/src/dmclock/sim/src/str_list.h b/src/dmclock/sim/src/str_list.h
new file mode 100644
index 000000000..4a6dcc57f
--- /dev/null
+++ b/src/dmclock/sim/src/str_list.h
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2009 Red Hat Inc.
+ *
+ * Forked from Red Hat's Ceph project.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#ifndef CEPH_STRLIST_H
+#define CEPH_STRLIST_H
+
+#include <list>
+#include <set>
+#include <sstream>
+#include <string>
+#include <vector>
+
+/**
+ * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**.
+ *
+ * @param [in] str String to split and save as list
+ * @param [out] str_list List modified containing str after it has been split
+**/
+extern void get_str_list(const std::string& str,
+ std::list<std::string>& str_list);
+
+/**
+ * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**.
+ *
+ * @param [in] str String to split and save as list
+ * @param [in] delims characters used to split **str**
+ * @param [out] str_list List modified containing str after it has been split
+**/
+extern void get_str_list(const std::string& str,
+ const char *delims,
+ std::list<std::string>& str_list);
+
+/**
+ * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_vec**.
+ *
+ * @param [in] str String to split and save as Vector
+ * @param [out] str_vec Vector modified containing str after it has been split
+**/
+extern void get_str_vec(const std::string& str,
+ std::vector<std::string>& str_vec);
+
+/**
+ * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_vec**.
+ *
+ * @param [in] str String to split and save as Vector
+ * @param [in] delims characters used to split **str**
+ * @param [out] str_vec Vector modified containing str after it has been split
+**/
+extern void get_str_vec(const std::string& str,
+ const char *delims,
+ std::vector<std::string>& str_vec);
+
+/**
+ * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**.
+ *
+ * @param [in] str String to split and save as Set
+ * @param [out] str_list Set modified containing str after it has been split
+**/
+extern void get_str_set(const std::string& str,
+ std::set<std::string>& str_list);
+
+/**
+ * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**.
+ *
+ * @param [in] str String to split and save as Set
+ * @param [in] delims characters used to split **str**
+ * @param [out] str_list Set modified containing str after it has been split
+**/
+extern void get_str_set(const std::string& str,
+ const char *delims,
+ std::set<std::string>& str_list);
+
+/**
+ * Return a String containing the vector **v** joined with **sep**
+ *
+ * If **v** is empty, the function returns an empty string
+ * For each element in **v**,
+ * it will concatenate this element and **sep** with result
+ *
+ * @param [in] v Vector to join as a String
+ * @param [in] sep String used to join each element from **v**
+ * @return empty string if **v** is empty or concatenated string
+**/
+inline std::string str_join(const std::vector<std::string>& v, const std::string& sep)
+{
+ if (v.empty())
+ return std::string();
+ std::vector<std::string>::const_iterator i = v.begin();
+ std::string r = *i;
+ for (++i; i != v.end(); ++i) {
+ r += sep;
+ r += *i;
+ }
+ return r;
+}
+
+#endif
diff --git a/src/dmclock/sim/src/test_dmclock.cc b/src/dmclock/sim/src/test_dmclock.cc
new file mode 100644
index 000000000..402a47335
--- /dev/null
+++ b/src/dmclock/sim/src/test_dmclock.cc
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+#include <type_traits>
+
+#include "dmclock_recs.h"
+#include "dmclock_server.h"
+#include "dmclock_client.h"
+
+#include "sim_recs.h"
+#include "sim_server.h"
+#include "sim_client.h"
+
+#include "test_dmclock.h"
+
+
+namespace test = crimson::test_dmc;
+
+
+// Note: if this static_assert fails then our two definitions of Cost
+// do not match; change crimson::qos_simulation::Cost to match the
+// definition of crimson::dmclock::Cost.
+static_assert(std::is_same<crimson::qos_simulation::Cost,crimson::dmclock::Cost>::value,
+ "Please make sure the simulator type crimson::qos_simulation::Cost matches the dmclock type crimson::dmclock::Cost.");
+
+
+void test::dmc_server_accumulate_f(test::DmcAccum& a,
+ const test::dmc::PhaseType& phase) {
+ if (test::dmc::PhaseType::reservation == phase) {
+ ++a.reservation_count;
+ } else {
+ ++a.proportion_count;
+ }
+}
+
+
+void test::dmc_client_accumulate_f(test::DmcAccum& a,
+ const test::dmc::PhaseType& phase) {
+ if (test::dmc::PhaseType::reservation == phase) {
+ ++a.reservation_count;
+ } else {
+ ++a.proportion_count;
+ }
+}
diff --git a/src/dmclock/sim/src/test_dmclock.h b/src/dmclock/sim/src/test_dmclock.h
new file mode 100644
index 000000000..1c7a55968
--- /dev/null
+++ b/src/dmclock/sim/src/test_dmclock.h
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include "dmclock_recs.h"
+#include "dmclock_server.h"
+#include "dmclock_client.h"
+
+#include "sim_recs.h"
+#include "sim_server.h"
+#include "sim_client.h"
+
+#include "simulate.h"
+
+
+namespace crimson {
+ namespace test_dmc {
+
+ namespace dmc = crimson::dmclock;
+ namespace sim = crimson::qos_simulation;
+
+ struct DmcAccum {
+ uint64_t reservation_count = 0;
+ uint64_t proportion_count = 0;
+ };
+
+ using DmcQueue = dmc::PushPriorityQueue<ClientId,sim::TestRequest>;
+ using DmcServiceTracker = dmc::ServiceTracker<ServerId,dmc::OrigTracker>;
+
+ using DmcServer = sim::SimulatedServer<DmcQueue,
+ dmc::ReqParams,
+ dmc::PhaseType,
+ DmcAccum>;
+
+ using DmcClient = sim::SimulatedClient<DmcServiceTracker,
+ dmc::ReqParams,
+ dmc::PhaseType,
+ DmcAccum>;
+
+ using CreateQueueF = std::function<DmcQueue*(DmcQueue::CanHandleRequestFunc,
+ DmcQueue::HandleRequestFunc)>;
+
+ using MySim = sim::Simulation<ServerId,ClientId,DmcServer,DmcClient>;
+
+ using SubmitFunc = DmcClient::SubmitFunc;
+
+ extern void dmc_server_accumulate_f(DmcAccum& a,
+ const dmc::PhaseType& phase);
+
+ extern void dmc_client_accumulate_f(DmcAccum& a,
+ const dmc::PhaseType& phase);
+ } // namespace test_dmc
+} // namespace crimson
diff --git a/src/dmclock/sim/src/test_dmclock_main.cc b/src/dmclock/sim/src/test_dmclock_main.cc
new file mode 100644
index 000000000..5cf656608
--- /dev/null
+++ b/src/dmclock/sim/src/test_dmclock_main.cc
@@ -0,0 +1,342 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include "test_dmclock.h"
+#include "config.h"
+
+#ifdef PROFILE
+#include "profile.h"
+#endif
+
+
+namespace dmc = crimson::dmclock;
+namespace test = crimson::test_dmc;
+namespace sim = crimson::qos_simulation;
+
+using namespace std::placeholders;
+
+
+namespace crimson {
+ namespace test_dmc {
+ void server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec);
+
+ void client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec);
+ }
+}
+
+
+int main(int argc, char* argv[]) {
+ std::vector<const char*> args;
+ for (int i = 1; i < argc; ++i) {
+ args.push_back(argv[i]);
+ }
+
+ std::string conf_file_list;
+ sim::ceph_argparse_early_args(args, &conf_file_list);
+
+ sim::sim_config_t g_conf;
+ std::vector<sim::cli_group_t> &cli_group = g_conf.cli_group;
+ std::vector<sim::srv_group_t> &srv_group = g_conf.srv_group;
+
+ if (!conf_file_list.empty()) {
+ int ret;
+ ret = sim::parse_config_file(conf_file_list, g_conf);
+ if (ret) {
+ // error
+ _exit(1);
+ }
+ } else {
+ // default simulation parameter
+ g_conf.client_groups = 2;
+
+ sim::srv_group_t st;
+ srv_group.push_back(st);
+
+ sim::cli_group_t ct1(99, 0);
+ cli_group.push_back(ct1);
+
+ sim::cli_group_t ct2(1, 10);
+ cli_group.push_back(ct2);
+ }
+
+ const unsigned server_groups = g_conf.server_groups;
+ const unsigned client_groups = g_conf.client_groups;
+ const bool server_random_selection = g_conf.server_random_selection;
+ const bool server_soft_limit = g_conf.server_soft_limit;
+ const double anticipation_timeout = g_conf.anticipation_timeout;
+ unsigned server_total_count = 0;
+ unsigned client_total_count = 0;
+
+ for (unsigned i = 0; i < client_groups; ++i) {
+ client_total_count += cli_group[i].client_count;
+ }
+
+ for (unsigned i = 0; i < server_groups; ++i) {
+ server_total_count += srv_group[i].server_count;
+ }
+
+ std::vector<test::dmc::ClientInfo> client_info;
+ for (unsigned i = 0; i < client_groups; ++i) {
+ client_info.push_back(test::dmc::ClientInfo
+ { cli_group[i].client_reservation,
+ cli_group[i].client_weight,
+ cli_group[i].client_limit } );
+ }
+
+ auto ret_client_group_f = [&](const ClientId& c) -> unsigned {
+ unsigned group_max = 0;
+ unsigned i = 0;
+ for (; i < client_groups; ++i) {
+ group_max += cli_group[i].client_count;
+ if (c < group_max) {
+ break;
+ }
+ }
+ return i;
+ };
+
+ auto ret_server_group_f = [&](const ServerId& s) -> unsigned {
+ unsigned group_max = 0;
+ unsigned i = 0;
+ for (; i < server_groups; ++i) {
+ group_max += srv_group[i].server_count;
+ if (s < group_max) {
+ break;
+ }
+ }
+ return i;
+ };
+
+ auto client_info_f =
+ [=](const ClientId& c) -> const test::dmc::ClientInfo* {
+ return &client_info[ret_client_group_f(c)];
+ };
+
+ auto client_disp_filter = [=] (const ClientId& i) -> bool {
+ return i < 3 || i >= (client_total_count - 3);
+ };
+
+ auto server_disp_filter = [=] (const ServerId& i) -> bool {
+ return i < 3 || i >= (server_total_count - 3);
+ };
+
+
+ test::MySim *simulation;
+
+
+ // lambda to post a request to the identified server; called by client
+ test::SubmitFunc server_post_f =
+ [&simulation,
+ &cli_group,
+ &ret_client_group_f](const ServerId& server,
+ sim::TestRequest&& request,
+ const ClientId& client_id,
+ const test::dmc::ReqParams& req_params) {
+ test::DmcServer& s = simulation->get_server(server);
+ sim::Cost request_cost = cli_group[ret_client_group_f(client_id)].client_req_cost;
+ s.post(std::move(request), client_id, req_params, request_cost);
+ };
+
+ std::vector<std::vector<sim::CliInst>> cli_inst;
+ for (unsigned i = 0; i < client_groups; ++i) {
+ if (cli_group[i].client_wait == std::chrono::seconds(0)) {
+ cli_inst.push_back(
+ { { sim::req_op,
+ (uint32_t)cli_group[i].client_total_ops,
+ (double)cli_group[i].client_iops_goal,
+ (uint16_t)cli_group[i].client_outstanding_ops } } );
+ } else {
+ cli_inst.push_back(
+ { { sim::wait_op, cli_group[i].client_wait },
+ { sim::req_op,
+ (uint32_t)cli_group[i].client_total_ops,
+ (double)cli_group[i].client_iops_goal,
+ (uint16_t)cli_group[i].client_outstanding_ops } } );
+ }
+ }
+
+ simulation = new test::MySim();
+
+ test::DmcServer::ClientRespFunc client_response_f =
+ [&simulation](ClientId client_id,
+ const sim::TestResponse& resp,
+ const ServerId& server_id,
+ const dmc::PhaseType& phase,
+ const sim::Cost request_cost) {
+ simulation->get_client(client_id).receive_response(resp,
+ server_id,
+ phase,
+ request_cost);
+ };
+
+ test::CreateQueueF create_queue_f =
+ [&](test::DmcQueue::CanHandleRequestFunc can_f,
+ test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* {
+ return new test::DmcQueue(client_info_f,
+ can_f,
+ handle_f,
+ server_soft_limit ? dmc::AtLimit::Allow : dmc::AtLimit::Wait,
+ anticipation_timeout);
+ };
+
+
+ auto create_server_f = [&](ServerId id) -> test::DmcServer* {
+ unsigned i = ret_server_group_f(id);
+ return new test::DmcServer(id,
+ srv_group[i].server_iops,
+ srv_group[i].server_threads,
+ client_response_f,
+ test::dmc_server_accumulate_f,
+ create_queue_f);
+ };
+
+ auto create_client_f = [&](ClientId id) -> test::DmcClient* {
+ unsigned i = ret_client_group_f(id);
+ test::MySim::ClientBasedServerSelectFunc server_select_f;
+ unsigned client_server_select_range = cli_group[i].client_server_select_range;
+ if (!server_random_selection) {
+ server_select_f = simulation->make_server_select_alt_range(client_server_select_range);
+ } else {
+ server_select_f = simulation->make_server_select_ran_range(client_server_select_range);
+ }
+ return new test::DmcClient(id,
+ server_post_f,
+ std::bind(server_select_f, _1, id),
+ test::dmc_client_accumulate_f,
+ cli_inst[i]);
+ };
+
+#if 1
+ std::cout << "[global]" << std::endl << g_conf << std::endl;
+ for (unsigned i = 0; i < client_groups; ++i) {
+ std::cout << std::endl << "[client." << i << "]" << std::endl;
+ std::cout << cli_group[i] << std::endl;
+ }
+ for (unsigned i = 0; i < server_groups; ++i) {
+ std::cout << std::endl << "[server." << i << "]" << std::endl;
+ std::cout << srv_group[i] << std::endl;
+ }
+ std::cout << std::endl;
+#endif
+
+ simulation->add_servers(server_total_count, create_server_f);
+ simulation->add_clients(client_total_count, create_client_f);
+
+ simulation->run();
+ simulation->display_stats(std::cout,
+ &test::server_data, &test::client_data,
+ server_disp_filter, client_disp_filter);
+
+ delete simulation;
+} // main
+
+
+void test::client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ // report how many ops were done by reservation and proportion for
+ // each client
+
+ int total_r = 0;
+ out << std::setw(head_w) << "res_ops:";
+ for (unsigned i = 0; i < sim->get_client_count(); ++i) {
+ const auto& client = sim->get_client(i);
+ auto r = client.get_accumulator().reservation_count;
+ total_r += r;
+ if (!client_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << r;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_r << std::endl;
+
+ int total_p = 0;
+ out << std::setw(head_w) << "prop_ops:";
+ for (unsigned i = 0; i < sim->get_client_count(); ++i) {
+ const auto& client = sim->get_client(i);
+ auto p = client.get_accumulator().proportion_count;
+ total_p += p;
+ if (!client_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << p;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_p << std::endl;
+}
+
+
+void test::server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ out << std::setw(head_w) << "res_ops:";
+ int total_r = 0;
+ for (unsigned i = 0; i < sim->get_server_count(); ++i) {
+ const auto& server = sim->get_server(i);
+ auto rc = server.get_accumulator().reservation_count;
+ total_r += rc;
+ if (!server_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << rc;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_r << std::endl;
+
+ out << std::setw(head_w) << "prop_ops:";
+ int total_p = 0;
+ for (unsigned i = 0; i < sim->get_server_count(); ++i) {
+ const auto& server = sim->get_server(i);
+ auto pc = server.get_accumulator().proportion_count;
+ total_p += pc;
+ if (!server_disp_filter(i)) continue;
+ out << " " << std::setw(data_w) << pc;
+ }
+ out << " " << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_p << std::endl;
+
+ const auto& q = sim->get_server(0).get_priority_queue();
+ out << std::endl <<
+ " k-way heap: " << q.get_heap_branching_factor() << std::endl
+ << std::endl;
+
+#ifdef PROFILE
+ crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner;
+ crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner;
+ for (unsigned i = 0; i < sim->get_server_count(); ++i) {
+ const auto& q = sim->get_server(i).get_priority_queue();
+ const auto& art = q.add_request_timer;
+ art_combiner.combine(art);
+ const auto& rct = q.request_complete_timer;
+ rct_combiner.combine(rct);
+ }
+ out << "Server add_request_timer: count:" << art_combiner.get_count() <<
+ ", mean:" << art_combiner.get_mean() <<
+ ", std_dev:" << art_combiner.get_std_dev() <<
+ ", low:" << art_combiner.get_low() <<
+ ", high:" << art_combiner.get_high() << std::endl;
+ out << "Server request_complete_timer: count:" << rct_combiner.get_count() <<
+ ", mean:" << rct_combiner.get_mean() <<
+ ", std_dev:" << rct_combiner.get_std_dev() <<
+ ", low:" << rct_combiner.get_low() <<
+ ", high:" << rct_combiner.get_high() << std::endl;
+ out << "Server combined mean: " <<
+ (art_combiner.get_mean() + rct_combiner.get_mean()) <<
+ std::endl;
+#endif
+}
diff --git a/src/dmclock/sim/src/test_ssched.cc b/src/dmclock/sim/src/test_ssched.cc
new file mode 100644
index 000000000..b06273dc0
--- /dev/null
+++ b/src/dmclock/sim/src/test_ssched.cc
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include "ssched_recs.h"
+#include "ssched_server.h"
+#include "ssched_client.h"
+
+#include "sim_recs.h"
+#include "sim_server.h"
+#include "sim_client.h"
+
+#include "test_ssched.h"
+
+
+namespace test = crimson::test_simple_scheduler;
+namespace ssched = crimson::simple_scheduler;
+
+
+void test::simple_server_accumulate_f(test::SimpleAccum& a,
+ const ssched::NullData& add_info) {
+ ++a.request_count;
+}
+
+
+void test::simple_client_accumulate_f(test::SimpleAccum& a,
+ const ssched::NullData& ignore) {
+ // empty
+}
diff --git a/src/dmclock/sim/src/test_ssched.h b/src/dmclock/sim/src/test_ssched.h
new file mode 100644
index 000000000..0d778709a
--- /dev/null
+++ b/src/dmclock/sim/src/test_ssched.h
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include "ssched_server.h"
+#include "ssched_client.h"
+
+#include "sim_recs.h"
+#include "sim_server.h"
+#include "sim_client.h"
+
+#include "simulate.h"
+
+
+namespace crimson {
+ namespace test_simple_scheduler {
+
+ namespace ssched = crimson::simple_scheduler;
+ namespace sim = crimson::qos_simulation;
+
+ using Time = double;
+
+ struct SimpleAccum {
+ uint32_t request_count = 0;
+ };
+
+ using SimpleQueue = ssched::SimpleQueue<ClientId,sim::TestRequest,Time>;
+
+ using SimpleServer = sim::SimulatedServer<SimpleQueue,
+ ssched::ReqParams,
+ ssched::NullData,
+ SimpleAccum>;
+ using SimpleClient = sim::SimulatedClient<ssched::ServiceTracker<ServerId>,
+ ssched::ReqParams,
+ ssched::NullData,
+ SimpleAccum>;
+
+ using CreateQueueF =
+ std::function<SimpleQueue*(SimpleQueue::CanHandleRequestFunc,
+ SimpleQueue::HandleRequestFunc)>;
+
+
+ using MySim = sim::Simulation<ServerId,ClientId,SimpleServer,SimpleClient>;
+
+ using SubmitFunc = SimpleClient::SubmitFunc;
+
+ extern void simple_server_accumulate_f(SimpleAccum& a,
+ const ssched::NullData& add_info);
+
+ extern void simple_client_accumulate_f(SimpleAccum& a,
+ const ssched::NullData& ignore);
+ } // namespace test_simple
+} // namespace crimson
diff --git a/src/dmclock/sim/src/test_ssched_main.cc b/src/dmclock/sim/src/test_ssched_main.cc
new file mode 100644
index 000000000..ace4f8cce
--- /dev/null
+++ b/src/dmclock/sim/src/test_ssched_main.cc
@@ -0,0 +1,199 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * Author: J. Eric Ivancich <ivancich@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License version
+ * 2.1, as published by the Free Software Foundation. See file
+ * COPYING.
+ */
+
+
+#include "test_ssched.h"
+
+
+#ifdef PROFILE
+#include "profile.h"
+#endif
+
+
+namespace test = crimson::test_simple_scheduler;
+namespace ssched = crimson::simple_scheduler;
+namespace sim = crimson::qos_simulation;
+
+using namespace std::placeholders;
+
+
+namespace crimson {
+ namespace test_simple_scheduler {
+ void client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec);
+
+ void server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec);
+ } // namespace test_simple
+} // namespace crimson
+
+
+using Cost = uint32_t;
+
+
+int main(int argc, char* argv[]) {
+ // server params
+
+ const unsigned server_count = 100;
+ const unsigned server_iops = 40;
+ const unsigned server_threads = 1;
+
+ // client params
+
+ const unsigned client_total_ops = 1000;
+ const unsigned client_count = 100;
+ const unsigned client_server_select_range = 10;
+ const unsigned client_wait_count = 1;
+ const unsigned client_iops_goal = 50;
+ const unsigned client_outstanding_ops = 100;
+ const std::chrono::seconds client_wait(10);
+
+ auto client_disp_filter = [=] (const ClientId& i) -> bool {
+ return i < 3 || i >= (client_count - 3);
+ };
+
+ auto server_disp_filter = [=] (const ServerId& i) -> bool {
+ return i < 3 || i >= (server_count - 3);
+ };
+
+
+ test::MySim *simulation;
+
+ // lambda to post a request to the identified server; called by client
+ test::SubmitFunc server_post_f =
+ [&simulation](const ServerId& server_id,
+ sim::TestRequest&& request,
+ const ClientId& client_id,
+ const ssched::ReqParams& req_params) {
+ auto& server = simulation->get_server(server_id);
+ server.post(std::move(request), client_id, req_params, 1u);
+ };
+
+ static std::vector<sim::CliInst> no_wait =
+ { { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } };
+ static std::vector<sim::CliInst> wait =
+ { { sim::wait_op, client_wait },
+ { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } };
+
+ simulation = new test::MySim();
+
+#if 1
+ test::MySim::ClientBasedServerSelectFunc server_select_f =
+ simulation->make_server_select_alt_range(client_server_select_range);
+#elif 0
+ test::MySim::ClientBasedServerSelectFunc server_select_f =
+ std::bind(&test::MySim::server_select_random, simulation, _1, _2);
+#else
+ test::MySim::ClientBasedServerSelectFunc server_select_f =
+ std::bind(&test::MySim::server_select_0, simulation, _1, _2);
+#endif
+
+ test::SimpleServer::ClientRespFunc client_response_f =
+ [&simulation](ClientId client_id,
+ const sim::TestResponse& resp,
+ const ServerId& server_id,
+ const ssched::NullData& resp_params,
+ Cost request_cost) {
+ simulation->get_client(client_id).receive_response(resp,
+ server_id,
+ resp_params,
+ request_cost);
+ };
+
+ test::CreateQueueF create_queue_f =
+ [&](test::SimpleQueue::CanHandleRequestFunc can_f,
+ test::SimpleQueue::HandleRequestFunc handle_f) -> test::SimpleQueue* {
+ return new test::SimpleQueue(can_f, handle_f);
+ };
+
+ auto create_server_f = [&](ServerId id) -> test::SimpleServer* {
+ return new test::SimpleServer(id,
+ server_iops, server_threads,
+ client_response_f,
+ test::simple_server_accumulate_f,
+ create_queue_f);
+ };
+
+ auto create_client_f = [&](ClientId id) -> test::SimpleClient* {
+ return new test::SimpleClient(id,
+ server_post_f,
+ std::bind(server_select_f, _1, id),
+ test::simple_client_accumulate_f,
+ id < (client_count - client_wait_count)
+ ? no_wait : wait);
+ };
+
+ simulation->add_servers(server_count, create_server_f);
+ simulation->add_clients(client_count, create_client_f);
+
+ simulation->run();
+ simulation->display_stats(std::cout,
+ &test::server_data, &test::client_data,
+ server_disp_filter, client_disp_filter);
+} // main
+
+
+void test::client_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ClientFilter client_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ // empty
+}
+
+
+void test::server_data(std::ostream& out,
+ test::MySim* sim,
+ test::MySim::ServerFilter server_disp_filter,
+ int head_w, int data_w, int data_prec) {
+ out << std::setw(head_w) << "requests:";
+ int total_req = 0;
+ for (unsigned i = 0; i < sim->get_server_count(); ++i) {
+ const auto& server = sim->get_server(i);
+ auto req_count = server.get_accumulator().request_count;
+ total_req += req_count;
+ if (!server_disp_filter(i)) continue;
+ out << std::setw(data_w) << req_count;
+ }
+ out << std::setw(data_w) << std::setprecision(data_prec) <<
+ std::fixed << total_req << std::endl;
+
+#ifdef PROFILE
+ crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner;
+ crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner;
+ for (unsigned i = 0; i < sim->get_server_count(); ++i) {
+ const auto& q = sim->get_server(i).get_priority_queue();
+ const auto& art = q.add_request_timer;
+ art_combiner.combine(art);
+ const auto& rct = q.request_complete_timer;
+ rct_combiner.combine(rct);
+ }
+ out << "Server add_request_timer: count:" << art_combiner.get_count() <<
+ ", mean:" << art_combiner.get_mean() <<
+ ", std_dev:" << art_combiner.get_std_dev() <<
+ ", low:" << art_combiner.get_low() <<
+ ", high:" << art_combiner.get_high() << std::endl;
+ out << "Server request_complete_timer: count:" << rct_combiner.get_count() <<
+ ", mean:" << rct_combiner.get_mean() <<
+ ", std_dev:" << rct_combiner.get_std_dev() <<
+ ", low:" << rct_combiner.get_low() <<
+ ", high:" << rct_combiner.get_high() << std::endl;
+ out << "Server combined mean: " <<
+ (art_combiner.get_mean() + rct_combiner.get_mean()) <<
+ std::endl;
+#endif
+}