/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2017 ScyllaDB
 */

#include <array>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/max.hpp>
#include <boost/accumulators/statistics/mean.hpp>
#include <boost/accumulators/statistics/extended_p_square.hpp>
#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/irange.hpp>
#include <boost/range/iterator_range.hpp>

#include <fmt/format.h>
#include <yaml-cpp/yaml.h>

#include <seastar/core/align.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/core/app-template.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/file.hh>
#include <seastar/core/io_intent.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/metrics_api.hh>
#include <seastar/core/print.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/util/later.hh>

using namespace seastar;
using namespace std::chrono_literals;
using namespace boost::accumulators;

static auto random_seed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
static std::default_random_engine random_generator(random_seed);

class context;
enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu };

namespace std {

template <>
struct hash<request_type> {
    size_t operator() (const request_type& type) const {
        return static_cast<size_t>(type);
    }
};

}

future<> busyloop_sleep(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now) {
    return do_until([until] {
        return std::chrono::steady_clock::now() >= until;
    }, [] {
        return yield();
    });
}

template <typename Clock>
future<> timer_sleep(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now) {
    return seastar::sleep<Clock>(std::chrono::duration_cast<typename Clock::duration>(until - now));
}

using sleep_fn = std::function<future<>(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now)>;

class pause_distribution {
public:
    virtual std::chrono::duration<double> get() = 0;
    template <typename Dur>
    Dur get_as() {
        return std::chrono::duration_cast<Dur>(get());
    }
    virtual ~pause_distribution() {}
};

using pause_fn = std::function<std::unique_ptr<pause_distribution>(std::chrono::duration<double>)>;

class uniform_process : public pause_distribution {
    std::chrono::duration<double> _pause;
public:
    uniform_process(std::chrono::duration<double> period) : _pause(period) {}
    std::chrono::duration<double> get() override {
        return _pause;
    }
};

std::unique_ptr<pause_distribution> make_uniform_pause(std::chrono::duration<double> d) {
    return std::make_unique<uniform_process>(d);
}

// Models a Poisson arrival process: pauses between requests are drawn from an
// exponential distribution whose mean is the requested period.
class poisson_process : public pause_distribution {
    std::random_device _rd;
    std::mt19937 _rng;
    std::exponential_distribution<double> _exp;
public:
    poisson_process(std::chrono::duration<double> period)
            : _rng(_rd())
            , _exp(1.0 / period.count()) {}
    std::chrono::duration<double> get() override {
        return std::chrono::duration<double>(_exp(_rng));
    }
};

std::unique_ptr<pause_distribution> make_poisson_pause(std::chrono::duration<double> d) {
    return std::make_unique<poisson_process>(d);
}

struct byte_size {
    uint64_t size;
};

struct duration_time {
    std::chrono::duration<float> time;
};

class shard_config {
    std::unordered_set<unsigned> _shards;
public:
    // By default, run on all shards.
    shard_config()
            : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {}
    shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {}

    bool is_set(unsigned cpu) const {
        return _shards.count(cpu);
    }
};

struct shard_info {
    unsigned parallelism = 0;
    unsigned rps = 0;
    unsigned shares = 10;
    uint64_t request_size = 4 << 10;
    uint64_t bandwidth = 0;
    std::chrono::duration<float> think_time = 0ms;
    std::chrono::duration<float> think_after = 0ms;
    std::chrono::duration<float> execution_time = 1ms;
    seastar::scheduling_group scheduling_group = seastar::default_scheduling_group();
};
struct options {
    bool dsync = false;
    ::sleep_fn sleep_fn = timer_sleep<lowres_clock>;
    ::pause_fn pause_fn = make_uniform_pause;
};

class class_data;

struct job_config {
    std::string name;
    request_type type;
    shard_config shard_placement;
    ::shard_info shard_info;
    ::options options;
    // Size of each individual file. Every class and every shard has its own file,
    // so in a normal system with many shards we'll naturally have many files, and
    // that will push the data out of the disk's cache.
    uint64_t file_size;
    uint64_t offset_in_bdev;

    std::unique_ptr<class_data> gen_class_data();
};

std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999 };
static bool keep_files = false;

class class_data {
protected:
    using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>;

    job_config _config;
    uint64_t _alignment;
    uint64_t _last_pos = 0;
    uint64_t _offset = 0;

    io_priority_class _iop;
    seastar::scheduling_group _sg;

    size_t _data = 0;
    std::chrono::duration<float> _total_duration;
    std::chrono::steady_clock::time_point _start = {};
    accumulator_type _latencies;
    uint64_t _requests = 0;
    std::uniform_int_distribution<uint64_t> _pos_distribution;
    file _file;

    bool _think = false;
    ::sleep_fn _sleep_fn = timer_sleep<lowres_clock>;
    timer<> _thinker;

    virtual future<> do_start(sstring dir, directory_entry_type type) = 0;
    virtual future<size_t> issue_request(char* buf, io_intent* intent) = 0;
public:
    class_data(job_config cfg)
        : _config(std::move(cfg))
        , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
        , _iop(io_priority_class::register_one(name(), _config.shard_info.shares))
        , _sg(cfg.shard_info.scheduling_group)
        , _latencies(extended_p_square_probabilities = quantiles)
        , _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
        , _sleep_fn(_config.options.sleep_fn)
        , _thinker([this] { think_tick(); })
    {
        if (_config.shard_info.think_after > 0us) {
            _thinker.arm(std::chrono::duration_cast<std::chrono::milliseconds>(_config.shard_info.think_after));
        } else if (_config.shard_info.think_time > 0us) {
            _think = true;
        }
    }

    virtual ~class_data() = default;

private:
    // Alternate between "thinking" (sleeping after each request) and running at
    // full speed, flipping state every think_after / think_time interval.
    void think_tick() {
        if (_think) {
            _think = false;
            _thinker.arm(std::chrono::duration_cast<std::chrono::milliseconds>(_config.shard_info.think_after));
        } else {
            _think = true;
            _thinker.arm(std::chrono::duration_cast<std::chrono::milliseconds>(_config.shard_info.think_time));
        }
    }

    future<> issue_requests_in_parallel(std::chrono::steady_clock::time_point stop, unsigned parallelism) {
        return parallel_for_each(boost::irange(0u, parallelism), [this, stop] (auto dummy) mutable {
            auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
            auto buf = bufptr.get();
            return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable {
                auto start = std::chrono::steady_clock::now();
                return issue_request(buf, nullptr).then([this, start, stop] (auto size) {
                    auto now = std::chrono::steady_clock::now();
                    if (now < stop) {
                        this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
                    }
                    return think();
                });
            }).finally([bufptr = std::move(bufptr)] {});
        });
    }
    future<> issue_requests_at_rate(std::chrono::steady_clock::time_point stop, unsigned rps, unsigned parallelism) {
        return do_with(io_intent{}, 0u, [this, stop, rps, parallelism] (io_intent& intent, unsigned& in_flight) {
            return parallel_for_each(boost::irange(0u, parallelism), [this, stop, rps, &intent, &in_flight, parallelism] (auto dummy) mutable {
                auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
                auto buf = bufptr.get();
                auto pause = std::chrono::duration_cast<std::chrono::duration<double>>(1s) / rps;
                auto pause_dist = _config.options.pause_fn(pause);
                // Stagger the fibers so they don't all fire at the same instant.
                return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>((pause / parallelism) * dummy)).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight] () mutable {
                    return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop, pause, &intent, &in_flight] () mutable {
                        auto start = std::chrono::steady_clock::now();
                        in_flight++;
                        return issue_request(buf, &intent).then_wrapped([this, start, pause, stop, &in_flight] (auto size_f) {
                            size_t size;
                            try {
                                size = size_f.get0();
                            } catch (...) {
                                // cancelled
                                in_flight--;
                                return make_ready_future<>();
                            }
                            auto now = std::chrono::steady_clock::now();
                            if (now < stop) {
                                this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
                            }
                            in_flight--;
                            auto p = pause->template get_as<std::chrono::steady_clock::duration>();
                            auto next = start + p;
                            if (next > now) {
                                return this->_sleep_fn(next, now);
                            } else {
                                // probably the system cannot keep up with this rate
                                return make_ready_future<>();
                            }
                        });
                    });
                }).then([&intent, &in_flight] {
                    intent.cancel();
                    return do_until([&in_flight] { return in_flight == 0; }, [] {
                        return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */);
                    });
                }).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist)] {});
            });
        });
    }

public:
    future<> issue_requests(std::chrono::steady_clock::time_point stop) {
        _start = std::chrono::steady_clock::now();
        return with_scheduling_group(_sg, [this, stop] {
            if (rps() == 0) {
                return issue_requests_in_parallel(stop, parallelism());
            } else {
                return issue_requests_at_rate(stop, rps(), parallelism());
            }
        }).then([this] {
            _total_duration = std::chrono::steady_clock::now() - _start;
        });
    }

    future<> think() {
        if (_think) {
            return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
        } else {
            return make_ready_future<>();
        }
    }

    // Generate the test file for reads and writes alike. It is much simpler to
    // generate one file per job than to expect job dependencies between creators
    // and consumers. So every job (a class in a shard) has its own file and
    // operates on it differently depending on the type:
    //
    //  sequential reads  : read the file from pos = 0 onwards, back to 0 on EOF
    //  sequential writes : write the file from pos = 0 onwards, back to 0 on EOF
    //  random reads      : read the file at random positions, between 0 and EOF
    //  random writes     : overwrite the file at a random position, between 0 and EOF
    //  append            : write to the file from pos = EOF onwards, always appending to the end
    //  cpu               : CPU-only load, no file is created
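    //
    // For example, with the default 4kB request size and a 1GB file, a seqread
    // job visits offsets 4096, 8192, ... and wraps back to 0 at 1GB, while a
    // randread job picks a uniformly random multiple of 4096 in [0, 1GB] for
    // every request (see get_pos() below).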
    future<> start(sstring dir, directory_entry_type type) {
        return do_start(dir, type).then([this] {
            if (this_shard_id() == 0 && _config.shard_info.bandwidth != 0) {
                return _iop.update_bandwidth(_config.shard_info.bandwidth);
            } else {
                return make_ready_future<>();
            }
        });
    }

    future<> stop() {
        if (_file) {
            return _file.close();
        }
        return make_ready_future<>();
    }

    const sstring name() const {
        return _config.name;
    }

protected:
    sstring type_str() const {
        return std::unordered_map<request_type, sstring>{
            { request_type::seqread,   "SEQ READ" },
            { request_type::seqwrite,  "SEQ WRITE" },
            { request_type::randread,  "RAND READ" },
            { request_type::randwrite, "RAND WRITE" },
            { request_type::append,    "APPEND" },
            { request_type::cpu,       "CPU" },
        }[_config.type];
    }

    request_type req_type() const {
        return _config.type;
    }

    sstring think_time() const {
        if (_config.shard_info.think_time == std::chrono::duration<float>(0)) {
            return "NO think time";
        } else {
            return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count());
        }
    }

    size_t req_size() const {
        return _config.shard_info.request_size;
    }

    unsigned parallelism() const {
        return _config.shard_info.parallelism;
    }

    unsigned rps() const {
        return _config.shard_info.rps;
    }

    unsigned shares() const {
        return _config.shard_info.shares;
    }

    std::chrono::duration<float> total_duration() const {
        return _total_duration;
    }

    uint64_t file_size_mb() const {
        return _config.file_size >> 20;
    }

    uint64_t total_data() const {
        return _data;
    }

    uint64_t max_latency() const {
        return max(_latencies);
    }

    uint64_t average_latency() const {
        return mean(_latencies);
    }

    uint64_t quantile_latency(double q) const {
        return quantile(_latencies, quantile_probability = q);
    }

    uint64_t requests() const noexcept {
        return _requests;
    }

    bool is_sequential() const {
        return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite);
    }

    bool is_random() const {
        return (req_type() == request_type::randread) || (req_type() == request_type::randwrite);
    }

    uint64_t get_pos() {
        uint64_t pos;
        if (is_random()) {
            pos = _pos_distribution(random_generator) * req_size();
        } else {
            pos = _last_pos + req_size();
            if (is_sequential() && (pos >= _config.file_size)) {
                pos = 0;
            }
        }
        _last_pos = pos;
        return pos + _offset;
    }

    void add_result(size_t data, std::chrono::microseconds latency) {
        _data += data;
        _latencies(latency.count());
        _requests++;
    }

public:
    virtual void emit_results(YAML::Emitter& out) = 0;
};

class io_class_data : public class_data {
protected:
    bool _is_dev_null = false;

    future<size_t> on_io_completed(future<size_t> f) {
        if (!_is_dev_null) {
            return f;
        }
        // Writes to /dev/null complete with size 0; report the request size instead.
        return f.then([this] (auto size_f) {
            return make_ready_future<size_t>(this->req_size());
        });
    }

public:
    io_class_data(job_config cfg) : class_data(std::move(cfg)) {}

    future<> do_start(sstring path, directory_entry_type type) override {
        if (type == directory_entry_type::directory) {
            return do_start_on_directory(path);
        }
        if (type == directory_entry_type::block_device) {
            return do_start_on_bdev(path);
        }
        if (type == directory_entry_type::char_device && path == "/dev/null") {
            return do_start_on_dev_null();
        }
        throw std::runtime_error(format("Unsupported storage. {} should be a directory or block device", path));
    }
private:
    future<> do_start_on_directory(sstring dir) {
        auto fname = format("{}/test-{}-{:d}", dir, name(), this_shard_id());
        auto flags = open_flags::rw | open_flags::create;
        if (_config.options.dsync) {
            flags |= open_flags::dsync;
        }
        file_open_options options;
        options.extent_allocation_size_hint = _config.file_size;
        options.append_is_unlikely = true;
        return open_file_dma(fname, flags, options).then([this, fname] (auto f) {
            _file = f;
            auto maybe_remove_file = [] (sstring fname) {
                return keep_files ? make_ready_future<>() : remove_file(fname);
            };
            return maybe_remove_file(fname).then([this] {
                return _file.size().then([this] (uint64_t size) {
                    return _file.truncate(_config.file_size).then([this, size] {
                        if (size >= _config.file_size) {
                            return make_ready_future<>();
                        }
                        // Fill the file with printable garbage, 256kB at a time,
                        // with at most 64 writes in flight.
                        auto bufsize = 256ul << 10;
                        return do_with(boost::irange(0ul, (_config.file_size / bufsize) + 1), [this, bufsize] (auto& pos) mutable {
                            return max_concurrent_for_each(pos.begin(), pos.end(), 64, [this, bufsize] (auto pos) mutable {
                                auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096);
                                auto buf = bufptr.get();
                                std::uniform_int_distribution<char> fill('@', '~');
                                memset(buf, fill(random_generator), bufsize);
                                pos = pos * bufsize;
                                return _file.dma_write(pos, buf, bufsize).finally([this, bufptr = std::move(bufptr), pos] {
                                    if ((this->req_type() == request_type::append) && (pos > _last_pos)) {
                                        _last_pos = pos;
                                    }
                                }).discard_result();
                            });
                        }).then([this] {
                            return _file.flush();
                        });
                    });
                });
            });
        });
    }

    future<> do_start_on_bdev(sstring name) {
        auto flags = open_flags::rw;
        if (_config.options.dsync) {
            flags |= open_flags::dsync;
        }
        return open_file_dma(name, flags).then([this] (auto f) {
            _file = std::move(f);
            return _file.size().then([this] (uint64_t size) {
                // Each shard owns a 1MB-aligned slice of the device; jobs are laid
                // out back to back inside that slice (see offset_in_bdev in main).
                auto shard_area_size = align_down<uint64_t>(size / smp::count, 1 << 20);
                if (_config.offset_in_bdev + _config.file_size > shard_area_size) {
                    throw std::runtime_error("Data doesn't fit the blockdevice");
                }
                _offset = shard_area_size * this_shard_id() + _config.offset_in_bdev;
                return make_ready_future<>();
            });
        });
    }

    future<> do_start_on_dev_null() {
        file_open_options options;
        options.append_is_unlikely = true;
        return open_file_dma("/dev/null", open_flags::rw, std::move(options)).then([this] (auto f) {
            _file = std::move(f);
            _is_dev_null = true;
            return make_ready_future<>();
        });
    }

    void emit_one_metrics(YAML::Emitter& out, sstring m_name) {
        const auto& values = seastar::metrics::impl::get_value_map();
        const auto& mf = values.find(m_name);
        assert(mf != values.end());
        for (auto&& mi : mf->second) {
            auto&& cname = mi.first.find("class");
            if (cname != mi.first.end() && cname->second == name()) {
                out << YAML::Key << m_name << YAML::Value << mi.second->get_function()().d();
            }
        }
    }

    void emit_metrics(YAML::Emitter& out) {
        emit_one_metrics(out, "io_queue_total_exec_sec");
        emit_one_metrics(out, "io_queue_total_delay_sec");
        emit_one_metrics(out, "io_queue_total_operations");
        emit_one_metrics(out, "io_queue_starvation_time_sec");
        emit_one_metrics(out, "io_queue_consumption");
        emit_one_metrics(out, "io_queue_adjusted_consumption");
    }

public:
    virtual void emit_results(YAML::Emitter& out) override {
        auto throughput_kbs = (total_data() >> 10) / total_duration().count();
        auto iops = requests() / total_duration().count();
        out << YAML::Key << "throughput" << YAML::Value << throughput_kbs << YAML::Comment("kB/s");
        out << YAML::Key << "IOPS" << YAML::Value << iops;
        out << YAML::Key << "latencies" << YAML::Comment("usec");
        out << YAML::BeginMap;
        out << YAML::Key << "average" << YAML::Value << average_latency();
        for (auto& q : quantiles) {
            out << YAML::Key << fmt::format("p{}", q) << YAML::Value << quantile_latency(q);
        }
        out << YAML::Key << "max" << YAML::Value << max_latency();
        out << YAML::EndMap;
        out << YAML::Key << "stats" << YAML::BeginMap;
        out << YAML::Key << "total_requests" << YAML::Value << requests();
        emit_metrics(out);
        out << YAML::EndMap;
    }
};
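// The per-class map emitted above looks roughly like this (values are
// illustrative only):
//
//   throughput: 125000  # kB/s
//   IOPS: 31250
//   latencies:  # usec
//     average: 512
//     p0.5: 430
//     p0.95: 900
//     p0.99: 2100
//     p0.999: 8000
//     max: 12000
//   stats:
//     total_requests: 937500
//     io_queue_total_operations: ...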
class read_io_class_data : public io_class_data {
public:
    read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}

    future<size_t> issue_request(char* buf, io_intent* intent) override {
        auto f = _file.dma_read(this->get_pos(), buf, this->req_size(), _iop, intent);
        return on_io_completed(std::move(f));
    }
};

class write_io_class_data : public io_class_data {
public:
    write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}

    future<size_t> issue_request(char* buf, io_intent* intent) override {
        auto f = _file.dma_write(this->get_pos(), buf, this->req_size(), _iop, intent);
        return on_io_completed(std::move(f));
    }
};

class cpu_class_data : public class_data {
public:
    cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {}

    future<> do_start(sstring dir, directory_entry_type type) override {
        return make_ready_future<>();
    }

    future<size_t> issue_request(char* buf, io_intent* intent) override {
        // We do want the execution time to be a busy loop, and not just a bunch of
        // continuations until our time is up: by doing this we can also simulate the
        // behavior of I/O continuations in the face of reactor stalls.
        auto start = std::chrono::steady_clock::now();
        do {
        } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time);
        return make_ready_future<size_t>(1);
    }

    virtual void emit_results(YAML::Emitter& out) override {
        auto throughput = total_data() / total_duration().count();
        out << YAML::Key << "throughput" << YAML::Value << throughput;
    }
};

std::unique_ptr<class_data> job_config::gen_class_data() {
    if (type == request_type::cpu) {
        return std::make_unique<cpu_class_data>(*this);
    } else if ((type == request_type::seqread) || (type == request_type::randread)) {
        return std::make_unique<read_io_class_data>(*this);
    } else {
        return std::make_unique<write_io_class_data>(*this);
    }
}

/// YAML parsing functions
namespace YAML {

template<>
struct convert<byte_size> {
    // Accepts a plain number of bytes or a suffixed value: "4kB", "1MB", "10GB".
    // Sizes below 512 bytes are rejected.
    static bool decode(const Node& node, byte_size& bs) {
        auto str = node.as<std::string>();
        unsigned shift = 0;
        if (str.back() == 'B') {
            str.pop_back();
            shift = std::unordered_map<char, unsigned>{
                { 'k', 10 },
                { 'M', 20 },
                { 'G', 30 },
            }[str.back()];
            str.pop_back();
        }
        bs.size = (boost::lexical_cast<size_t>(str) << shift);
        return bs.size >= 512;
    }
};

template<>
struct convert<duration_time> {
    // Accepts "0" or a value suffixed with ns/us/ms/s: "500us", "1ms", "2s".
    static bool decode(const Node& node, duration_time& dt) {
        auto str = node.as<std::string>();
        if (str == "0") {
            dt.time = 0ns;
            return true;
        }
        if (str.back() != 's') {
            return false;
        }
        str.pop_back();
        std::unordered_map<char, std::chrono::duration<float>> unit = {
            { 'n', 1ns },
            { 'u', 1us },
            { 'm', 1ms },
        };
        if (unit.count(str.back())) {
            auto u = str.back();
            str.pop_back();
            dt.time = (boost::lexical_cast<size_t>(str) * unit[u]);
        } else {
            dt.time = (boost::lexical_cast<size_t>(str) * 1s);
        }
        return true;
    }
};

template<>
struct convert<shard_config> {
    static bool decode(const Node& node, shard_config& shards) {
        try {
            auto str = node.as<std::string>();
            // A default-constructed shard_config already means "all shards".
            return (str == "all");
        } catch (YAML::TypedBadConversion<std::string>& e) {
            shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>()));
            return true;
        }
        return false;
    }
};

template<>
struct convert<request_type> {
    static bool decode(const Node& node, request_type& rt) {
        static std::unordered_map<std::string, request_type> mappings = {
            { "seqread",   request_type::seqread },
            { "seqwrite",  request_type::seqwrite },
            { "randread",  request_type::randread },
            { "randwrite", request_type::randwrite },
            { "append",    request_type::append },
            { "cpu",       request_type::cpu },
        };
        auto reqstr = node.as<std::string>();
        if (!mappings.count(reqstr)) {
            return false;
        }
        rt = mappings[reqstr];
        return true;
    }
};
template<>
struct convert<shard_info> {
    static bool decode(const Node& node, shard_info& sl) {
        if (node["parallelism"]) {
            sl.parallelism = node["parallelism"].as<unsigned>();
        }
        if (node["rps"]) {
            sl.rps = node["rps"].as<unsigned>();
        }
        if (node["shares"]) {
            sl.shares = node["shares"].as<unsigned>();
        }
        if (node["bandwidth"]) {
            sl.bandwidth = node["bandwidth"].as<byte_size>().size;
        }
        if (node["reqsize"]) {
            sl.request_size = node["reqsize"].as<byte_size>().size;
        }
        if (node["think_time"]) {
            sl.think_time = node["think_time"].as<duration_time>().time;
        }
        if (node["think_after"]) {
            sl.think_after = node["think_after"].as<duration_time>().time;
        }
        if (node["execution_time"]) {
            sl.execution_time = node["execution_time"].as<duration_time>().time;
        }
        return true;
    }
};

template<>
struct convert<options> {
    static bool decode(const Node& node, options& op) {
        if (node["dsync"]) {
            op.dsync = node["dsync"].as<bool>();
        }
        if (node["sleep_type"]) {
            auto st = node["sleep_type"].as<std::string>();
            if (st == "busyloop") {
                op.sleep_fn = busyloop_sleep;
            } else if (st == "lowres") {
                op.sleep_fn = timer_sleep<lowres_clock>;
            } else if (st == "steady") {
                op.sleep_fn = timer_sleep<steady_clock_type>;
            } else {
                throw std::runtime_error(format("Unknown sleep_type {}", st));
            }
        }
        if (node["pause_distribution"]) {
            auto pd = node["pause_distribution"].as<std::string>();
            if (pd == "uniform") {
                op.pause_fn = make_uniform_pause;
            } else if (pd == "poisson") {
                op.pause_fn = make_poisson_pause;
            } else {
                throw std::runtime_error(format("Unknown pause_distribution {}", pd));
            }
        }
        return true;
    }
};

template<>
struct convert<job_config> {
    static bool decode(const Node& node, job_config& cl) {
        cl.name = node["name"].as<std::string>();
        cl.type = node["type"].as<request_type>();
        cl.shard_placement = node["shards"].as<shard_config>();
        // The data_size is used to divide the available (and effectively
        // constant) disk space between workloads. Each shard inside the
        // workload thus uses its portion of the assigned space.
        if (node["data_size"]) {
            cl.file_size = node["data_size"].as<byte_size>().size / smp::count;
        } else {
            cl.file_size = 1ull << 30; // 1GB by default
        }
        if (node["shard_info"]) {
            cl.shard_info = node["shard_info"].as<shard_info>();
        }
        if (node["options"]) {
            cl.options = node["options"].as<options>();
        }
        return true;
    }
};

}
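// An illustrative conf.yaml for the decoders above: a sequence of jobs, one
// map per job. Job names and values are made up; the field names match the
// converters.
//
//   - name: big_writes
//     type: seqwrite
//     shards: all
//     data_size: 1GB
//     shard_info:
//       parallelism: 4
//       reqsize: 128kB
//       shares: 100
//   - name: reads
//     type: randread
//     shards: [0, 1]
//     shard_info:
//       parallelism: 8
//       rps: 50
//       think_time: 500us
//     options:
//       pause_distribution: poisson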
/// Each shard has one context, and the context is responsible for creating the
/// classes that should run on this shard.
class context {
    std::vector<std::unique_ptr<class_data>> _cl;

    sstring _dir;
    directory_entry_type _type;
    std::chrono::seconds _duration;

    semaphore _finished;
public:
    context(sstring dir, directory_entry_type dtype, std::vector<job_config> req_config, unsigned duration)
            : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config
                | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(this_shard_id()); })
                | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); })
              ))
            , _dir(dir)
            , _type(dtype)
            , _duration(duration)
            , _finished(0)
    {}

    future<> stop() {
        return parallel_for_each(_cl, [] (std::unique_ptr<class_data>& cl) {
            return cl->stop();
        });
    }

    future<> start() {
        return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) {
            return cl->start(_dir, _type);
        });
    }

    future<> issue_requests() {
        return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) {
            return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] {
                _finished.signal(1);
            });
        });
    }

    future<> emit_results(YAML::Emitter& out) {
        return _finished.wait(_cl.size()).then([this, &out] {
            for (auto& cl : _cl) {
                out << YAML::Key << cl->name();
                out << YAML::BeginMap;
                cl->emit_results(out);
                out << YAML::EndMap;
            }
            return make_ready_future<>();
        });
    }
};

static void show_results(distributed<context>& ctx) {
    YAML::Emitter out;
    out << YAML::BeginDoc;
    out << YAML::BeginSeq;
    for (unsigned i = 0; i < smp::count; ++i) {
        out << YAML::BeginMap;
        out << YAML::Key << "shard" << YAML::Value << i;
        ctx.invoke_on(i, [&out] (auto& c) {
            return c.emit_results(out);
        }).get();
        out << YAML::EndMap;
    }
    out << YAML::EndSeq;
    out << YAML::EndDoc;
    std::cout << out.c_str();
}

int main(int ac, char** av) {
    namespace bpo = boost::program_options;

    app_template app;
    auto opt_add = app.add_options();
    opt_add
        ("storage", bpo::value<sstring>()->default_value("."), "directory or block device where to execute the test")
        ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
        ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing the benchmark specification")
        ("keep-files", bpo::value<bool>()->default_value(false), "keep test files, so the next run may re-use them")
    ;

    distributed<context> ctx;
    return app.run(ac, av, [&] {
        return seastar::async([&] {
            auto& opts = app.configuration();
            auto& storage = opts["storage"].as<sstring>();
            auto st_type = engine().file_type(storage).get0();
            if (!st_type) {
                throw std::runtime_error(format("Unknown storage {}", storage));
            }
            if (*st_type == directory_entry_type::directory) {
                auto fs = file_system_at(storage).get0();
                if (fs != fs_type::xfs) {
                    throw std::runtime_error(format("This is a performance test. {} is not on XFS", storage));
                }
            }
            keep_files = opts["keep-files"].as<bool>();
            auto& duration = opts["duration"].as<unsigned>();
            auto& yaml = opts["conf"].as<sstring>();
            YAML::Node doc = YAML::LoadFile(yaml);
            auto reqs = doc.as<std::vector<job_config>>();
            parallel_for_each(reqs, [] (auto& r) {
                return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) {
                    r.shard_info.scheduling_group = sg;
                });
            }).get();
            if (*st_type == directory_entry_type::block_device) {
                // Lay the jobs out back to back within each shard's slice of the device.
                uint64_t off = 0;
                for (job_config& r : reqs) {
                    r.offset_in_bdev = off;
                    off += r.file_size;
                }
            }
            ctx.start(storage, *st_type, reqs, duration).get0();
            engine().at_exit([&ctx] {
                return ctx.stop();
            });
            std::cout << "Creating initial files..." << std::endl;
            ctx.invoke_on_all([] (auto& c) {
                return c.start();
            }).get();
            std::cout << "Starting evaluation..." << std::endl;
            ctx.invoke_on_all([] (auto& c) {
                return c.issue_requests();
            }).get();
            show_results(ctx);
            ctx.stop().get0();
        }).or_terminate();
    });
}
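// Example invocation (binary name and paths are illustrative; the flags are
// the ones registered in main above):
//
//   ./io_tester --storage /mnt/xfs-scratch --conf conf.yaml --duration 30
//
// Results are printed to stdout as a single YAML document with one sequence
// entry per shard, each mapping job names to their result maps.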