diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/dmclock/sim | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
23 files changed, 3568 insertions, 0 deletions
diff --git a/src/dmclock/sim/CMakeLists.txt b/src/dmclock/sim/CMakeLists.txt new file mode 100644 index 000000000..ca537ce6f --- /dev/null +++ b/src/dmclock/sim/CMakeLists.txt @@ -0,0 +1,11 @@ +set(K_WAY_HEAP "" CACHE STRING "K_WAY_HEAP") + +if(K_WAY_HEAP) + if(K_WAY_HEAP LESS 2) + message(FATAL_ERROR "K_WAY_HEAP value should be at least 2") + else() + set(CMAKE_CXX_SIM_FLAGS "-DK_WAY_HEAP=${K_WAY_HEAP}") + endif() +endif() + +add_subdirectory(src) diff --git a/src/dmclock/sim/dmc_sim_100th.conf b/src/dmclock/sim/dmc_sim_100th.conf new file mode 100644 index 000000000..17d004354 --- /dev/null +++ b/src/dmclock/sim/dmc_sim_100th.conf @@ -0,0 +1,32 @@ +[global] +server_groups = 1 +client_groups = 2 +server_random_selection = true +server_soft_limit = true + +[client.0] +client_count = 99 +client_wait = 0 +client_total_ops = 1000 +client_server_select_range = 10 +client_iops_goal = 50 +client_outstanding_ops = 100 +client_reservation = 20.0 +client_limit = 60.0 +client_weight = 1.0 + +[client.1] +client_count = 1 +client_wait = 10 +client_total_ops = 1000 +client_server_select_range = 10 +client_iops_goal = 50 +client_outstanding_ops = 100 +client_reservation = 20.0 +client_limit = 60.0 +client_weight = 1.0 + +[server.0] +server_count = 100 +server_iops = 40 +server_threads = 1 diff --git a/src/dmclock/sim/dmc_sim_example.conf b/src/dmclock/sim/dmc_sim_example.conf new file mode 100644 index 000000000..e98b870eb --- /dev/null +++ b/src/dmclock/sim/dmc_sim_example.conf @@ -0,0 +1,56 @@ +[global] +server_groups = 1 +client_groups = 4 +server_random_selection = false +server_soft_limit = false + +[client.0] +client_count = 1 +client_wait = 0 +client_total_ops = 2000 +client_server_select_range = 1 +client_iops_goal = 200 +client_outstanding_ops = 32 +client_reservation = 0.0 +client_limit = 0.0 +client_weight = 1.0 + +[client.1] +client_count = 1 +client_wait = 5 +client_total_ops = 2000 +client_server_select_range = 1 +client_iops_goal = 200 +client_outstanding_ops = 32 +client_reservation = 0.0 +client_limit = 40.0 +client_weight = 1.0 + +[client.2] +client_count = 1 +client_wait = 10 +client_total_ops = 2000 +client_server_select_range = 1 +client_iops_goal = 200 +client_outstanding_ops = 32 +client_reservation = 0.0 +client_limit = 50.0 +client_weight = 2.0 +client_req_cost = 1 + +[client.3] +client_count = 1 +client_wait = 10 +client_total_ops = 2000 +client_server_select_range = 1 +client_iops_goal = 200 +client_outstanding_ops = 32 +client_reservation = 0.0 +client_limit = 50.0 +client_weight = 2.0 +client_req_cost = 3 + +[server.0] +server_count = 1 +server_iops = 160 +server_threads = 1 diff --git a/src/dmclock/sim/src/CMakeLists.txt b/src/dmclock/sim/src/CMakeLists.txt new file mode 100644 index 000000000..dab011ed0 --- /dev/null +++ b/src/dmclock/sim/src/CMakeLists.txt @@ -0,0 +1,38 @@ +set(local_flags "-Wall ${CMAKE_CXX_SIM_FLAGS}") + +set(ssched_sim_srcs test_ssched.cc test_ssched_main.cc) +set(dmc_sim_srcs test_dmclock.cc test_dmclock_main.cc) +set(config_srcs config.cc str_list.cc ConfUtils.cc) + +set_source_files_properties(${ssched_sim_srcs} ${dmc_sim_srcs} ${dmc_srcs} ${config_srcs} + PROPERTIES + COMPILE_FLAGS "${local_flags}" + ) + +if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang") + set(warnings_off " -Wno-unused-variable -Wno-unused-function") +elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU") + set(warnings_off " -Wno-unused-but-set-variable -Wno-unused-function") +endif() + +# append warning flags to certain source files +set_property( + SOURCE ${ssched_sim_srcs} ${dmc_sim_srcs} ${config_srcs} + APPEND_STRING + PROPERTY COMPILE_FLAGS "${warnings_off}" + ) + +add_executable(ssched_sim EXCLUDE_FROM_ALL ${ssched_sim_srcs}) +target_include_directories(ssched_sim PRIVATE ssched) # ssched code +add_executable(dmc_sim EXCLUDE_FROM_ALL ${dmc_sim_srcs} ${config_srcs}) + +set_target_properties(ssched_sim dmc_sim + PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ..) + +add_dependencies(dmc_sim dmclock) + +target_link_libraries(ssched_sim LINK_PRIVATE Threads::Threads) +target_link_libraries(dmc_sim LINK_PRIVATE dmclock) + +add_custom_target(dmclock-sims DEPENDS ssched_sim dmc_sim) diff --git a/src/dmclock/sim/src/ConfUtils.cc b/src/dmclock/sim/src/ConfUtils.cc new file mode 100644 index 000000000..a05f7dc42 --- /dev/null +++ b/src/dmclock/sim/src/ConfUtils.cc @@ -0,0 +1,574 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <algorithm> +#include <errno.h> +#include <list> +#include <map> +#include <sstream> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <string> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <iostream> + +#include <assert.h> +#include "ConfUtils.h" + +using std::cerr; +using std::ostringstream; +using std::pair; +using std::string; + +#define MAX_CONFIG_FILE_SZ 0x40000000 + +////////////////////////////// ConfLine ////////////////////////////// +ConfLine:: +ConfLine(const std::string &key_, const std::string &val_, + const std::string &newsection_, const std::string &comment_, int line_no_) + : key(key_), val(val_), newsection(newsection_) +{ + // If you want to implement writable ConfFile support, you'll need to save + // the comment and line_no arguments here. +} + +bool ConfLine:: +operator<(const ConfLine &rhs) const +{ + // We only compare keys. + // If you have more than one line with the same key in a given section, the + // last one wins. + if (key < rhs.key) + return true; + else + return false; +} + +std::ostream &operator<<(std::ostream& oss, const ConfLine &l) +{ + oss << "ConfLine(key = '" << l.key << "', val='" + << l.val << "', newsection='" << l.newsection << "')"; + return oss; +} +///////////////////////// ConfFile ////////////////////////// +ConfFile:: +ConfFile() +{ +} + +ConfFile:: +~ConfFile() +{ +} + +void ConfFile:: +clear() +{ + sections.clear(); +} + +/* We load the whole file into memory and then parse it. Although this is not + * the optimal approach, it does mean that most of this code can be shared with + * the bufferlist loading function. Since bufferlists are always in-memory, the + * load_from_buffer interface works well for them. + * In general, configuration files should be a few kilobytes at maximum, so + * loading the whole configuration into memory shouldn't be a problem. + */ +int ConfFile:: +parse_file(const std::string &fname, std::deque<std::string> *errors, + std::ostream *warnings) +{ + clear(); + + int ret = 0; + size_t sz; + char *buf = NULL; + char buf2[128]; + FILE *fp = fopen(fname.c_str(), "r"); + if (!fp) { + ret = -errno; + return ret; + } + + struct stat st_buf; + if (fstat(fileno(fp), &st_buf)) { + ret = -errno; + ostringstream oss; + oss << "read_conf: failed to fstat '" << fname << "': " << strerror_r(ret, buf2, sizeof(buf2)); + errors->push_back(oss.str()); + goto done; + } + + if (st_buf.st_size > MAX_CONFIG_FILE_SZ) { + ostringstream oss; + oss << "read_conf: config file '" << fname << "' is " << st_buf.st_size + << " bytes, but the maximum is " << MAX_CONFIG_FILE_SZ; + errors->push_back(oss.str()); + ret = -EINVAL; + goto done; + } + + sz = (size_t)st_buf.st_size; + buf = (char*)malloc(sz); + if (!buf) { + ret = -ENOMEM; + goto done; + } + + if (fread(buf, 1, sz, fp) != sz) { + if (ferror(fp)) { + ret = -errno; + ostringstream oss; + oss << "read_conf: fread error while reading '" << fname << "': " + << strerror_r(ret, buf2, sizeof(buf2)); + errors->push_back(oss.str()); + goto done; + } + else { + ostringstream oss; + oss << "read_conf: unexpected EOF while reading '" << fname << "': " + << "possible concurrent modification?"; + errors->push_back(oss.str()); + ret = -EIO; + goto done; + } + } + + load_from_buffer(buf, sz, errors, warnings); + ret = 0; + +done: + free(buf); + fclose(fp); + return ret; +} + +int ConfFile:: +read(const std::string §ion, const std::string &key, std::string &val) const +{ + string k(normalize_key_name(key)); + + const_section_iter_t s = sections.find(section); + if (s == sections.end()) + return -ENOENT; + ConfLine exemplar(k, "", "", "", 0); + ConfSection::const_line_iter_t l = s->second.lines.find(exemplar); + if (l == s->second.lines.end()) + return -ENOENT; + val = l->val; + return 0; +} + +ConfFile::const_section_iter_t ConfFile:: +sections_begin() const +{ + return sections.begin(); +} + +ConfFile::const_section_iter_t ConfFile:: +sections_end() const +{ + return sections.end(); +} + +void ConfFile:: +trim_whitespace(std::string &str, bool strip_internal) +{ + // strip preceding + const char *in = str.c_str(); + while (true) { + char c = *in; + if ((!c) || (!isspace(c))) + break; + ++in; + } + char output[strlen(in) + 1]; + strcpy(output, in); + + // strip trailing + char *o = output + strlen(output); + while (true) { + if (o == output) + break; + --o; + if (!isspace(*o)) { + ++o; + *o = '\0'; + break; + } + } + + if (!strip_internal) { + str.assign(output); + return; + } + + // strip internal + char output2[strlen(output) + 1]; + char *out2 = output2; + bool prev_was_space = false; + for (char *u = output; *u; ++u) { + char c = *u; + if (isspace(c)) { + if (!prev_was_space) + *out2++ = c; + prev_was_space = true; + } + else { + *out2++ = c; + prev_was_space = false; + } + } + *out2++ = '\0'; + str.assign(output2); +} + +/* Normalize a key name. + * + * Normalized key names have no leading or trailing whitespace, and all + * whitespace is stored as underscores. The main reason for selecting this + * normal form is so that in common/config.cc, we can use a macro to stringify + * the field names of md_config_t and get a key in normal form. + */ +std::string ConfFile:: +normalize_key_name(const std::string &key) +{ + string k(key); + ConfFile::trim_whitespace(k, true); + std::replace(k.begin(), k.end(), ' ', '_'); + return k; +} + +std::ostream &operator<<(std::ostream &oss, const ConfFile &cf) +{ + for (ConfFile::const_section_iter_t s = cf.sections_begin(); + s != cf.sections_end(); ++s) { + oss << "[" << s->first << "]\n"; + for (ConfSection::const_line_iter_t l = s->second.lines.begin(); + l != s->second.lines.end(); ++l) { + if (!l->key.empty()) { + oss << "\t" << l->key << " = \"" << l->val << "\"\n"; + } + } + } + return oss; +} + +void ConfFile:: +load_from_buffer(const char *buf, size_t sz, std::deque<std::string> *errors, + std::ostream *warnings) +{ + errors->clear(); + + section_iter_t::value_type vt("global", ConfSection()); + pair < section_iter_t, bool > vr(sections.insert(vt)); + assert(vr.second); + section_iter_t cur_section = vr.first; + std::string acc; + + const char *b = buf; + int line_no = 0; + size_t line_len = -1; + size_t rem = sz; + while (1) { + b += line_len + 1; + rem -= line_len + 1; + if (rem == 0) + break; + line_no++; + + // look for the next newline + const char *end = (const char*)memchr(b, '\n', rem); + if (!end) { + ostringstream oss; + oss << "read_conf: ignoring line " << line_no << " because it doesn't " + << "end with a newline! Please end the config file with a newline."; + errors->push_back(oss.str()); + break; + } + + // find length of line, and search for NULLs + line_len = 0; + bool found_null = false; + for (const char *tmp = b; tmp != end; ++tmp) { + line_len++; + if (*tmp == '\0') { + found_null = true; + } + } + + if (found_null) { + ostringstream oss; + oss << "read_conf: ignoring line " << line_no << " because it has " + << "an embedded null."; + errors->push_back(oss.str()); + acc.clear(); + continue; + } + + if ((line_len >= 1) && (b[line_len-1] == '\\')) { + // A backslash at the end of a line serves as a line continuation marker. + // Combine the next line with this one. + // Remove the backslash itself from the text. + acc.append(b, line_len - 1); + continue; + } + + acc.append(b, line_len); + + //cerr << "acc = '" << acc << "'" << std::endl; + ConfLine *cline = process_line(line_no, acc.c_str(), errors); + acc.clear(); + if (!cline) + continue; + const std::string &csection(cline->newsection); + if (!csection.empty()) { + std::map <std::string, ConfSection>::value_type nt(csection, ConfSection()); + pair < section_iter_t, bool > nr(sections.insert(nt)); + cur_section = nr.first; + } + else { + if (cur_section->second.lines.count(*cline)) { + // replace an existing key/line in this section, so that + // [mysection] + // foo = 1 + // foo = 2 + // will result in foo = 2. + cur_section->second.lines.erase(*cline); + if (cline->key.length() && warnings) + *warnings << "warning: line " << line_no << ": '" << cline->key << "' in section '" + << cur_section->first << "' redefined " << std::endl; + } + // add line to current section + //std::cerr << "cur_section = " << cur_section->first << ", " << *cline << std::endl; + cur_section->second.lines.insert(*cline); + } + delete cline; + } + + if (!acc.empty()) { + ostringstream oss; + oss << "read_conf: don't end with lines that end in backslashes!"; + errors->push_back(oss.str()); + } +} + +/* + * A simple state-machine based parser. + * This probably could/should be rewritten with something like boost::spirit + * or yacc if the grammar ever gets more complex. + */ +ConfLine* ConfFile:: +process_line(int line_no, const char *line, std::deque<std::string> *errors) +{ + enum acceptor_state_t { + ACCEPT_INIT, + ACCEPT_SECTION_NAME, + ACCEPT_KEY, + ACCEPT_VAL_START, + ACCEPT_UNQUOTED_VAL, + ACCEPT_QUOTED_VAL, + ACCEPT_COMMENT_START, + ACCEPT_COMMENT_TEXT, + }; + const char *l = line; + acceptor_state_t state = ACCEPT_INIT; + string key, val, newsection, comment; + bool escaping = false; + while (true) { + char c = *l++; + switch (state) { + case ACCEPT_INIT: + if (c == '\0') + return NULL; // blank line. Not an error, but not interesting either. + else if (c == '[') + state = ACCEPT_SECTION_NAME; + else if ((c == '#') || (c == ';')) + state = ACCEPT_COMMENT_TEXT; + else if (c == ']') { + ostringstream oss; + oss << "unexpected right bracket at char " << (l - line) + << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + else if (isspace(c)) { + // ignore whitespace here + } + else { + // try to accept this character as a key + state = ACCEPT_KEY; + --l; + } + break; + case ACCEPT_SECTION_NAME: + if (c == '\0') { + ostringstream oss; + oss << "error parsing new section name: expected right bracket " + << "at char " << (l - line) << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + else if ((c == ']') && (!escaping)) { + trim_whitespace(newsection, true); + if (newsection.empty()) { + ostringstream oss; + oss << "error parsing new section name: no section name found? " + << "at char " << (l - line) << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + state = ACCEPT_COMMENT_START; + } + else if (((c == '#') || (c == ';')) && (!escaping)) { + ostringstream oss; + oss << "unexpected comment marker while parsing new section name, at " + << "char " << (l - line) << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + else if ((c == '\\') && (!escaping)) { + escaping = true; + } + else { + escaping = false; + newsection += c; + } + break; + case ACCEPT_KEY: + if ((((c == '#') || (c == ';')) && (!escaping)) || (c == '\0')) { + ostringstream oss; + if (c == '\0') { + oss << "end of key=val line " << line_no + << " reached, no \"=val\" found...missing =?"; + } else { + oss << "unexpected character while parsing putative key value, " + << "at char " << (l - line) << ", line " << line_no; + } + errors->push_back(oss.str()); + return NULL; + } + else if ((c == '=') && (!escaping)) { + key = normalize_key_name(key); + if (key.empty()) { + ostringstream oss; + oss << "error parsing key name: no key name found? " + << "at char " << (l - line) << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + state = ACCEPT_VAL_START; + } + else if ((c == '\\') && (!escaping)) { + escaping = true; + } + else { + escaping = false; + key += c; + } + break; + case ACCEPT_VAL_START: + if (c == '\0') + return new ConfLine(key, val, newsection, comment, line_no); + else if ((c == '#') || (c == ';')) + state = ACCEPT_COMMENT_TEXT; + else if (c == '"') + state = ACCEPT_QUOTED_VAL; + else if (isspace(c)) { + // ignore whitespace + } + else { + // try to accept character as a val + state = ACCEPT_UNQUOTED_VAL; + --l; + } + break; + case ACCEPT_UNQUOTED_VAL: + if (c == '\0') { + if (escaping) { + ostringstream oss; + oss << "error parsing value name: unterminated escape sequence " + << "at char " << (l - line) << ", line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + trim_whitespace(val, false); + return new ConfLine(key, val, newsection, comment, line_no); + } + else if (((c == '#') || (c == ';')) && (!escaping)) { + trim_whitespace(val, false); + state = ACCEPT_COMMENT_TEXT; + } + else if ((c == '\\') && (!escaping)) { + escaping = true; + } + else { + escaping = false; + val += c; + } + break; + case ACCEPT_QUOTED_VAL: + if (c == '\0') { + ostringstream oss; + oss << "found opening quote for value, but not the closing quote. " + << "line " << line_no; + errors->push_back(oss.str()); + return NULL; + } + else if ((c == '"') && (!escaping)) { + state = ACCEPT_COMMENT_START; + } + else if ((c == '\\') && (!escaping)) { + escaping = true; + } + else { + escaping = false; + // Add anything, including whitespace. + val += c; + } + break; + case ACCEPT_COMMENT_START: + if (c == '\0') { + return new ConfLine(key, val, newsection, comment, line_no); + } + else if ((c == '#') || (c == ';')) { + state = ACCEPT_COMMENT_TEXT; + } + else if (isspace(c)) { + // ignore whitespace + } + else { + ostringstream oss; + oss << "unexpected character at char " << (l - line) << " of line " + << line_no; + errors->push_back(oss.str()); + return NULL; + } + break; + case ACCEPT_COMMENT_TEXT: + if (c == '\0') + return new ConfLine(key, val, newsection, comment, line_no); + else + comment += c; + break; + default: + assert(0); + break; + } + assert(c != '\0'); // We better not go past the end of the input string. + } +} diff --git a/src/dmclock/sim/src/ConfUtils.h b/src/dmclock/sim/src/ConfUtils.h new file mode 100644 index 000000000..3db1d1e14 --- /dev/null +++ b/src/dmclock/sim/src/ConfUtils.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2011 New Dream Network + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_CONFUTILS_H +#define CEPH_CONFUTILS_H + +#include <deque> +#include <map> +#include <set> +#include <string> + +/* + * Ceph configuration file support. + * + * This class loads an INI-style configuration from a file or bufferlist, and + * holds it in memory. In general, an INI configuration file is composed of + * sections, which contain key/value pairs. You can put comments on the end of + * lines by using either a hash mark (#) or the semicolon (;). + * + * You can get information out of ConfFile by calling get_key or by examining + * individual sections. + * + * This class could be extended to support modifying configuration files and + * writing them back out without too much difficulty. Currently, this is not + * implemented, and the file is read-only. + */ +class ConfLine { +public: + ConfLine(const std::string &key_, const std::string &val_, + const std::string &newsection_, const std::string &comment_, int line_no_); + bool operator<(const ConfLine &rhs) const; + friend std::ostream &operator<<(std::ostream& oss, const ConfLine &l); + + std::string key, val, newsection; +}; + +class ConfSection { +public: + typedef std::set <ConfLine>::const_iterator const_line_iter_t; + + std::set <ConfLine> lines; +}; + +class ConfFile { +public: + typedef std::map <std::string, ConfSection>::iterator section_iter_t; + typedef std::map <std::string, ConfSection>::const_iterator const_section_iter_t; + + ConfFile(); + ~ConfFile(); + void clear(); + int parse_file(const std::string &fname, std::deque<std::string> *errors, std::ostream *warnings); + int read(const std::string §ion, const std::string &key, + std::string &val) const; + + const_section_iter_t sections_begin() const; + const_section_iter_t sections_end() const; + + static void trim_whitespace(std::string &str, bool strip_internal); + static std::string normalize_key_name(const std::string &key); + friend std::ostream &operator<<(std::ostream &oss, const ConfFile &cf); + +private: + void load_from_buffer(const char *buf, size_t sz, + std::deque<std::string> *errors, std::ostream *warnings); + static ConfLine* process_line(int line_no, const char *line, + std::deque<std::string> *errors); + + std::map <std::string, ConfSection> sections; +}; + +#endif diff --git a/src/dmclock/sim/src/config.cc b/src/dmclock/sim/src/config.cc new file mode 100644 index 000000000..dae2903e1 --- /dev/null +++ b/src/dmclock/sim/src/config.cc @@ -0,0 +1,184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include <unistd.h> +#include <string.h> +#include <stdarg.h> + +#include <iostream> +#include <vector> +#include <list> + +#include "config.h" +#include "str_list.h" + + +static void dashes_to_underscores(const char *input, char *output) { + char c = 0; + char *o = output; + const char *i = input; + // first two characters are copied as-is + *o = *i++; + if (*o++ == '\0') + return; + *o = *i++; + if (*o++ == '\0') + return; + for (; ((c = *i)); ++i) { + if (c == '=') { + strcpy(o, i); + return; + } + if (c == '-') + *o++ = '_'; + else + *o++ = c; + } + *o++ = '\0'; +} + +static int va_ceph_argparse_witharg(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, std::string *ret, + std::ostream &oss, va_list ap) { + const char *first = *i; + char tmp[strlen(first)+1]; + dashes_to_underscores(first, tmp); + first = tmp; + + // does this argument match any of the possibilities? + while (1) { + const char *a = va_arg(ap, char*); + if (a == NULL) + return 0; + int strlen_a = strlen(a); + char a2[strlen_a+1]; + dashes_to_underscores(a, a2); + if (strncmp(a2, first, strlen(a2)) == 0) { + if (first[strlen_a] == '=') { + *ret = first + strlen_a + 1; + i = args.erase(i); + return 1; + } + else if (first[strlen_a] == '\0') { + // find second part (or not) + if (i+1 == args.end()) { + oss << "Option " << *i << " requires an argument." << std::endl; + i = args.erase(i); + return -EINVAL; + } + i = args.erase(i); + *ret = *i; + i = args.erase(i); + return 1; + } + } + } +} + +bool crimson::qos_simulation::ceph_argparse_witharg(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, std::string *ret, ...) { + int r; + va_list ap; + va_start(ap, ret); + r = va_ceph_argparse_witharg(args, i, ret, std::cerr, ap); + va_end(ap); + if (r < 0) + _exit(1); + return r != 0; +} + +void crimson::qos_simulation::ceph_argparse_early_args(std::vector<const char*>& args, std::string *conf_file_list) { + std::string val; + + std::vector<const char *> orig_args = args; + + for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_witharg(args, i, &val, "--conf", "-c", (char*)NULL)) { + *conf_file_list = val; + } + else { + // ignore + ++i; + } + } + return; +} + +static bool stobool(const std::string & v) { + return !v.empty () && + (strcasecmp (v.c_str (), "true") == 0 || + atoi (v.c_str ()) != 0); +} + +int crimson::qos_simulation::parse_config_file(const std::string &fname, sim_config_t &g_conf) { + ConfFile cf; + std::deque<std::string> err; + std::ostringstream warn; + int ret = cf.parse_file(fname.c_str(), &err, &warn); + if (ret) { + // error + return ret; + } + + std::string val; + if (!cf.read("global", "server_groups", val)) + g_conf.server_groups = std::stoul(val); + if (!cf.read("global", "client_groups", val)) + g_conf.client_groups = std::stoul(val); + if (!cf.read("global", "server_random_selection", val)) + g_conf.server_random_selection = stobool(val); + if (!cf.read("global", "server_soft_limit", val)) + g_conf.server_soft_limit = stobool(val); + if (!cf.read("global", "anticipation_timeout", val)) + g_conf.anticipation_timeout = stod(val); + + for (unsigned i = 0; i < g_conf.server_groups; i++) { + srv_group_t st; + std::string section = "server." + std::to_string(i); + if (!cf.read(section, "server_count", val)) + st.server_count = std::stoul(val); + if (!cf.read(section, "server_iops", val)) + st.server_iops = std::stoul(val); + if (!cf.read(section, "server_threads", val)) + st.server_threads = std::stoul(val); + g_conf.srv_group.push_back(st); + } + + for (unsigned i = 0; i < g_conf.client_groups; i++) { + cli_group_t ct; + std::string section = "client." + std::to_string(i); + if (!cf.read(section, "client_count", val)) + ct.client_count = std::stoul(val); + if (!cf.read(section, "client_wait", val)) + ct.client_wait = std::chrono::seconds(std::stoul(val)); + if (!cf.read(section, "client_total_ops", val)) + ct.client_total_ops = std::stoul(val); + if (!cf.read(section, "client_server_select_range", val)) + ct.client_server_select_range = std::stoul(val); + if (!cf.read(section, "client_iops_goal", val)) + ct.client_iops_goal = std::stoul(val); + if (!cf.read(section, "client_outstanding_ops", val)) + ct.client_outstanding_ops = std::stoul(val); + if (!cf.read(section, "client_reservation", val)) + ct.client_reservation = std::stod(val); + if (!cf.read(section, "client_limit", val)) + ct.client_limit = std::stod(val); + if (!cf.read(section, "client_weight", val)) + ct.client_weight = std::stod(val); + if (!cf.read(section, "client_req_cost", val)) + ct.client_req_cost = std::stoul(val); + g_conf.cli_group.push_back(ct); + } + + return 0; +} diff --git a/src/dmclock/sim/src/config.h b/src/dmclock/sim/src/config.h new file mode 100644 index 000000000..d61ca45a8 --- /dev/null +++ b/src/dmclock/sim/src/config.h @@ -0,0 +1,158 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <string.h> + +#include <chrono> +#include <vector> +#include <sstream> +#include <iomanip> + +#include "ConfUtils.h" + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + struct cli_group_t { + unsigned client_count; + std::chrono::seconds client_wait; + unsigned client_total_ops; + unsigned client_server_select_range; + unsigned client_iops_goal; + unsigned client_outstanding_ops; + double client_reservation; + double client_limit; + double client_weight; + Cost client_req_cost; + + cli_group_t(unsigned _client_count = 100, + unsigned _client_wait = 0, + unsigned _client_total_ops = 1000, + unsigned _client_server_select_range = 10, + unsigned _client_iops_goal = 50, + unsigned _client_outstanding_ops = 100, + double _client_reservation = 20.0, + double _client_limit = 60.0, + double _client_weight = 1.0, + Cost _client_req_cost = 1u) : + client_count(_client_count), + client_wait(std::chrono::seconds(_client_wait)), + client_total_ops(_client_total_ops), + client_server_select_range(_client_server_select_range), + client_iops_goal(_client_iops_goal), + client_outstanding_ops(_client_outstanding_ops), + client_reservation(_client_reservation), + client_limit(_client_limit), + client_weight(_client_weight), + client_req_cost(_client_req_cost) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, + const cli_group_t& cli_group) { + out << + "client_count = " << cli_group.client_count << "\n" << + "client_wait = " << cli_group.client_wait.count() << "\n" << + "client_total_ops = " << cli_group.client_total_ops << "\n" << + "client_server_select_range = " << cli_group.client_server_select_range << "\n" << + "client_iops_goal = " << cli_group.client_iops_goal << "\n" << + "client_outstanding_ops = " << cli_group.client_outstanding_ops << "\n" << + std::fixed << std::setprecision(1) << + "client_reservation = " << cli_group.client_reservation << "\n" << + "client_limit = " << cli_group.client_limit << "\n" << + "client_weight = " << cli_group.client_weight << "\n" << + "client_req_cost = " << cli_group.client_req_cost; + return out; + } + }; // class cli_group_t + + + struct srv_group_t { + unsigned server_count; + unsigned server_iops; + unsigned server_threads; + + srv_group_t(unsigned _server_count = 100, + unsigned _server_iops = 40, + unsigned _server_threads = 1) : + server_count(_server_count), + server_iops(_server_iops), + server_threads(_server_threads) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, + const srv_group_t& srv_group) { + out << + "server_count = " << srv_group.server_count << "\n" << + "server_iops = " << srv_group.server_iops << "\n" << + "server_threads = " << srv_group.server_threads; + return out; + } + }; // class srv_group_t + + + struct sim_config_t { + unsigned server_groups; + unsigned client_groups; + bool server_random_selection; + bool server_soft_limit; + double anticipation_timeout; + + std::vector<cli_group_t> cli_group; + std::vector<srv_group_t> srv_group; + + sim_config_t(unsigned _server_groups = 1, + unsigned _client_groups = 1, + bool _server_random_selection = false, + bool _server_soft_limit = true, + double _anticipation_timeout = 0.0) : + server_groups(_server_groups), + client_groups(_client_groups), + server_random_selection(_server_random_selection), + server_soft_limit(_server_soft_limit), + anticipation_timeout(_anticipation_timeout) + { + srv_group.reserve(server_groups); + cli_group.reserve(client_groups); + } + + friend std::ostream& operator<<(std::ostream& out, + const sim_config_t& sim_config) { + out << + "server_groups = " << sim_config.server_groups << "\n" << + "client_groups = " << sim_config.client_groups << "\n" << + "server_random_selection = " << sim_config.server_random_selection << "\n" << + "server_soft_limit = " << sim_config.server_soft_limit << "\n" << + std::fixed << std::setprecision(3) << + "anticipation_timeout = " << sim_config.anticipation_timeout; + return out; + } + }; // class sim_config_t + + + bool ceph_argparse_witharg(std::vector<const char*> &args, + std::vector<const char*>::iterator &i, std::string *ret, ...); + void ceph_argparse_early_args(std::vector<const char*>& args, std::string *conf_file_list); + int parse_config_file(const std::string &fname, sim_config_t &g_conf); + + }; // namespace qos_simulation +}; // namespace crimson diff --git a/src/dmclock/sim/src/sim_client.h b/src/dmclock/sim/src/sim_client.h new file mode 100644 index 000000000..182cdb803 --- /dev/null +++ b/src/dmclock/sim/src/sim_client.h @@ -0,0 +1,340 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <atomic> +#include <mutex> +#include <condition_variable> +#include <thread> +#include <chrono> +#include <vector> +#include <deque> +#include <iostream> + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + struct req_op_t {}; + struct wait_op_t {}; + constexpr struct req_op_t req_op {}; + constexpr struct wait_op_t wait_op {}; + + + enum class CliOp { req, wait }; + struct CliInst { + CliOp op; + union { + std::chrono::milliseconds wait_time; + struct { + uint32_t count; + std::chrono::microseconds time_bw_reqs; + uint16_t max_outstanding; + } req_params; + } args; + + // D is a duration type + template<typename D> + CliInst(wait_op_t, D duration) : + op(CliOp::wait) + { + args.wait_time = + std::chrono::duration_cast<std::chrono::milliseconds>(duration); + } + + CliInst(req_op_t, + uint32_t count, double ops_per_sec, uint16_t max_outstanding) : + op(CliOp::req) + { + args.req_params.count = count; + args.req_params.max_outstanding = max_outstanding; + uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000); + args.req_params.time_bw_reqs = std::chrono::microseconds(us); + } + }; + + + using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>; + + + template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum> + class SimulatedClient { + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds track_resp_time; + std::chrono::nanoseconds get_req_params_time; + uint32_t track_resp_count; + uint32_t get_req_params_count; + + InternalStats() : + track_resp_time(0), + get_req_params_time(0), + track_resp_count(0), + get_req_params_count(0) + { + // empty + } + }; + + using SubmitFunc = + std::function<void(const ServerId&, + TestRequest&&, + const ClientId&, + const ReqPm&)>; + + using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>; + + typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint; + + static TimePoint now() { return std::chrono::steady_clock::now(); } + + protected: + + struct RespQueueItem { + TestResponse response; + ServerId server_id; + RespPm resp_params; + Cost request_cost; + }; + + const ClientId id; + const SubmitFunc submit_f; + const ServerSelectFunc server_select_f; + const ClientAccumFunc accum_f; + + std::vector<CliInst> instructions; + + SvcTrk service_tracker; + + // TODO: use lock rather than atomic??? + std::atomic_ulong outstanding_ops; + std::atomic_bool requests_complete; + + std::deque<RespQueueItem> resp_queue; + + std::mutex mtx_req; + std::condition_variable cv_req; + + std::mutex mtx_resp; + std::condition_variable cv_resp; + + using RespGuard = std::lock_guard<decltype(mtx_resp)>; + using Lock = std::unique_lock<std::mutex>; + + // data collection + + std::vector<TimePoint> op_times; + Accum accumulator; + InternalStats internal_stats; + + std::thread thd_req; + std::thread thd_resp; + + public: + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + const std::vector<CliInst>& _instrs) : + id(_id), + submit_f(_submit_f), + server_select_f(_server_select_f), + accum_f(_accum_f), + instructions(_instrs), + service_tracker(), + outstanding_ops(0), + requests_complete(false) + { + size_t op_count = 0; + for (auto i : instructions) { + if (CliOp::req == i.op) { + op_count += i.args.req_params.count; + } + } + op_times.reserve(op_count); + + thd_resp = std::thread(&SimulatedClient::run_resp, this); + thd_req = std::thread(&SimulatedClient::run_req, this); + } + + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + uint16_t _ops_to_run, + double _iops_goal, + uint16_t _outstanding_ops_allowed) : + SimulatedClient(_id, + _submit_f, _server_select_f, _accum_f, + {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}}) + { + // empty + } + + + SimulatedClient(const SimulatedClient&) = delete; + SimulatedClient(SimulatedClient&&) = delete; + SimulatedClient& operator=(const SimulatedClient&) = delete; + SimulatedClient& operator=(SimulatedClient&&) = delete; + + virtual ~SimulatedClient() { + wait_until_done(); + } + + void receive_response(const TestResponse& resp, + const ServerId& server_id, + const RespPm& resp_params, + const Cost request_cost) { + RespGuard g(mtx_resp); + resp_queue.push_back( + RespQueueItem{ resp, server_id, resp_params, request_cost }); + cv_resp.notify_one(); + } + + const std::vector<TimePoint>& get_op_times() const { return op_times; } + + void wait_until_done() { + if (thd_req.joinable()) thd_req.join(); + if (thd_resp.joinable()) thd_resp.join(); + } + + const Accum& get_accumulator() const { return accumulator; } + + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void run_req() { + size_t ops_count = 0; + for (auto i : instructions) { + if (CliOp::wait == i.op) { + std::this_thread::sleep_for(i.args.wait_time); + } else if (CliOp::req == i.op) { + Lock l(mtx_req); + for (uint64_t o = 0; o < i.args.req_params.count; ++o) { + while (outstanding_ops >= i.args.req_params.max_outstanding) { + cv_req.wait(l); + } + + l.unlock(); + auto now = std::chrono::steady_clock::now(); + const ServerId& server = server_select_f(o); + + ReqPm rp = + time_stats_w_return<decltype(internal_stats.get_req_params_time), + ReqPm>(internal_stats.mtx, + internal_stats.get_req_params_time, + [&]() -> ReqPm { + return service_tracker.get_req_params(server); + }); + count_stats(internal_stats.mtx, + internal_stats.get_req_params_count); + + submit_f(server, + TestRequest{server, static_cast<uint32_t>(o), 12}, + id, rp); + ++outstanding_ops; + l.lock(); // lock for return to top of loop + + auto delay_time = now + i.args.req_params.time_bw_reqs; + while (std::chrono::steady_clock::now() < delay_time) { + cv_req.wait_until(l, delay_time); + } // while + } // for + ops_count += i.args.req_params.count; + } else { + assert(false); + } + } // for loop + + requests_complete = true; + + // all requests made, thread ends + } + + + void run_resp() { + std::chrono::milliseconds delay(1000); + int op = 0; + + Lock l(mtx_resp); + + // since the following code would otherwise be repeated (except for + // the call to notify_one) in the two loops below; let's avoid + // repetition and define it once. + const auto proc_resp = [this, &op, &l](const bool notify_req_cv) { + if (!resp_queue.empty()) { + RespQueueItem item = resp_queue.front(); + resp_queue.pop_front(); + + l.unlock(); + + // data collection + + op_times.push_back(now()); + accum_f(accumulator, item.resp_params); + + // processing + +#if 0 // not needed + TestResponse& resp = item.response; +#endif + + time_stats(internal_stats.mtx, + internal_stats.track_resp_time, + [&](){ + service_tracker.track_resp(item.server_id, item.resp_params, item.request_cost); + }); + count_stats(internal_stats.mtx, + internal_stats.track_resp_count); + + --outstanding_ops; + if (notify_req_cv) { + cv_req.notify_one(); + } + + l.lock(); + } + }; + + while(!requests_complete.load()) { + while(resp_queue.empty() && !requests_complete.load()) { + cv_resp.wait_for(l, delay); + } + proc_resp(true); + } + + while(outstanding_ops.load() > 0) { + while(resp_queue.empty() && outstanding_ops.load() > 0) { + cv_resp.wait_for(l, delay); + } + proc_resp(false); // don't call notify_one as all requests are complete + } + + // all responses received, thread ends + } + }; // class SimulatedClient + + + }; // namespace qos_simulation +}; // namespace crimson diff --git a/src/dmclock/sim/src/sim_recs.h b/src/dmclock/sim/src/sim_recs.h new file mode 100644 index 000000000..010630072 --- /dev/null +++ b/src/dmclock/sim/src/sim_recs.h @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <stdint.h> +#include <stdlib.h> +#include <assert.h> +#include <signal.h> + +#include <sys/time.h> + +#include <cmath> +#include <limits> +#include <string> +#include <mutex> +#include <iostream> +#include <functional> + + +using ClientId = unsigned; +using ServerId = unsigned; + + +namespace crimson { + namespace qos_simulation { + + using Cost = uint32_t; + + inline void debugger() { + raise(SIGCONT); + } + + template<typename T> + void time_stats(std::mutex& mtx, + T& time_accumulate, + std::function<void()> code) { + auto t1 = std::chrono::steady_clock::now(); + code(); + auto t2 = std::chrono::steady_clock::now(); + auto duration = t2 - t1; + auto cast_duration = std::chrono::duration_cast<T>(duration); + std::lock_guard<std::mutex> lock(mtx); + time_accumulate += cast_duration; + } + + // unfortunately it's hard for the compiler to infer the types, + // and therefore when called the template params might have to be + // explicit + template<typename T, typename R> + R time_stats_w_return(std::mutex& mtx, + T& time_accumulate, + std::function<R()> code) { + auto t1 = std::chrono::steady_clock::now(); + R result = code(); + auto t2 = std::chrono::steady_clock::now(); + auto duration = t2 - t1; + auto cast_duration = std::chrono::duration_cast<T>(duration); + std::lock_guard<std::mutex> lock(mtx); + time_accumulate += cast_duration; + return result; + } + + template<typename T> + void count_stats(std::mutex& mtx, + T& counter) { + std::lock_guard<std::mutex> lock(mtx); + ++counter; + } + + struct TestRequest { + ServerId server; // allows debugging + uint32_t epoch; + uint32_t op; + + TestRequest(ServerId _server, + uint32_t _epoch, + uint32_t _op) : + server(_server), + epoch(_epoch), + op(_op) + { + // empty + } + + TestRequest(const TestRequest& r) : + TestRequest(r.server, r.epoch, r.op) + { + // empty + } + }; // struct TestRequest + + + struct TestResponse { + uint32_t epoch; + + explicit TestResponse(uint32_t _epoch) : + epoch(_epoch) + { + // empty + } + + TestResponse(const TestResponse& r) : + epoch(r.epoch) + { + // empty + } + + friend std::ostream& operator<<(std::ostream& out, const TestResponse& resp) { + out << "{ "; + out << "epoch:" << resp.epoch; + out << " }"; + return out; + } + }; // class TestResponse + + }; // namespace qos_simulation +}; // namespace crimson diff --git a/src/dmclock/sim/src/sim_server.h b/src/dmclock/sim/src/sim_server.h new file mode 100644 index 000000000..743c6e79a --- /dev/null +++ b/src/dmclock/sim/src/sim_server.h @@ -0,0 +1,245 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <thread> +#include <mutex> +#include <condition_variable> +#include <chrono> +#include <deque> + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + template<typename Q, typename ReqPm, typename RespPm, typename Accum> + class SimulatedServer { + + struct QueueItem { + ClientId client; + std::unique_ptr<TestRequest> request; + RespPm additional; + Cost request_cost; + + QueueItem(const ClientId& _client, + std::unique_ptr<TestRequest>&& _request, + const RespPm& _additional, + const Cost _request_cost) : + client(_client), + request(std::move(_request)), + additional(_additional), + request_cost(_request_cost) + { + // empty + } + }; // QueueItem + + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds add_request_time; + std::chrono::nanoseconds request_complete_time; + uint32_t add_request_count; + uint32_t request_complete_count; + + InternalStats() : + add_request_time(0), + request_complete_time(0), + add_request_count(0), + request_complete_count(0) + { + // empty + } + }; + + using ClientRespFunc = std::function<void(ClientId, + const TestResponse&, + const ServerId&, + const RespPm&, + const Cost)>; + + using ServerAccumFunc = std::function<void(Accum& accumulator, + const RespPm& additional)>; + + protected: + + const ServerId id; + Q* priority_queue; + ClientRespFunc client_resp_f; + int iops; + size_t thread_pool_size; + + bool finishing; + std::chrono::microseconds op_time; + + std::mutex inner_queue_mtx; + std::condition_variable inner_queue_cv; + std::deque<QueueItem> inner_queue; + + std::thread* threads; + + using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>; + using Lock = std::unique_lock<std::mutex>; + + // data collection + + ServerAccumFunc accum_f; + Accum accumulator; + + InternalStats internal_stats; + + public: + + using CanHandleRequestFunc = std::function<bool(void)>; + using HandleRequestFunc = + std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&, uint64_t)>; + using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>; + + + SimulatedServer(ServerId _id, + int _iops, + size_t _thread_pool_size, + const ClientRespFunc& _client_resp_f, + const ServerAccumFunc& _accum_f, + CreateQueueF _create_queue_f) : + id(_id), + priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, + this), + std::bind(&SimulatedServer::inner_post, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3, + std::placeholders::_4))), + client_resp_f(_client_resp_f), + iops(_iops), + thread_pool_size(_thread_pool_size), + finishing(false), + accum_f(_accum_f) + { + op_time = + std::chrono::microseconds((int) (0.5 + + thread_pool_size * 1000000.0 / iops)); + std::chrono::milliseconds finishing_check_period(1000); + threads = new std::thread[thread_pool_size]; + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i] = std::thread(&SimulatedServer::run, this, finishing_check_period); + } + } + + virtual ~SimulatedServer() { + Lock l(inner_queue_mtx); + finishing = true; + inner_queue_cv.notify_all(); + l.unlock(); + + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i].join(); + } + + delete[] threads; + + delete priority_queue; + } + + void post(TestRequest&& request, + const ClientId& client_id, + const ReqPm& req_params, + const Cost request_cost) + { + time_stats(internal_stats.mtx, + internal_stats.add_request_time, + [&](){ + priority_queue->add_request(std::move(request), + client_id, + req_params, + request_cost); + }); + count_stats(internal_stats.mtx, + internal_stats.add_request_count); + } + + bool has_avail_thread() { + InnerQGuard g(inner_queue_mtx); + return inner_queue.size() <= thread_pool_size; + } + + const Accum& get_accumulator() const { return accumulator; } + const Q& get_priority_queue() const { return *priority_queue; } + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void inner_post(const ClientId& client, + std::unique_ptr<TestRequest> request, + const RespPm& additional, + const Cost request_cost) { + Lock l(inner_queue_mtx); + assert(!finishing); + accum_f(accumulator, additional); + inner_queue.emplace_back(QueueItem(client, + std::move(request), + additional, + request_cost)); + inner_queue_cv.notify_one(); + } + + void run(std::chrono::milliseconds check_period) { + Lock l(inner_queue_mtx); + while(true) { + while(inner_queue.empty() && !finishing) { + inner_queue_cv.wait_for(l, check_period); + } + if (!inner_queue.empty()) { + auto& front = inner_queue.front(); + auto client = front.client; + auto req = std::move(front.request); + auto additional = front.additional; + auto request_cost = front.request_cost; + inner_queue.pop_front(); + + l.unlock(); + + // simulation operation by sleeping; then call function to + // notify server of completion + std::this_thread::sleep_for(op_time * request_cost); + + // TODO: rather than assuming this constructor exists, perhaps + // pass in a function that does this mapping? + client_resp_f(client, TestResponse{req->epoch}, id, additional, request_cost); + + time_stats(internal_stats.mtx, + internal_stats.request_complete_time, + [&](){ + priority_queue->request_completed(); + }); + count_stats(internal_stats.mtx, + internal_stats.request_complete_count); + + l.lock(); // in prep for next iteration of loop + } else { + break; + } + } + } + }; // class SimulatedServer + + }; // namespace qos_simulation +}; // namespace crimson diff --git a/src/dmclock/sim/src/simulate.h b/src/dmclock/sim/src/simulate.h new file mode 100644 index 000000000..44a09ca31 --- /dev/null +++ b/src/dmclock/sim/src/simulate.h @@ -0,0 +1,448 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <assert.h> + +#include <memory> +#include <chrono> +#include <map> +#include <random> +#include <iostream> +#include <iomanip> +#include <string> + + +namespace crimson { + namespace qos_simulation { + + template<typename ServerId, typename ClientId, typename TS, typename TC> + class Simulation { + + public: + + using TimePoint = std::chrono::time_point<std::chrono::steady_clock>; + + protected: + + using ClientMap = std::map<ClientId,TC*>; + using ServerMap = std::map<ServerId,TS*>; + + unsigned server_count = 0; + unsigned client_count = 0; + + ServerMap servers; + ClientMap clients; + std::vector<ServerId> server_ids; + + TimePoint early_time; + TimePoint servers_created_time; + TimePoint clients_created_time; + TimePoint clients_finished_time; + TimePoint late_time; + + std::default_random_engine prng; + + bool has_run = false; + + + public: + + double fmt_tp(const TimePoint& t) { + auto c = t.time_since_epoch().count(); + return uint64_t(c / 1000000.0 + 0.5) % 100000 / 1000.0; + } + + TimePoint now() { + return std::chrono::steady_clock::now(); + } + + using ClientBasedServerSelectFunc = + std::function<const ServerId&(uint64_t, uint16_t)>; + + using ClientFilter = std::function<bool(const ClientId&)>; + + using ServerFilter = std::function<bool(const ServerId&)>; + + using ServerDataOutF = + std::function<void(std::ostream& out, + Simulation* sim, ServerFilter, + int header_w, int data_w, int data_prec)>; + + using ClientDataOutF = + std::function<void(std::ostream& out, + Simulation* sim, ClientFilter, + int header_w, int data_w, int data_prec)>; + + Simulation() : + early_time(now()), + prng(std::chrono::system_clock::now().time_since_epoch().count()) + { + // empty + } + + ~Simulation() { + for (auto c : clients) { + TC* cp = c.second; + delete cp; + } + + for (auto s : servers) { + delete s.second; + } + } + + unsigned get_client_count() const { return client_count; } + unsigned get_server_count() const { return server_count; } + TC& get_client(ClientId id) { return *clients[id]; } + TS& get_server(ServerId id) { return *servers[id]; } + const ServerId& get_server_id(std::size_t index) const { + return server_ids[index]; + } + + + void add_servers(unsigned count, + std::function<TS*(ServerId)> create_server_f) { + unsigned i = server_count; + + // increment server_count before creating servers since they + // will start running immediately and may use the server_count + // value; NB: this could still be an issue if servers are + // added with multiple add_servers calls; consider using a + // separate start function after all servers (and clients?) + // have been added + server_count += count; + + for (; i < server_count; ++i) { + server_ids.push_back(i); + servers[i] = create_server_f(i); + } + + servers_created_time = now(); + } + + + void add_clients(unsigned count, + std::function<TC*(ClientId)> create_client_f) { + unsigned i = client_count; + + // increment client_count before creating clients since they + // will start running immediately and may use the client_count + // value (e.g., in the server selection function); NB: this could + // still be an issue if clients are added with multiple + // add_clients calls; consider using a separate start function + // after all clients have been added + client_count += count; + + for (; i < client_count; ++i) { + clients[i] = create_client_f(i); + } + + clients_created_time = now(); + } + + + void run() { + assert(server_count > 0); + assert(client_count > 0); + + std::cout << "simulation started" << std::endl; + + // clients are now running; wait for all to finish + + for (auto const &i : clients) { + i.second->wait_until_done(); + } + + late_time = clients_finished_time = now(); + + std::cout << "simulation completed in " << + std::chrono::duration_cast<std::chrono::milliseconds>(clients_finished_time - servers_created_time).count() << + " millisecs" << std::endl; + + has_run = true; + } // run + + + void display_stats(std::ostream& out, + ServerDataOutF server_out_f, ClientDataOutF client_out_f, + ServerFilter server_filter = + [] (const ServerId&) { return true; }, + ClientFilter client_filter = + [] (const ClientId&) { return true; }, + int head_w = 12, int data_w = 7, int data_prec = 2) { + assert(has_run); + + // skip first 2 secondsd of data + const std::chrono::seconds skip_amount(0); + // calculate in groups of 5 seconds + const std::chrono::seconds measure_unit(2); + // unit to output reports in + const std::chrono::seconds report_unit(1); + + // compute and display stats + + TimePoint earliest_start = late_time; + TimePoint latest_start = early_time; + TimePoint earliest_finish = late_time; + TimePoint latest_finish = early_time; + + for (auto const &c : clients) { + auto start = c.second->get_op_times().front(); + auto end = c.second->get_op_times().back(); + + if (start < earliest_start) { earliest_start = start; } + if (start > latest_start) { latest_start = start; } + if (end < earliest_finish) { earliest_finish = end; } + if (end > latest_finish) { latest_finish = end; } + } + + double ops_factor = + std::chrono::duration_cast<std::chrono::duration<double>>(measure_unit) / + std::chrono::duration_cast<std::chrono::duration<double>>(report_unit); + + const auto start_edge = clients_created_time + skip_amount; + + std::map<ClientId,std::vector<double>> ops_data; + + for (auto const &c : clients) { + auto it = c.second->get_op_times().begin(); + const auto end = c.second->get_op_times().end(); + while (it != end && *it < start_edge) { ++it; } + + for (auto time_edge = start_edge + measure_unit; + time_edge <= latest_finish + measure_unit; + time_edge += measure_unit) { + int count = 0; + for (; it != end && *it < time_edge; ++count, ++it) { /* empty */ } + double ops_per_second = double(count) / ops_factor; + ops_data[c.first].push_back(ops_per_second); + } + } + + out << "==== Client Data ====" << std::endl; + + out << std::setw(head_w) << "client:"; + for (auto const &c : clients) { + if (!client_filter(c.first)) continue; + out << " " << std::setw(data_w) << c.first; + } + out << std::setw(data_w) << "total" << std::endl; + + { + bool has_data; + size_t i = 0; + do { + std::string line_header = "t_" + std::to_string(i) + ":"; + out << std::setw(head_w) << line_header; + has_data = false; + double total = 0.0; + for (auto const &c : clients) { + double data = 0.0; + if (i < ops_data[c.first].size()) { + data = ops_data[c.first][i]; + has_data = true; + } + total += data; + + if (!client_filter(c.first)) continue; + + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << data; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total << std::endl; + ++i; + } while(has_data); + } + + client_out_f(out, this, client_filter, head_w, data_w, data_prec); + + display_client_internal_stats<std::chrono::nanoseconds>(out, + "nanoseconds"); + + out << std::endl << "==== Server Data ====" << std::endl; + + out << std::setw(head_w) << "server:"; + for (auto const &s : servers) { + if (!server_filter(s.first)) continue; + out << " " << std::setw(data_w) << s.first; + } + out << " " << std::setw(data_w) << "total" << std::endl; + + server_out_f(out, this, server_filter, head_w, data_w, data_prec); + + display_server_internal_stats<std::chrono::nanoseconds>(out, + "nanoseconds"); + + // clean up clients then servers + + for (auto i = clients.begin(); i != clients.end(); ++i) { + delete i->second; + i->second = nullptr; + } + + for (auto i = servers.begin(); i != servers.end(); ++i) { + delete i->second; + i->second = nullptr; + } + } // display_stats + + + template<typename T> + void display_server_internal_stats(std::ostream& out, + const std::string& time_unit) { + T add_request_time(0); + T request_complete_time(0); + uint32_t add_request_count = 0; + uint32_t request_complete_count = 0; + + for (unsigned i = 0; i < get_server_count(); ++i) { + const auto& server = get_server(i); + const auto& is = server.get_internal_stats(); + add_request_time += + std::chrono::duration_cast<T>(is.add_request_time); + request_complete_time += + std::chrono::duration_cast<T>(is.request_complete_time); + add_request_count += is.add_request_count; + request_complete_count += is.request_complete_count; + } + + double add_request_time_per_unit = + double(add_request_time.count()) / add_request_count ; + out << "total time to add requests: " << + std::fixed << add_request_time.count() << " " << time_unit << + ";" << std::endl << + " count: " << add_request_count << ";" << std::endl << + " average: " << add_request_time_per_unit << + " " << time_unit << " per request/response" << std::endl; + + double request_complete_time_unit = + double(request_complete_time.count()) / request_complete_count ; + out << "total time to note requests complete: " << std::fixed << + request_complete_time.count() << " " << time_unit << ";" << + std::endl << + " count: " << request_complete_count << ";" << std::endl << + " average: " << request_complete_time_unit << + " " << time_unit << " per request/response" << std::endl; + + out << std::endl; + + assert(add_request_count == request_complete_count); + out << "server timing for QOS algorithm: " << + add_request_time_per_unit + request_complete_time_unit << + " " << time_unit << " per request/response" << std::endl; + } + + + template<typename T> + void display_client_internal_stats(std::ostream& out, + const std::string& time_unit) { + T track_resp_time(0); + T get_req_params_time(0); + uint32_t track_resp_count = 0; + uint32_t get_req_params_count = 0; + + for (unsigned i = 0; i < get_client_count(); ++i) { + const auto& client = get_client(i); + const auto& is = client.get_internal_stats(); + track_resp_time += + std::chrono::duration_cast<T>(is.track_resp_time); + get_req_params_time += + std::chrono::duration_cast<T>(is.get_req_params_time); + track_resp_count += is.track_resp_count; + get_req_params_count += is.get_req_params_count; + } + + double track_resp_time_unit = + double(track_resp_time.count()) / track_resp_count; + out << "total time to track responses: " << + std::fixed << track_resp_time.count() << " " << time_unit << ";" << + std::endl << + " count: " << track_resp_count << ";" << std::endl << + " average: " << track_resp_time_unit << " " << time_unit << + " per request/response" << std::endl; + + double get_req_params_time_unit = + double(get_req_params_time.count()) / get_req_params_count; + out << "total time to get request parameters: " << + std::fixed << get_req_params_time.count() << " " << time_unit << + ";" << std::endl << + " count: " << get_req_params_count << ";" << std::endl << + " average: " << get_req_params_time_unit << " " << time_unit << + " per request/response" << std::endl; + + out << std::endl; + + assert(track_resp_count == get_req_params_count); + out << "client timing for QOS algorithm: " << + track_resp_time_unit + get_req_params_time_unit << " " << + time_unit << " per request/response" << std::endl; + } + + + // **** server selection functions **** + + + const ServerId& server_select_alternate(uint64_t seed, + uint16_t client_idx) { + size_t index = (client_idx + seed) % server_count; + return server_ids[index]; + } + + + // returns a lambda using the range specified as servers_per (client) + ClientBasedServerSelectFunc + make_server_select_alt_range(uint16_t servers_per) { + return [servers_per,this](uint64_t seed, uint16_t client_idx) + -> const ServerId& { + double factor = double(server_count) / client_count; + size_t offset = seed % servers_per; + size_t index = (size_t(0.5 + client_idx * factor) + offset) % server_count; + return server_ids[index]; + }; + } + + + // function to choose a server randomly + const ServerId& server_select_random(uint64_t seed, uint16_t client_idx) { + size_t index = prng() % server_count; + return server_ids[index]; + } + + + // function to choose a server randomly + ClientBasedServerSelectFunc + make_server_select_ran_range(uint16_t servers_per) { + return [servers_per,this](uint64_t seed, uint16_t client_idx) + -> const ServerId& { + double factor = double(server_count) / client_count; + size_t offset = prng() % servers_per; + size_t index = (size_t(0.5 + client_idx * factor) + offset) % server_count; + return server_ids[index]; + }; + } + + + // function to always choose the first server + const ServerId& server_select_0(uint64_t seed, uint16_t client_idx) { + return server_ids[0]; + } + }; // class Simulation + + }; // namespace qos_simulation +}; // namespace crimson diff --git a/src/dmclock/sim/src/ssched/ssched_client.h b/src/dmclock/sim/src/ssched/ssched_client.h new file mode 100644 index 000000000..6764a09a8 --- /dev/null +++ b/src/dmclock/sim/src/ssched/ssched_client.h @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + +#include "ssched_recs.h" + + +namespace crimson { + namespace simple_scheduler { + + // S is server identifier type + template<typename S> + class ServiceTracker { + + public: + + // we have to start the counters at 1, as 0 is used in the + // cleaning process + ServiceTracker() + { + // empty + } + + void track_resp(const S& server_id, + const NullData& ignore, + uint64_t request_cost) { + // empty + } + + /* + * Returns the ReqParams for the given server. + */ + ReqParams get_req_params(const S& server) { + return ReqParams(); + } // get_req_params + }; // class ServiceTracker + } // namespace simple_scheduler +} // namespace crimson diff --git a/src/dmclock/sim/src/ssched/ssched_recs.h b/src/dmclock/sim/src/ssched/ssched_recs.h new file mode 100644 index 000000000..935e678c1 --- /dev/null +++ b/src/dmclock/sim/src/ssched/ssched_recs.h @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + + +#include <ostream> +#include <assert.h> + + +namespace crimson { + namespace simple_scheduler { + + // since we send no additional data out + // NOTE: Change name to RespParams? Is it used elsewhere? + struct NullData { + friend std::ostream& operator<<(std::ostream& out, const NullData& n) { + out << "NullData{ EMPTY }"; + return out; + } + }; // struct NullData + + + struct ReqParams { + friend std::ostream& operator<<(std::ostream& out, const ReqParams& rp) { + out << "ReqParams{ EMPTY }"; + return out; + } + }; + + } +} diff --git a/src/dmclock/sim/src/ssched/ssched_server.h b/src/dmclock/sim/src/ssched/ssched_server.h new file mode 100644 index 000000000..c4e057a88 --- /dev/null +++ b/src/dmclock/sim/src/ssched/ssched_server.h @@ -0,0 +1,194 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#pragma once + +#include <memory> +#include <mutex> +#include <deque> +#include <functional> + +#include "boost/variant.hpp" + +#include "ssched_recs.h" + +#ifdef PROFILE +#include "profile.h" +#endif + +namespace crimson { + + namespace simple_scheduler { + + template<typename C, typename R, typename Time> + class SimpleQueue { + + public: + + using RequestRef = std::unique_ptr<R>; + + // a function to see whether the server can handle another request + using CanHandleRequestFunc = std::function<bool(void)>; + + // a function to submit a request to the server; the second + // parameter is a callback when it's completed + using HandleRequestFunc = + std::function<void(const C&,RequestRef,NullData,uint64_t)>; + + struct PullReq { + enum class Type { returning, none }; + + struct Retn { + C client; + RequestRef request; + }; + + Type type; + boost::variant<Retn> data; + }; + + protected: + + enum class Mechanism { push, pull }; + + struct QRequest { + C client; + RequestRef request; + }; + + bool finishing = false; + Mechanism mechanism; + + CanHandleRequestFunc can_handle_f; + HandleRequestFunc handle_f; + + mutable std::mutex queue_mtx; + using DataGuard = std::lock_guard<decltype(queue_mtx)>; + + std::deque<QRequest> queue; + +#ifdef PROFILE + public: + ProfileTimer<std::chrono::nanoseconds> pull_request_timer; + ProfileTimer<std::chrono::nanoseconds> add_request_timer; + ProfileTimer<std::chrono::nanoseconds> request_complete_timer; + protected: +#endif + + public: + + // push full constructor + SimpleQueue(CanHandleRequestFunc _can_handle_f, + HandleRequestFunc _handle_f) : + mechanism(Mechanism::push), + can_handle_f(_can_handle_f), + handle_f(_handle_f) + { + // empty + } + + SimpleQueue() : + mechanism(Mechanism::pull) + { + // empty + } + + ~SimpleQueue() { + finishing = true; + } + + void add_request(R&& request, + const C& client_id, + const ReqParams& req_params, + uint64_t request_cost) { + add_request(RequestRef(new R(std::move(request))), + client_id, req_params, request_cost); + } + + void add_request(RequestRef&& request, + const C& client_id, + const ReqParams& req_params, + uint64_t request_cost) { + DataGuard g(queue_mtx); + +#ifdef PROFILE + add_request_timer.start(); +#endif + queue.emplace_back(QRequest{client_id, std::move(request)}); + + if (Mechanism::push == mechanism) { + schedule_request(); + } + +#ifdef PROFILE + add_request_timer.stop(); +#endif + } // add_request + + void request_completed() { + assert(Mechanism::push == mechanism); + DataGuard g(queue_mtx); + +#ifdef PROFILE + request_complete_timer.start(); +#endif + schedule_request(); + +#ifdef PROFILE + request_complete_timer.stop(); +#endif + } // request_completed + + PullReq pull_request() { + assert(Mechanism::pull == mechanism); + PullReq result; + DataGuard g(queue_mtx); + +#ifdef PROFILE + pull_request_timer.start(); +#endif + + if (queue.empty()) { + result.type = PullReq::Type::none; + } else { + auto front = queue.front(); + result.type = PullReq::Type::returning; + result.data = + typename PullReq::Retn{front.client, std::move(front.request)}; + queue.pop(); + } + +#ifdef PROFILE + pull_request_timer.stop(); +#endif + + return result; + } + + protected: + + // queue_mtx should be held when called; should only be called + // when mechanism is push + void schedule_request() { + if (!queue.empty() && can_handle_f()) { + auto& front = queue.front(); + static NullData null_data; + handle_f(front.client, std::move(front.request), null_data, 1u); + queue.pop_front(); + } + } + }; + }; +}; diff --git a/src/dmclock/sim/src/str_list.cc b/src/dmclock/sim/src/str_list.cc new file mode 100644 index 000000000..22109e008 --- /dev/null +++ b/src/dmclock/sim/src/str_list.cc @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2009-2010 Dreamhost + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "str_list.h" + +using std::string; +using std::vector; +using std::set; +using std::list; + +static bool get_next_token(const string &s, size_t& pos, const char *delims, string& token) +{ + int start = s.find_first_not_of(delims, pos); + int end; + + if (start < 0){ + pos = s.size(); + return false; + } + + end = s.find_first_of(delims, start); + if (end >= 0) + pos = end + 1; + else { + pos = end = s.size(); + } + + token = s.substr(start, end - start); + return true; +} + +void get_str_list(const string& str, const char *delims, list<string>& str_list) +{ + size_t pos = 0; + string token; + + str_list.clear(); + + while (pos < str.size()) { + if (get_next_token(str, pos, delims, token)) { + if (token.size() > 0) { + str_list.push_back(token); + } + } + } +} + +void get_str_list(const string& str, list<string>& str_list) +{ + const char *delims = ";,= \t"; + return get_str_list(str, delims, str_list); +} + +void get_str_vec(const string& str, const char *delims, vector<string>& str_vec) +{ + size_t pos = 0; + string token; + str_vec.clear(); + + while (pos < str.size()) { + if (get_next_token(str, pos, delims, token)) { + if (token.size() > 0) { + str_vec.push_back(token); + } + } + } +} + +void get_str_vec(const string& str, vector<string>& str_vec) +{ + const char *delims = ";,= \t"; + return get_str_vec(str, delims, str_vec); +} + +void get_str_set(const string& str, const char *delims, set<string>& str_set) +{ + size_t pos = 0; + string token; + + str_set.clear(); + + while (pos < str.size()) { + if (get_next_token(str, pos, delims, token)) { + if (token.size() > 0) { + str_set.insert(token); + } + } + } +} + +void get_str_set(const string& str, set<string>& str_set) +{ + const char *delims = ";,= \t"; + return get_str_set(str, delims, str_set); +} diff --git a/src/dmclock/sim/src/str_list.h b/src/dmclock/sim/src/str_list.h new file mode 100644 index 000000000..4a6dcc57f --- /dev/null +++ b/src/dmclock/sim/src/str_list.h @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2009 Red Hat Inc. + * + * Forked from Red Hat's Ceph project. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#ifndef CEPH_STRLIST_H +#define CEPH_STRLIST_H + +#include <list> +#include <set> +#include <sstream> +#include <string> +#include <vector> + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as list + * @param [out] str_list List modified containing str after it has been split +**/ +extern void get_str_list(const std::string& str, + std::list<std::string>& str_list); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as list + * @param [in] delims characters used to split **str** + * @param [out] str_list List modified containing str after it has been split +**/ +extern void get_str_list(const std::string& str, + const char *delims, + std::list<std::string>& str_list); + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_vec**. + * + * @param [in] str String to split and save as Vector + * @param [out] str_vec Vector modified containing str after it has been split +**/ +extern void get_str_vec(const std::string& str, + std::vector<std::string>& str_vec); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_vec**. + * + * @param [in] str String to split and save as Vector + * @param [in] delims characters used to split **str** + * @param [out] str_vec Vector modified containing str after it has been split +**/ +extern void get_str_vec(const std::string& str, + const char *delims, + std::vector<std::string>& str_vec); + +/** + * Split **str** into a list of strings, using the ";,= \t" delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as Set + * @param [out] str_list Set modified containing str after it has been split +**/ +extern void get_str_set(const std::string& str, + std::set<std::string>& str_list); + +/** + * Split **str** into a list of strings, using the **delims** delimiters and output the result in **str_list**. + * + * @param [in] str String to split and save as Set + * @param [in] delims characters used to split **str** + * @param [out] str_list Set modified containing str after it has been split +**/ +extern void get_str_set(const std::string& str, + const char *delims, + std::set<std::string>& str_list); + +/** + * Return a String containing the vector **v** joined with **sep** + * + * If **v** is empty, the function returns an empty string + * For each element in **v**, + * it will concatenate this element and **sep** with result + * + * @param [in] v Vector to join as a String + * @param [in] sep String used to join each element from **v** + * @return empty string if **v** is empty or concatenated string +**/ +inline std::string str_join(const std::vector<std::string>& v, const std::string& sep) +{ + if (v.empty()) + return std::string(); + std::vector<std::string>::const_iterator i = v.begin(); + std::string r = *i; + for (++i; i != v.end(); ++i) { + r += sep; + r += *i; + } + return r; +} + +#endif diff --git a/src/dmclock/sim/src/test_dmclock.cc b/src/dmclock/sim/src/test_dmclock.cc new file mode 100644 index 000000000..402a47335 --- /dev/null +++ b/src/dmclock/sim/src/test_dmclock.cc @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + +#include <type_traits> + +#include "dmclock_recs.h" +#include "dmclock_server.h" +#include "dmclock_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "test_dmclock.h" + + +namespace test = crimson::test_dmc; + + +// Note: if this static_assert fails then our two definitions of Cost +// do not match; change crimson::qos_simulation::Cost to match the +// definition of crimson::dmclock::Cost. +static_assert(std::is_same<crimson::qos_simulation::Cost,crimson::dmclock::Cost>::value, + "Please make sure the simulator type crimson::qos_simulation::Cost matches the dmclock type crimson::dmclock::Cost."); + + +void test::dmc_server_accumulate_f(test::DmcAccum& a, + const test::dmc::PhaseType& phase) { + if (test::dmc::PhaseType::reservation == phase) { + ++a.reservation_count; + } else { + ++a.proportion_count; + } +} + + +void test::dmc_client_accumulate_f(test::DmcAccum& a, + const test::dmc::PhaseType& phase) { + if (test::dmc::PhaseType::reservation == phase) { + ++a.reservation_count; + } else { + ++a.proportion_count; + } +} diff --git a/src/dmclock/sim/src/test_dmclock.h b/src/dmclock/sim/src/test_dmclock.h new file mode 100644 index 000000000..1c7a55968 --- /dev/null +++ b/src/dmclock/sim/src/test_dmclock.h @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include "dmclock_recs.h" +#include "dmclock_server.h" +#include "dmclock_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "simulate.h" + + +namespace crimson { + namespace test_dmc { + + namespace dmc = crimson::dmclock; + namespace sim = crimson::qos_simulation; + + struct DmcAccum { + uint64_t reservation_count = 0; + uint64_t proportion_count = 0; + }; + + using DmcQueue = dmc::PushPriorityQueue<ClientId,sim::TestRequest>; + using DmcServiceTracker = dmc::ServiceTracker<ServerId,dmc::OrigTracker>; + + using DmcServer = sim::SimulatedServer<DmcQueue, + dmc::ReqParams, + dmc::PhaseType, + DmcAccum>; + + using DmcClient = sim::SimulatedClient<DmcServiceTracker, + dmc::ReqParams, + dmc::PhaseType, + DmcAccum>; + + using CreateQueueF = std::function<DmcQueue*(DmcQueue::CanHandleRequestFunc, + DmcQueue::HandleRequestFunc)>; + + using MySim = sim::Simulation<ServerId,ClientId,DmcServer,DmcClient>; + + using SubmitFunc = DmcClient::SubmitFunc; + + extern void dmc_server_accumulate_f(DmcAccum& a, + const dmc::PhaseType& phase); + + extern void dmc_client_accumulate_f(DmcAccum& a, + const dmc::PhaseType& phase); + } // namespace test_dmc +} // namespace crimson diff --git a/src/dmclock/sim/src/test_dmclock_main.cc b/src/dmclock/sim/src/test_dmclock_main.cc new file mode 100644 index 000000000..5cf656608 --- /dev/null +++ b/src/dmclock/sim/src/test_dmclock_main.cc @@ -0,0 +1,342 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include "test_dmclock.h" +#include "config.h" + +#ifdef PROFILE +#include "profile.h" +#endif + + +namespace dmc = crimson::dmclock; +namespace test = crimson::test_dmc; +namespace sim = crimson::qos_simulation; + +using namespace std::placeholders; + + +namespace crimson { + namespace test_dmc { + void server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec); + + void client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec); + } +} + + +int main(int argc, char* argv[]) { + std::vector<const char*> args; + for (int i = 1; i < argc; ++i) { + args.push_back(argv[i]); + } + + std::string conf_file_list; + sim::ceph_argparse_early_args(args, &conf_file_list); + + sim::sim_config_t g_conf; + std::vector<sim::cli_group_t> &cli_group = g_conf.cli_group; + std::vector<sim::srv_group_t> &srv_group = g_conf.srv_group; + + if (!conf_file_list.empty()) { + int ret; + ret = sim::parse_config_file(conf_file_list, g_conf); + if (ret) { + // error + _exit(1); + } + } else { + // default simulation parameter + g_conf.client_groups = 2; + + sim::srv_group_t st; + srv_group.push_back(st); + + sim::cli_group_t ct1(99, 0); + cli_group.push_back(ct1); + + sim::cli_group_t ct2(1, 10); + cli_group.push_back(ct2); + } + + const unsigned server_groups = g_conf.server_groups; + const unsigned client_groups = g_conf.client_groups; + const bool server_random_selection = g_conf.server_random_selection; + const bool server_soft_limit = g_conf.server_soft_limit; + const double anticipation_timeout = g_conf.anticipation_timeout; + unsigned server_total_count = 0; + unsigned client_total_count = 0; + + for (unsigned i = 0; i < client_groups; ++i) { + client_total_count += cli_group[i].client_count; + } + + for (unsigned i = 0; i < server_groups; ++i) { + server_total_count += srv_group[i].server_count; + } + + std::vector<test::dmc::ClientInfo> client_info; + for (unsigned i = 0; i < client_groups; ++i) { + client_info.push_back(test::dmc::ClientInfo + { cli_group[i].client_reservation, + cli_group[i].client_weight, + cli_group[i].client_limit } ); + } + + auto ret_client_group_f = [&](const ClientId& c) -> unsigned { + unsigned group_max = 0; + unsigned i = 0; + for (; i < client_groups; ++i) { + group_max += cli_group[i].client_count; + if (c < group_max) { + break; + } + } + return i; + }; + + auto ret_server_group_f = [&](const ServerId& s) -> unsigned { + unsigned group_max = 0; + unsigned i = 0; + for (; i < server_groups; ++i) { + group_max += srv_group[i].server_count; + if (s < group_max) { + break; + } + } + return i; + }; + + auto client_info_f = + [=](const ClientId& c) -> const test::dmc::ClientInfo* { + return &client_info[ret_client_group_f(c)]; + }; + + auto client_disp_filter = [=] (const ClientId& i) -> bool { + return i < 3 || i >= (client_total_count - 3); + }; + + auto server_disp_filter = [=] (const ServerId& i) -> bool { + return i < 3 || i >= (server_total_count - 3); + }; + + + test::MySim *simulation; + + + // lambda to post a request to the identified server; called by client + test::SubmitFunc server_post_f = + [&simulation, + &cli_group, + &ret_client_group_f](const ServerId& server, + sim::TestRequest&& request, + const ClientId& client_id, + const test::dmc::ReqParams& req_params) { + test::DmcServer& s = simulation->get_server(server); + sim::Cost request_cost = cli_group[ret_client_group_f(client_id)].client_req_cost; + s.post(std::move(request), client_id, req_params, request_cost); + }; + + std::vector<std::vector<sim::CliInst>> cli_inst; + for (unsigned i = 0; i < client_groups; ++i) { + if (cli_group[i].client_wait == std::chrono::seconds(0)) { + cli_inst.push_back( + { { sim::req_op, + (uint32_t)cli_group[i].client_total_ops, + (double)cli_group[i].client_iops_goal, + (uint16_t)cli_group[i].client_outstanding_ops } } ); + } else { + cli_inst.push_back( + { { sim::wait_op, cli_group[i].client_wait }, + { sim::req_op, + (uint32_t)cli_group[i].client_total_ops, + (double)cli_group[i].client_iops_goal, + (uint16_t)cli_group[i].client_outstanding_ops } } ); + } + } + + simulation = new test::MySim(); + + test::DmcServer::ClientRespFunc client_response_f = + [&simulation](ClientId client_id, + const sim::TestResponse& resp, + const ServerId& server_id, + const dmc::PhaseType& phase, + const sim::Cost request_cost) { + simulation->get_client(client_id).receive_response(resp, + server_id, + phase, + request_cost); + }; + + test::CreateQueueF create_queue_f = + [&](test::DmcQueue::CanHandleRequestFunc can_f, + test::DmcQueue::HandleRequestFunc handle_f) -> test::DmcQueue* { + return new test::DmcQueue(client_info_f, + can_f, + handle_f, + server_soft_limit ? dmc::AtLimit::Allow : dmc::AtLimit::Wait, + anticipation_timeout); + }; + + + auto create_server_f = [&](ServerId id) -> test::DmcServer* { + unsigned i = ret_server_group_f(id); + return new test::DmcServer(id, + srv_group[i].server_iops, + srv_group[i].server_threads, + client_response_f, + test::dmc_server_accumulate_f, + create_queue_f); + }; + + auto create_client_f = [&](ClientId id) -> test::DmcClient* { + unsigned i = ret_client_group_f(id); + test::MySim::ClientBasedServerSelectFunc server_select_f; + unsigned client_server_select_range = cli_group[i].client_server_select_range; + if (!server_random_selection) { + server_select_f = simulation->make_server_select_alt_range(client_server_select_range); + } else { + server_select_f = simulation->make_server_select_ran_range(client_server_select_range); + } + return new test::DmcClient(id, + server_post_f, + std::bind(server_select_f, _1, id), + test::dmc_client_accumulate_f, + cli_inst[i]); + }; + +#if 1 + std::cout << "[global]" << std::endl << g_conf << std::endl; + for (unsigned i = 0; i < client_groups; ++i) { + std::cout << std::endl << "[client." << i << "]" << std::endl; + std::cout << cli_group[i] << std::endl; + } + for (unsigned i = 0; i < server_groups; ++i) { + std::cout << std::endl << "[server." << i << "]" << std::endl; + std::cout << srv_group[i] << std::endl; + } + std::cout << std::endl; +#endif + + simulation->add_servers(server_total_count, create_server_f); + simulation->add_clients(client_total_count, create_client_f); + + simulation->run(); + simulation->display_stats(std::cout, + &test::server_data, &test::client_data, + server_disp_filter, client_disp_filter); + + delete simulation; +} // main + + +void test::client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec) { + // report how many ops were done by reservation and proportion for + // each client + + int total_r = 0; + out << std::setw(head_w) << "res_ops:"; + for (unsigned i = 0; i < sim->get_client_count(); ++i) { + const auto& client = sim->get_client(i); + auto r = client.get_accumulator().reservation_count; + total_r += r; + if (!client_disp_filter(i)) continue; + out << " " << std::setw(data_w) << r; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_r << std::endl; + + int total_p = 0; + out << std::setw(head_w) << "prop_ops:"; + for (unsigned i = 0; i < sim->get_client_count(); ++i) { + const auto& client = sim->get_client(i); + auto p = client.get_accumulator().proportion_count; + total_p += p; + if (!client_disp_filter(i)) continue; + out << " " << std::setw(data_w) << p; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_p << std::endl; +} + + +void test::server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec) { + out << std::setw(head_w) << "res_ops:"; + int total_r = 0; + for (unsigned i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto rc = server.get_accumulator().reservation_count; + total_r += rc; + if (!server_disp_filter(i)) continue; + out << " " << std::setw(data_w) << rc; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_r << std::endl; + + out << std::setw(head_w) << "prop_ops:"; + int total_p = 0; + for (unsigned i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto pc = server.get_accumulator().proportion_count; + total_p += pc; + if (!server_disp_filter(i)) continue; + out << " " << std::setw(data_w) << pc; + } + out << " " << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_p << std::endl; + + const auto& q = sim->get_server(0).get_priority_queue(); + out << std::endl << + " k-way heap: " << q.get_heap_branching_factor() << std::endl + << std::endl; + +#ifdef PROFILE + crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner; + crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner; + for (unsigned i = 0; i < sim->get_server_count(); ++i) { + const auto& q = sim->get_server(i).get_priority_queue(); + const auto& art = q.add_request_timer; + art_combiner.combine(art); + const auto& rct = q.request_complete_timer; + rct_combiner.combine(rct); + } + out << "Server add_request_timer: count:" << art_combiner.get_count() << + ", mean:" << art_combiner.get_mean() << + ", std_dev:" << art_combiner.get_std_dev() << + ", low:" << art_combiner.get_low() << + ", high:" << art_combiner.get_high() << std::endl; + out << "Server request_complete_timer: count:" << rct_combiner.get_count() << + ", mean:" << rct_combiner.get_mean() << + ", std_dev:" << rct_combiner.get_std_dev() << + ", low:" << rct_combiner.get_low() << + ", high:" << rct_combiner.get_high() << std::endl; + out << "Server combined mean: " << + (art_combiner.get_mean() + rct_combiner.get_mean()) << + std::endl; +#endif +} diff --git a/src/dmclock/sim/src/test_ssched.cc b/src/dmclock/sim/src/test_ssched.cc new file mode 100644 index 000000000..b06273dc0 --- /dev/null +++ b/src/dmclock/sim/src/test_ssched.cc @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include "ssched_recs.h" +#include "ssched_server.h" +#include "ssched_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "test_ssched.h" + + +namespace test = crimson::test_simple_scheduler; +namespace ssched = crimson::simple_scheduler; + + +void test::simple_server_accumulate_f(test::SimpleAccum& a, + const ssched::NullData& add_info) { + ++a.request_count; +} + + +void test::simple_client_accumulate_f(test::SimpleAccum& a, + const ssched::NullData& ignore) { + // empty +} diff --git a/src/dmclock/sim/src/test_ssched.h b/src/dmclock/sim/src/test_ssched.h new file mode 100644 index 000000000..0d778709a --- /dev/null +++ b/src/dmclock/sim/src/test_ssched.h @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include "ssched_server.h" +#include "ssched_client.h" + +#include "sim_recs.h" +#include "sim_server.h" +#include "sim_client.h" + +#include "simulate.h" + + +namespace crimson { + namespace test_simple_scheduler { + + namespace ssched = crimson::simple_scheduler; + namespace sim = crimson::qos_simulation; + + using Time = double; + + struct SimpleAccum { + uint32_t request_count = 0; + }; + + using SimpleQueue = ssched::SimpleQueue<ClientId,sim::TestRequest,Time>; + + using SimpleServer = sim::SimulatedServer<SimpleQueue, + ssched::ReqParams, + ssched::NullData, + SimpleAccum>; + using SimpleClient = sim::SimulatedClient<ssched::ServiceTracker<ServerId>, + ssched::ReqParams, + ssched::NullData, + SimpleAccum>; + + using CreateQueueF = + std::function<SimpleQueue*(SimpleQueue::CanHandleRequestFunc, + SimpleQueue::HandleRequestFunc)>; + + + using MySim = sim::Simulation<ServerId,ClientId,SimpleServer,SimpleClient>; + + using SubmitFunc = SimpleClient::SubmitFunc; + + extern void simple_server_accumulate_f(SimpleAccum& a, + const ssched::NullData& add_info); + + extern void simple_client_accumulate_f(SimpleAccum& a, + const ssched::NullData& ignore); + } // namespace test_simple +} // namespace crimson diff --git a/src/dmclock/sim/src/test_ssched_main.cc b/src/dmclock/sim/src/test_ssched_main.cc new file mode 100644 index 000000000..ace4f8cce --- /dev/null +++ b/src/dmclock/sim/src/test_ssched_main.cc @@ -0,0 +1,199 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + * + * Author: J. Eric Ivancich <ivancich@redhat.com> + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version + * 2.1, as published by the Free Software Foundation. See file + * COPYING. + */ + + +#include "test_ssched.h" + + +#ifdef PROFILE +#include "profile.h" +#endif + + +namespace test = crimson::test_simple_scheduler; +namespace ssched = crimson::simple_scheduler; +namespace sim = crimson::qos_simulation; + +using namespace std::placeholders; + + +namespace crimson { + namespace test_simple_scheduler { + void client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec); + + void server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec); + } // namespace test_simple +} // namespace crimson + + +using Cost = uint32_t; + + +int main(int argc, char* argv[]) { + // server params + + const unsigned server_count = 100; + const unsigned server_iops = 40; + const unsigned server_threads = 1; + + // client params + + const unsigned client_total_ops = 1000; + const unsigned client_count = 100; + const unsigned client_server_select_range = 10; + const unsigned client_wait_count = 1; + const unsigned client_iops_goal = 50; + const unsigned client_outstanding_ops = 100; + const std::chrono::seconds client_wait(10); + + auto client_disp_filter = [=] (const ClientId& i) -> bool { + return i < 3 || i >= (client_count - 3); + }; + + auto server_disp_filter = [=] (const ServerId& i) -> bool { + return i < 3 || i >= (server_count - 3); + }; + + + test::MySim *simulation; + + // lambda to post a request to the identified server; called by client + test::SubmitFunc server_post_f = + [&simulation](const ServerId& server_id, + sim::TestRequest&& request, + const ClientId& client_id, + const ssched::ReqParams& req_params) { + auto& server = simulation->get_server(server_id); + server.post(std::move(request), client_id, req_params, 1u); + }; + + static std::vector<sim::CliInst> no_wait = + { { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } }; + static std::vector<sim::CliInst> wait = + { { sim::wait_op, client_wait }, + { sim::req_op, client_total_ops, client_iops_goal, client_outstanding_ops } }; + + simulation = new test::MySim(); + +#if 1 + test::MySim::ClientBasedServerSelectFunc server_select_f = + simulation->make_server_select_alt_range(client_server_select_range); +#elif 0 + test::MySim::ClientBasedServerSelectFunc server_select_f = + std::bind(&test::MySim::server_select_random, simulation, _1, _2); +#else + test::MySim::ClientBasedServerSelectFunc server_select_f = + std::bind(&test::MySim::server_select_0, simulation, _1, _2); +#endif + + test::SimpleServer::ClientRespFunc client_response_f = + [&simulation](ClientId client_id, + const sim::TestResponse& resp, + const ServerId& server_id, + const ssched::NullData& resp_params, + Cost request_cost) { + simulation->get_client(client_id).receive_response(resp, + server_id, + resp_params, + request_cost); + }; + + test::CreateQueueF create_queue_f = + [&](test::SimpleQueue::CanHandleRequestFunc can_f, + test::SimpleQueue::HandleRequestFunc handle_f) -> test::SimpleQueue* { + return new test::SimpleQueue(can_f, handle_f); + }; + + auto create_server_f = [&](ServerId id) -> test::SimpleServer* { + return new test::SimpleServer(id, + server_iops, server_threads, + client_response_f, + test::simple_server_accumulate_f, + create_queue_f); + }; + + auto create_client_f = [&](ClientId id) -> test::SimpleClient* { + return new test::SimpleClient(id, + server_post_f, + std::bind(server_select_f, _1, id), + test::simple_client_accumulate_f, + id < (client_count - client_wait_count) + ? no_wait : wait); + }; + + simulation->add_servers(server_count, create_server_f); + simulation->add_clients(client_count, create_client_f); + + simulation->run(); + simulation->display_stats(std::cout, + &test::server_data, &test::client_data, + server_disp_filter, client_disp_filter); +} // main + + +void test::client_data(std::ostream& out, + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec) { + // empty +} + + +void test::server_data(std::ostream& out, + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec) { + out << std::setw(head_w) << "requests:"; + int total_req = 0; + for (unsigned i = 0; i < sim->get_server_count(); ++i) { + const auto& server = sim->get_server(i); + auto req_count = server.get_accumulator().request_count; + total_req += req_count; + if (!server_disp_filter(i)) continue; + out << std::setw(data_w) << req_count; + } + out << std::setw(data_w) << std::setprecision(data_prec) << + std::fixed << total_req << std::endl; + +#ifdef PROFILE + crimson::ProfileCombiner<std::chrono::nanoseconds> art_combiner; + crimson::ProfileCombiner<std::chrono::nanoseconds> rct_combiner; + for (unsigned i = 0; i < sim->get_server_count(); ++i) { + const auto& q = sim->get_server(i).get_priority_queue(); + const auto& art = q.add_request_timer; + art_combiner.combine(art); + const auto& rct = q.request_complete_timer; + rct_combiner.combine(rct); + } + out << "Server add_request_timer: count:" << art_combiner.get_count() << + ", mean:" << art_combiner.get_mean() << + ", std_dev:" << art_combiner.get_std_dev() << + ", low:" << art_combiner.get_low() << + ", high:" << art_combiner.get_high() << std::endl; + out << "Server request_complete_timer: count:" << rct_combiner.get_count() << + ", mean:" << rct_combiner.get_mean() << + ", std_dev:" << rct_combiner.get_std_dev() << + ", low:" << rct_combiner.get_low() << + ", high:" << rct_combiner.get_high() << std::endl; + out << "Server combined mean: " << + (art_combiner.get_mean() + rct_combiner.get_mean()) << + std::endl; +#endif +} |