summaryrefslogtreecommitdiffstats
path: root/src/seastar/apps/io_tester/io_tester.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/seastar/apps/io_tester/io_tester.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/apps/io_tester/io_tester.cc')
-rw-r--r--src/seastar/apps/io_tester/io_tester.cc988
1 files changed, 988 insertions, 0 deletions
diff --git a/src/seastar/apps/io_tester/io_tester.cc b/src/seastar/apps/io_tester/io_tester.cc
new file mode 100644
index 000000000..040589201
--- /dev/null
+++ b/src/seastar/apps/io_tester/io_tester.cc
@@ -0,0 +1,988 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+#include <seastar/core/app-template.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/align.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/core/print.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/core/with_scheduling_group.hh>
+#include <seastar/core/metrics_api.hh>
+#include <seastar/core/io_intent.hh>
+#include <seastar/util/later.hh>
+#include <chrono>
+#include <vector>
+#include <boost/range/irange.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/p_square_quantile.hpp>
+#include <boost/accumulators/statistics/extended_p_square.hpp>
+#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/array.hpp>
+#include <iomanip>
+#include <random>
+#include <yaml-cpp/yaml.h>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+using namespace boost::accumulators;
+
+static auto random_seed = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+static std::default_random_engine random_generator(random_seed);
+
+class context;
+enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu };
+
+namespace std {
+
+template <>
+struct hash<request_type> {
+ size_t operator() (const request_type& type) const {
+ return static_cast<size_t>(type);
+ }
+};
+
+}
+
+future<> busyloop_sleep(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now) {
+ return do_until([until] {
+ return std::chrono::steady_clock::now() >= until;
+ }, [] {
+ return yield();
+ });
+}
+
+template <typename Clock>
+future<> timer_sleep(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now) {
+ return seastar::sleep<Clock>(std::chrono::duration_cast<std::chrono::microseconds>(until - now));
+}
+
+using sleep_fn = std::function<future<>(std::chrono::steady_clock::time_point until, std::chrono::steady_clock::time_point now)>;
+
+class pause_distribution {
+public:
+
+ virtual std::chrono::duration<double> get() = 0;
+
+ template <typename Dur>
+ Dur get_as() {
+ return std::chrono::duration_cast<Dur>(get());
+ }
+
+ virtual ~pause_distribution() {}
+};
+
+using pause_fn = std::function<std::unique_ptr<pause_distribution>(std::chrono::duration<double>)>;
+
+class uniform_process : public pause_distribution {
+ std::chrono::duration<double> _pause;
+
+public:
+ uniform_process(std::chrono::duration<double> period)
+ : _pause(period)
+ {
+ }
+
+ std::chrono::duration<double> get() override {
+ return _pause;
+ }
+};
+
+std::unique_ptr<pause_distribution> make_uniform_pause(std::chrono::duration<double> d) {
+ return std::make_unique<uniform_process>(d);
+}
+
+class poisson_process : public pause_distribution {
+ std::random_device _rd;
+ std::mt19937 _rng;
+ std::exponential_distribution<double> _exp;
+
+public:
+ poisson_process(std::chrono::duration<double> period)
+ : _rng(_rd())
+ , _exp(1.0 / period.count())
+ {
+ }
+
+ std::chrono::duration<double> get() override {
+ return std::chrono::duration<double>(_exp(_rng));
+ }
+};
+
+std::unique_ptr<pause_distribution> make_poisson_pause(std::chrono::duration<double> d) {
+ return std::make_unique<poisson_process>(d);
+}
+
+struct byte_size {
+ uint64_t size;
+};
+
+struct duration_time {
+ std::chrono::duration<float> time;
+};
+
+class shard_config {
+ std::unordered_set<unsigned> _shards;
+public:
+ shard_config()
+ : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {}
+ shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {}
+
+ bool is_set(unsigned cpu) const {
+ return _shards.count(cpu);
+ }
+};
+
+struct shard_info {
+ unsigned parallelism = 0;
+ unsigned rps = 0;
+ unsigned shares = 10;
+ uint64_t request_size = 4 << 10;
+ uint64_t bandwidth = 0;
+ std::chrono::duration<float> think_time = 0ms;
+ std::chrono::duration<float> think_after = 0ms;
+ std::chrono::duration<float> execution_time = 1ms;
+ seastar::scheduling_group scheduling_group = seastar::default_scheduling_group();
+};
+
+struct options {
+ bool dsync = false;
+ ::sleep_fn sleep_fn = timer_sleep<lowres_clock>;
+ ::pause_fn pause_fn = make_uniform_pause;
+};
+
+class class_data;
+
+struct job_config {
+ std::string name;
+ request_type type;
+ shard_config shard_placement;
+ ::shard_info shard_info;
+ ::options options;
+ // size of each individual file. Every class and every shard have its file, so in a normal
+ // system with many shards we'll naturally have many files and that will push the data out
+ // of the disk's cache
+ uint64_t file_size;
+ uint64_t offset_in_bdev;
+ std::unique_ptr<class_data> gen_class_data();
+};
+
+std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999};
+static bool keep_files = false;
+
+class class_data {
+protected:
+ using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>;
+
+ job_config _config;
+ uint64_t _alignment;
+ uint64_t _last_pos = 0;
+ uint64_t _offset = 0;
+
+ io_priority_class _iop;
+ seastar::scheduling_group _sg;
+
+ size_t _data = 0;
+ std::chrono::duration<float> _total_duration;
+
+ std::chrono::steady_clock::time_point _start = {};
+ accumulator_type _latencies;
+ uint64_t _requests = 0;
+ std::uniform_int_distribution<uint32_t> _pos_distribution;
+ file _file;
+ bool _think = false;
+ ::sleep_fn _sleep_fn = timer_sleep<lowres_clock>;
+ timer<> _thinker;
+
+ virtual future<> do_start(sstring dir, directory_entry_type type) = 0;
+ virtual future<size_t> issue_request(char *buf, io_intent* intent) = 0;
+public:
+ class_data(job_config cfg)
+ : _config(std::move(cfg))
+ , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
+ , _iop(io_priority_class::register_one(name(), _config.shard_info.shares))
+ , _sg(cfg.shard_info.scheduling_group)
+ , _latencies(extended_p_square_probabilities = quantiles)
+ , _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
+ , _sleep_fn(_config.options.sleep_fn)
+ , _thinker([this] { think_tick(); })
+ {
+ if (_config.shard_info.think_after > 0us) {
+ _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after));
+ } else if (_config.shard_info.think_time > 0us) {
+ _think = true;
+ }
+ }
+
+ virtual ~class_data() = default;
+
+private:
+
+ void think_tick() {
+ if (_think) {
+ _think = false;
+ _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after));
+ } else {
+ _think = true;
+ _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
+ }
+ }
+
+ future<> issue_requests_in_parallel(std::chrono::steady_clock::time_point stop, unsigned parallelism) {
+ return parallel_for_each(boost::irange(0u, parallelism), [this, stop] (auto dummy) mutable {
+ auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
+ auto buf = bufptr.get();
+ return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable {
+ auto start = std::chrono::steady_clock::now();
+ return issue_request(buf, nullptr).then([this, start, stop] (auto size) {
+ auto now = std::chrono::steady_clock::now();
+ if (now < stop) {
+ this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
+ }
+ return think();
+ });
+ }).finally([bufptr = std::move(bufptr)] {});
+ });
+ }
+
+ future<> issue_requests_at_rate(std::chrono::steady_clock::time_point stop, unsigned rps, unsigned parallelism) {
+ return do_with(io_intent{}, 0u, [this, stop, rps, parallelism] (io_intent& intent, unsigned& in_flight) {
+ return parallel_for_each(boost::irange(0u, parallelism), [this, stop, rps, &intent, &in_flight, parallelism] (auto dummy) mutable {
+ auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
+ auto buf = bufptr.get();
+ auto pause = std::chrono::duration_cast<std::chrono::microseconds>(1s) / rps;
+ auto pause_dist = _config.options.pause_fn(pause);
+ return seastar::sleep((pause / parallelism) * dummy).then([this, buf, stop, pause = pause_dist.get(), &intent, &in_flight] () mutable {
+ return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop, pause, &intent, &in_flight] () mutable {
+ auto start = std::chrono::steady_clock::now();
+ in_flight++;
+ return issue_request(buf, &intent).then_wrapped([this, start, pause, stop, &in_flight] (auto size_f) {
+ size_t size;
+ try {
+ size = size_f.get0();
+ } catch (...) {
+ // cancelled
+ in_flight--;
+ return make_ready_future<>();
+ }
+
+ auto now = std::chrono::steady_clock::now();
+ if (now < stop) {
+ this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
+ }
+ in_flight--;
+ auto p = pause->template get_as<std::chrono::microseconds>();
+ auto next = start + p;
+
+ if (next > now) {
+ return this->_sleep_fn(next, now);
+ } else {
+ // probably the system cannot keep-up with this rate
+ return make_ready_future<>();
+ }
+ });
+ });
+ }).then([&intent, &in_flight] {
+ intent.cancel();
+ return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); });
+ }).finally([bufptr = std::move(bufptr), pause = std::move(pause_dist)] {});
+ });
+ });
+ }
+
+public:
+ future<> issue_requests(std::chrono::steady_clock::time_point stop) {
+ _start = std::chrono::steady_clock::now();
+ return with_scheduling_group(_sg, [this, stop] {
+ if (rps() == 0) {
+ return issue_requests_in_parallel(stop, parallelism());
+ } else {
+ return issue_requests_at_rate(stop, rps(), parallelism());
+ }
+ }).then([this] {
+ _total_duration = std::chrono::steady_clock::now() - _start;
+ });
+ }
+
+ future<> think() {
+ if (_think) {
+ return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
+ } else {
+ return make_ready_future<>();
+ }
+ }
+ // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting
+ // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate
+ // this file differently depending on the type:
+ //
+ // sequential reads : will read the file from pos = 0 onwards, back to 0 on EOF
+ // sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF
+ // random reads : will read the file at random positions, between 0 and EOF
+ // random writes : will overwrite the file at a random position, between 0 and EOF
+ // append : will write to the file from pos = EOF onwards, always appending to the end.
+ // cpu : CPU-only load, file is not created.
+ future<> start(sstring dir, directory_entry_type type) {
+ return do_start(dir, type).then([this] {
+ if (this_shard_id() == 0 && _config.shard_info.bandwidth != 0) {
+ return _iop.update_bandwidth(_config.shard_info.bandwidth);
+ } else {
+ return make_ready_future<>();
+ }
+ });
+ }
+
+ future<> stop() {
+ if (_file) {
+ return _file.close();
+ }
+ return make_ready_future<>();
+ }
+
+ const sstring name() const {
+ return _config.name;
+ }
+
+protected:
+ sstring type_str() const {
+ return std::unordered_map<request_type, sstring>{
+ { request_type::seqread, "SEQ READ" },
+ { request_type::seqwrite, "SEQ WRITE" },
+ { request_type::randread, "RAND READ" },
+ { request_type::randwrite, "RAND WRITE" },
+ { request_type::append , "APPEND" },
+ { request_type::cpu , "CPU" },
+ }[_config.type];;
+ }
+
+ request_type req_type() const {
+ return _config.type;
+ }
+
+ sstring think_time() const {
+ if (_config.shard_info.think_time == std::chrono::duration<float>(0)) {
+ return "NO think time";
+ } else {
+ return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count());
+ }
+ }
+
+ size_t req_size() const {
+ return _config.shard_info.request_size;
+ }
+
+ unsigned parallelism() const {
+ return _config.shard_info.parallelism;
+ }
+
+ unsigned rps() const {
+ return _config.shard_info.rps;
+ }
+
+ unsigned shares() const {
+ return _config.shard_info.shares;
+ }
+
+ std::chrono::duration<float> total_duration() const {
+ return _total_duration;
+ }
+
+ uint64_t file_size_mb() const {
+ return _config.file_size >> 20;
+ }
+
+ uint64_t total_data() const {
+ return _data;
+ }
+
+ uint64_t max_latency() const {
+ return max(_latencies);
+ }
+
+ uint64_t average_latency() const {
+ return mean(_latencies);
+ }
+
+ uint64_t quantile_latency(double q) const {
+ return quantile(_latencies, quantile_probability = q);
+ }
+
+ uint64_t requests() const noexcept {
+ return _requests;
+ }
+
+ bool is_sequential() const {
+ return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite);
+ }
+ bool is_random() const {
+ return (req_type() == request_type::randread) || (req_type() == request_type::randwrite);
+ }
+
+ uint64_t get_pos() {
+ uint64_t pos;
+ if (is_random()) {
+ pos = _pos_distribution(random_generator) * req_size();
+ } else {
+ pos = _last_pos + req_size();
+ if (is_sequential() && (pos >= _config.file_size)) {
+ pos = 0;
+ }
+ }
+ _last_pos = pos;
+ return pos + _offset;
+ }
+
+ void add_result(size_t data, std::chrono::microseconds latency) {
+ _data += data;
+ _latencies(latency.count());
+ _requests++;
+ }
+
+public:
+ virtual void emit_results(YAML::Emitter& out) = 0;
+};
+
+class io_class_data : public class_data {
+protected:
+ bool _is_dev_null = false;
+
+ future<size_t> on_io_completed(future<size_t> f) {
+ if (!_is_dev_null) {
+ return f;
+ }
+
+ return f.then([this] (auto size_f) {
+ return make_ready_future<size_t>(this->req_size());
+ });
+ }
+
+public:
+ io_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+ future<> do_start(sstring path, directory_entry_type type) override {
+ if (type == directory_entry_type::directory) {
+ return do_start_on_directory(path);
+ }
+
+ if (type == directory_entry_type::block_device) {
+ return do_start_on_bdev(path);
+ }
+
+ if (type == directory_entry_type::char_device && path == "/dev/null") {
+ return do_start_on_dev_null();
+ }
+
+ throw std::runtime_error(format("Unsupported storage. {} should be directory or block device", path));
+ }
+
+private:
+ future<> do_start_on_directory(sstring dir) {
+ auto fname = format("{}/test-{}-{:d}", dir, name(), this_shard_id());
+ auto flags = open_flags::rw | open_flags::create;
+ if (_config.options.dsync) {
+ flags |= open_flags::dsync;
+ }
+ file_open_options options;
+ options.extent_allocation_size_hint = _config.file_size;
+ options.append_is_unlikely = true;
+ return open_file_dma(fname, flags, options).then([this, fname] (auto f) {
+ _file = f;
+ auto maybe_remove_file = [] (sstring fname) {
+ return keep_files ? make_ready_future<>() : remove_file(fname);
+ };
+ return maybe_remove_file(fname).then([this] {
+ return _file.size().then([this] (uint64_t size) {
+ return _file.truncate(_config.file_size).then([this, size] {
+ if (size >= _config.file_size) {
+ return make_ready_future<>();
+ }
+
+ auto bufsize = 256ul << 10;
+ return do_with(boost::irange(0ul, (_config.file_size / bufsize) + 1), [this, bufsize] (auto& pos) mutable {
+ return max_concurrent_for_each(pos.begin(), pos.end(), 64, [this, bufsize] (auto pos) mutable {
+ auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096);
+ auto buf = bufptr.get();
+ std::uniform_int_distribution<int> fill('@', '~');
+ memset(buf, fill(random_generator), bufsize);
+ pos = pos * bufsize;
+ return _file.dma_write(pos, buf, bufsize).finally([this, bufptr = std::move(bufptr), pos] {
+ if ((this->req_type() == request_type::append) && (pos > _last_pos)) {
+ _last_pos = pos;
+ }
+ }).discard_result();
+ });
+ }).then([this] {
+ return _file.flush();
+ });
+ });
+ });
+ });
+ });
+ }
+
+ future<> do_start_on_bdev(sstring name) {
+ auto flags = open_flags::rw;
+ if (_config.options.dsync) {
+ flags |= open_flags::dsync;
+ }
+
+ return open_file_dma(name, flags).then([this] (auto f) {
+ _file = std::move(f);
+ return _file.size().then([this] (uint64_t size) {
+ auto shard_area_size = align_down<uint64_t>(size / smp::count, 1 << 20);
+ if (_config.offset_in_bdev + _config.file_size > shard_area_size) {
+ throw std::runtime_error("Data doesn't fit the blockdevice");
+ }
+ _offset = shard_area_size * this_shard_id() + _config.offset_in_bdev;
+ return make_ready_future<>();
+ });
+ });
+ }
+
+ future<> do_start_on_dev_null() {
+ file_open_options options;
+ options.append_is_unlikely = true;
+ return open_file_dma("/dev/null", open_flags::rw, std::move(options)).then([this] (auto f) {
+ _file = std::move(f);
+ _is_dev_null = true;
+ return make_ready_future<>();
+ });
+ }
+
+ void emit_one_metrics(YAML::Emitter& out, sstring m_name) {
+ const auto& values = seastar::metrics::impl::get_value_map();
+ const auto& mf = values.find(m_name);
+ assert(mf != values.end());
+ for (auto&& mi : mf->second) {
+ auto&& cname = mi.first.find("class");
+ if (cname != mi.first.end() && cname->second == name()) {
+ out << YAML::Key << m_name << YAML::Value << mi.second->get_function()().d();
+ }
+ }
+ }
+
+ void emit_metrics(YAML::Emitter& out) {
+ emit_one_metrics(out, "io_queue_total_exec_sec");
+ emit_one_metrics(out, "io_queue_total_delay_sec");
+ emit_one_metrics(out, "io_queue_total_operations");
+ emit_one_metrics(out, "io_queue_starvation_time_sec");
+ emit_one_metrics(out, "io_queue_consumption");
+ emit_one_metrics(out, "io_queue_adjusted_consumption");
+ }
+
+public:
+ virtual void emit_results(YAML::Emitter& out) override {
+ auto throughput_kbs = (total_data() >> 10) / total_duration().count();
+ auto iops = requests() / total_duration().count();
+ out << YAML::Key << "throughput" << YAML::Value << throughput_kbs << YAML::Comment("kB/s");
+ out << YAML::Key << "IOPS" << YAML::Value << iops;
+ out << YAML::Key << "latencies" << YAML::Comment("usec");
+ out << YAML::BeginMap;
+ out << YAML::Key << "average" << YAML::Value << average_latency();
+ for (auto& q: quantiles) {
+ out << YAML::Key << fmt::format("p{}", q) << YAML::Value << quantile_latency(q);
+ }
+ out << YAML::Key << "max" << YAML::Value << max_latency();
+ out << YAML::EndMap;
+ out << YAML::Key << "stats" << YAML::BeginMap;
+ out << YAML::Key << "total_requests" << YAML::Value << requests();
+ emit_metrics(out);
+ out << YAML::EndMap;
+ }
+};
+
+class read_io_class_data : public io_class_data {
+public:
+ read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+ future<size_t> issue_request(char *buf, io_intent* intent) override {
+ auto f = _file.dma_read(this->get_pos(), buf, this->req_size(), _iop, intent);
+ return on_io_completed(std::move(f));
+ }
+};
+
+class write_io_class_data : public io_class_data {
+public:
+ write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+ future<size_t> issue_request(char *buf, io_intent* intent) override {
+ auto f = _file.dma_write(this->get_pos(), buf, this->req_size(), _iop, intent);
+ return on_io_completed(std::move(f));
+ }
+};
+
+class cpu_class_data : public class_data {
+public:
+ cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+ future<> do_start(sstring dir, directory_entry_type type) override {
+ return make_ready_future<>();
+ }
+
+ future<size_t> issue_request(char *buf, io_intent* intent) override {
+ // We do want the execution time to be a busy loop, and not just a bunch of
+ // continuations until our time is up: by doing this we can also simulate the behavior
+ // of I/O continuations in the face of reactor stalls.
+ auto start = std::chrono::steady_clock::now();
+ do {
+ } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time);
+ return make_ready_future<size_t>(1);
+ }
+
+ virtual void emit_results(YAML::Emitter& out) override {
+ auto throughput = total_data() / total_duration().count();
+ out << YAML::Key << "throughput" << YAML::Value << throughput;
+ }
+};
+
+std::unique_ptr<class_data> job_config::gen_class_data() {
+ if (type == request_type::cpu) {
+ return std::make_unique<cpu_class_data>(*this);
+ } else if ((type == request_type::seqread) || (type == request_type::randread)) {
+ return std::make_unique<read_io_class_data>(*this);
+ } else {
+ return std::make_unique<write_io_class_data>(*this);
+ }
+}
+
+/// YAML parsing functions
+namespace YAML {
+template<>
+struct convert<byte_size> {
+ static bool decode(const Node& node, byte_size& bs) {
+ auto str = node.as<std::string>();
+ unsigned shift = 0;
+ if (str.back() == 'B') {
+ str.pop_back();
+ shift = std::unordered_map<char, unsigned>{
+ { 'k', 10 },
+ { 'M', 20 },
+ { 'G', 30 },
+ }[str.back()];
+ str.pop_back();
+ }
+ bs.size = (boost::lexical_cast<size_t>(str) << shift);
+ return bs.size >= 512;
+ }
+};
+
+template<>
+struct convert<duration_time> {
+ static bool decode(const Node& node, duration_time& dt) {
+ auto str = node.as<std::string>();
+ if (str == "0") {
+ dt.time = 0ns;
+ return true;
+ }
+ if (str.back() != 's') {
+ return false;
+ }
+ str.pop_back();
+ std::unordered_map<char, std::chrono::duration<float>> unit = {
+ { 'n', 1ns },
+ { 'u', 1us },
+ { 'm', 1ms },
+ };
+
+ if (unit.count(str.back())) {
+ auto u = str.back();
+ str.pop_back();
+ dt.time = (boost::lexical_cast<size_t>(str) * unit[u]);
+ } else {
+ dt.time = (boost::lexical_cast<size_t>(str) * 1s);
+ }
+ return true;
+ }
+};
+
+template<>
+struct convert<shard_config> {
+ static bool decode(const Node& node, shard_config& shards) {
+ try {
+ auto str = node.as<std::string>();
+ return (str == "all");
+ } catch (YAML::TypedBadConversion<std::string>& e) {
+ shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>()));
+ return true;
+ }
+ return false;
+ }
+};
+
+template<>
+struct convert<request_type> {
+ static bool decode(const Node& node, request_type& rt) {
+ static std::unordered_map<std::string, request_type> mappings = {
+ { "seqread", request_type::seqread },
+ { "seqwrite", request_type::seqwrite},
+ { "randread", request_type::randread },
+ { "randwrite", request_type::randwrite },
+ { "append", request_type::append},
+ { "cpu", request_type::cpu},
+ };
+ auto reqstr = node.as<std::string>();
+ if (!mappings.count(reqstr)) {
+ return false;
+ }
+ rt = mappings[reqstr];
+ return true;
+ }
+};
+
+template<>
+struct convert<shard_info> {
+ static bool decode(const Node& node, shard_info& sl) {
+ if (node["parallelism"]) {
+ sl.parallelism = node["parallelism"].as<unsigned>();
+ }
+ if (node["rps"]) {
+ sl.rps = node["rps"].as<unsigned>();
+ }
+
+ if (node["shares"]) {
+ sl.shares = node["shares"].as<unsigned>();
+ }
+ if (node["bandwidth"]) {
+ sl.bandwidth = node["bandwidth"].as<byte_size>().size;
+ }
+ if (node["reqsize"]) {
+ sl.request_size = node["reqsize"].as<byte_size>().size;
+ }
+ if (node["think_time"]) {
+ sl.think_time = node["think_time"].as<duration_time>().time;
+ }
+ if (node["think_after"]) {
+ sl.think_after = node["think_after"].as<duration_time>().time;
+ }
+ if (node["execution_time"]) {
+ sl.execution_time = node["execution_time"].as<duration_time>().time;
+ }
+ return true;
+ }
+};
+
+template<>
+struct convert<options> {
+ static bool decode(const Node& node, options& op) {
+ if (node["dsync"]) {
+ op.dsync = node["dsync"].as<bool>();
+ }
+ if (node["sleep_type"]) {
+ auto st = node["sleep_type"].as<std::string>();
+ if (st == "busyloop") {
+ op.sleep_fn = busyloop_sleep;
+ } else if (st == "lowres") {
+ op.sleep_fn = timer_sleep<lowres_clock>;
+ } else if (st == "steady") {
+ op.sleep_fn = timer_sleep<std::chrono::steady_clock>;
+ } else {
+ throw std::runtime_error(format("Unknown sleep_type {}", st));
+ }
+ }
+ if (node["pause_distribution"]) {
+ auto pd = node["pause_distribution"].as<std::string>();
+ if (pd == "uniform") {
+ op.pause_fn = make_uniform_pause;
+ } else if (pd == "poisson") {
+ op.pause_fn = make_poisson_pause;
+ } else {
+ throw std::runtime_error(format("Unknown pause_distribution {}", pd));
+ }
+ }
+ return true;
+ }
+};
+
+template<>
+struct convert<job_config> {
+ static bool decode(const Node& node, job_config& cl) {
+ cl.name = node["name"].as<std::string>();
+ cl.type = node["type"].as<request_type>();
+ cl.shard_placement = node["shards"].as<shard_config>();
+ // The data_size is used to divide the available (and effectively
+ // constant) disk space between workloads. Each shard inside the
+ // workload thus uses its portion of the assigned space.
+ if (node["data_size"]) {
+ cl.file_size = node["data_size"].as<byte_size>().size / smp::count;
+ } else {
+ cl.file_size = 1ull << 30; // 1G by default
+ }
+ if (node["shard_info"]) {
+ cl.shard_info = node["shard_info"].as<shard_info>();
+ }
+ if (node["options"]) {
+ cl.options = node["options"].as<options>();
+ }
+ return true;
+ }
+};
+}
+
+/// Each shard has one context, and the context is responsible for creating the classes that should
+/// run in this shard.
+class context {
+ std::vector<std::unique_ptr<class_data>> _cl;
+
+ sstring _dir;
+ directory_entry_type _type;
+ std::chrono::seconds _duration;
+
+ semaphore _finished;
+public:
+ context(sstring dir, directory_entry_type dtype, std::vector<job_config> req_config, unsigned duration)
+ : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config
+ | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(this_shard_id()); })
+ | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); })
+ ))
+ , _dir(dir)
+ , _type(dtype)
+ , _duration(duration)
+ , _finished(0)
+ {}
+
+ future<> stop() {
+ return parallel_for_each(_cl, [] (std::unique_ptr<class_data>& cl) {
+ return cl->stop();
+ });
+ }
+
+ future<> start() {
+ return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) {
+ return cl->start(_dir, _type);
+ });
+ }
+
+ future<> issue_requests() {
+ return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) {
+ return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] {
+ _finished.signal(1);
+ });
+ });
+ }
+
+ future<> emit_results(YAML::Emitter& out) {
+ return _finished.wait(_cl.size()).then([this, &out] {
+ for (auto& cl: _cl) {
+ out << YAML::Key << cl->name();
+ out << YAML::BeginMap;
+ cl->emit_results(out);
+ out << YAML::EndMap;
+ }
+ return make_ready_future<>();
+ });
+ }
+};
+
+static void show_results(distributed<context>& ctx) {
+ YAML::Emitter out;
+ out << YAML::BeginDoc;
+ out << YAML::BeginSeq;
+ for (unsigned i = 0; i < smp::count; ++i) {
+ out << YAML::BeginMap;
+ out << YAML::Key << "shard" << YAML::Value << i;
+ ctx.invoke_on(i, [&out] (auto& c) {
+ return c.emit_results(out);
+ }).get();
+ out << YAML::EndMap;
+ }
+ out << YAML::EndSeq;
+ out << YAML::EndDoc;
+ std::cout << out.c_str();
+}
+
+int main(int ac, char** av) {
+ namespace bpo = boost::program_options;
+
+ app_template app;
+ auto opt_add = app.add_options();
+ opt_add
+ ("storage", bpo::value<sstring>()->default_value("."), "directory or block device where to execute the test")
+ ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
+ ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification")
+ ("keep-files", bpo::value<bool>()->default_value(false), "keep test files, next run may re-use them")
+ ;
+
+ distributed<context> ctx;
+ return app.run(ac, av, [&] {
+ return seastar::async([&] {
+ auto& opts = app.configuration();
+ auto& storage = opts["storage"].as<sstring>();
+
+ auto st_type = engine().file_type(storage).get0();
+
+ if (!st_type) {
+ throw std::runtime_error(format("Unknown storage {}", storage));
+ }
+
+ if (*st_type == directory_entry_type::directory) {
+ auto fs = file_system_at(storage).get0();
+ if (fs != fs_type::xfs) {
+ throw std::runtime_error(format("This is a performance test. {} is not on XFS", storage));
+ }
+ }
+
+ keep_files = opts["keep-files"].as<bool>();
+ auto& duration = opts["duration"].as<unsigned>();
+ auto& yaml = opts["conf"].as<sstring>();
+ YAML::Node doc = YAML::LoadFile(yaml);
+ auto reqs = doc.as<std::vector<job_config>>();
+
+ parallel_for_each(reqs, [] (auto& r) {
+ return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) {
+ r.shard_info.scheduling_group = sg;
+ });
+ }).get();
+
+ if (*st_type == directory_entry_type::block_device) {
+ uint64_t off = 0;
+ for (job_config& r : reqs) {
+ r.offset_in_bdev = off;
+ off += r.file_size;
+ }
+ }
+
+ ctx.start(storage, *st_type, reqs, duration).get0();
+ engine().at_exit([&ctx] {
+ return ctx.stop();
+ });
+ std::cout << "Creating initial files..." << std::endl;
+ ctx.invoke_on_all([] (auto& c) {
+ return c.start();
+ }).get();
+ std::cout << "Starting evaluation..." << std::endl;
+ ctx.invoke_on_all([] (auto& c) {
+ return c.issue_requests();
+ }).get();
+ show_results(ctx);
+ ctx.stop().get0();
+ }).or_terminate();
+ });
+}