From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sat, 27 Apr 2024 20:24:20 +0200
Subject: Adding upstream version 14.2.21.

Signed-off-by: Daniel Baumann
---
 src/seastar/apps/io_tester/CMakeLists.txt |  27 ++
 src/seastar/apps/io_tester/conf.yaml      |  25 ++
 src/seastar/apps/io_tester/io_tester.cc   | 612 ++++++++++++++++++++++++++++++
 3 files changed, 664 insertions(+)
 create mode 100644 src/seastar/apps/io_tester/CMakeLists.txt
 create mode 100644 src/seastar/apps/io_tester/conf.yaml
 create mode 100644 src/seastar/apps/io_tester/io_tester.cc

diff --git a/src/seastar/apps/io_tester/CMakeLists.txt b/src/seastar/apps/io_tester/CMakeLists.txt
new file mode 100644
index 00000000..266ff4e5
--- /dev/null
+++ b/src/seastar/apps/io_tester/CMakeLists.txt
@@ -0,0 +1,27 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_add_app (io_tester
+  SOURCES io_tester.cc)
+
+target_link_libraries (app_io_tester
+  PRIVATE yaml-cpp::yaml-cpp)
diff --git a/src/seastar/apps/io_tester/conf.yaml b/src/seastar/apps/io_tester/conf.yaml
new file mode 100644
index 00000000..e8b8091c
--- /dev/null
+++ b/src/seastar/apps/io_tester/conf.yaml
@@ -0,0 +1,25 @@
+- name: big_writes
+  shards: all
+  type: seqwrite
+  shard_info:
+    parallelism: 10
+    reqsize: 256kB
+    shares: 10
+    think_time: 0
+
+- name: latency_reads
+  shards: [0]
+  type: randread
+  shard_info:
+    parallelism: 1
+    reqsize: 512
+    shares: 100
+    think_time: 1000us
+
+- name: cpu_hog
+  shards: [0]
+  type: cpu
+  shard_info:
+    parallelism: 1
+    execution_time: 90us
+    think_time: 10us
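+
+# Usage sketch (not part of the upstream file; the flags are those defined in
+# io_tester.cc's main(), and the directory is a placeholder):
+#
+#   io_tester --conf conf.yaml --directory /path/on/xfs --duration 10
+#
+# The target directory must live on an XFS filesystem; io_tester refuses to
+# run anywhere else.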
diff --git a/src/seastar/apps/io_tester/io_tester.cc b/src/seastar/apps/io_tester/io_tester.cc
new file mode 100644
index 00000000..16b64d38
--- /dev/null
+++ b/src/seastar/apps/io_tester/io_tester.cc
@@ -0,0 +1,612 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+#include <seastar/core/app-template.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/scheduling.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/core/print.hh>
+#include <chrono>
+#include <random>
+#include <vector>
+#include <array>
+#include <memory>
+#include <cstring>
+#include <iostream>
+#include <unordered_map>
+#include <unordered_set>
+#include <boost/range/irange.hpp>
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/extended_p_square.hpp>
+#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
+#include <boost/lexical_cast.hpp>
+#include <yaml-cpp/yaml.h>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+using namespace boost::accumulators;
+
+static auto random_seed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+static std::default_random_engine random_generator(random_seed);
+// size of each individual file. Every class will have its file, so in a normal system with many shards, we'll naturally have many files and
+// that will push the data out of the disk's cache. And static sizes per file are simpler.
+static constexpr uint64_t file_data_size = 1ull << 30;
+
+struct context;
+enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu };
+
+namespace std {
+
+template <>
+struct hash<request_type> {
+    size_t operator() (const request_type& type) const {
+        return static_cast<size_t>(type);
+    }
+};
+
+}
+
+struct byte_size {
+    uint64_t size;
+};
+
+struct duration_time {
+    std::chrono::duration<float> time;
+};
+
+class shard_config {
+    std::unordered_set<unsigned> _shards;
+public:
+    shard_config()
+        : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {}
+    shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {}
+
+    bool is_set(unsigned cpu) const {
+        return _shards.count(cpu);
+    }
+};
+
+struct shard_info {
+    unsigned parallelism = 10;
+    unsigned shares = 10;
+    uint64_t request_size = 4 << 10;
+    std::chrono::duration<float> think_time = 0ms;
+    std::chrono::duration<float> execution_time = 1ms;
+    seastar::scheduling_group scheduling_group = seastar::default_scheduling_group();
+};
+
+class class_data;
+
+struct job_config {
+    std::string name;
+    request_type type;
+    shard_config shard_placement;
+    ::shard_info shard_info;
+    std::unique_ptr<class_data> gen_class_data();
+};
+
+std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999};
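+// The probabilities above parameterize the extended P-square estimator behind
+// each class's latency accumulator (accumulator_type below), so percentiles
+// are estimated online without storing individual samples; add_result() feeds
+// it raw microsecond counts, which is why every latency reported is in usec.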
+
+class class_data {
+protected:
+    using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>;
+
+    job_config _config;
+    uint64_t _alignment;
+    uint64_t _last_pos = 0;
+
+    io_priority_class _iop;
+    seastar::scheduling_group _sg;
+
+    size_t _data = 0;
+    std::chrono::duration<float> _total_duration;
+
+    std::chrono::steady_clock::time_point _start = {};
+    accumulator_type _latencies;
+    std::uniform_int_distribution<uint32_t> _pos_distribution;
+    file _file;
+
+    virtual future<> do_start(sstring dir) = 0;
+    virtual future<size_t> issue_request(char *buf) = 0;
+public:
+    static int idgen();
+    class_data(job_config cfg)
+        : _config(std::move(cfg))
+        , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
+        , _iop(engine().register_one_priority_class(format("test-class-{:d}", idgen()), _config.shard_info.shares))
+        , _sg(_config.shard_info.scheduling_group)
+        , _latencies(extended_p_square_probabilities = quantiles)
+        , _pos_distribution(0, file_data_size / _config.shard_info.request_size)
+    {}
+    // instances are owned and destroyed through unique_ptr<class_data>, so the
+    // destructor must be virtual
+    virtual ~class_data() = default;
+
+    future<> issue_requests(std::chrono::steady_clock::time_point stop) {
+        _start = std::chrono::steady_clock::now();
+        return with_scheduling_group(_sg, [this, stop] {
+            return parallel_for_each(boost::irange(0u, parallelism()), [this, stop] (auto dummy) mutable {
+                auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
+                auto buf = bufptr.get();
+                return do_until([this, stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable {
+                    auto start = std::chrono::steady_clock::now();
+                    return issue_request(buf).then([this, start, stop] (auto size) {
+                        auto now = std::chrono::steady_clock::now();
+                        if (now < stop) {
+                            this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
+                        }
+                        return think();
+                    });
+                }).finally([bufptr = std::move(bufptr)] {});
+            });
+        }).then([this] {
+            _total_duration = std::chrono::steady_clock::now() - _start;
+        });
+    }
+
+    future<> think() {
+        if (_config.shard_info.think_time > 0us) {
+            return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
+        } else {
+            return make_ready_future<>();
+        }
+    }
+    // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting
+    // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate
+    // this file differently depending on the type:
+    //
+    //  sequential reads  : will read the file from pos = 0 onwards, back to 0 on EOF
+    //  sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF
+    //  random reads      : will read the file at random positions, between 0 and EOF
+    //  random writes     : will overwrite the file at a random position, between 0 and EOF
+    //  append            : will write to the file from pos = EOF onwards, always appending to the end.
+    //  cpu               : CPU-only load, file is not created.
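+    //
+    // In every file-backed mode the offset comes from get_pos() below: it is
+    // always a multiple of the request size; random modes draw it uniformly
+    // across the file, sequential modes advance it and wrap at EOF, and
+    // append continues from the highest position written so far.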
+    future<> start(sstring dir) {
+        return do_start(dir);
+    }
+protected:
+    sstring type_str() const {
+        return std::unordered_map<request_type, sstring>{
+            { request_type::seqread, "SEQ READ" },
+            { request_type::seqwrite, "SEQ WRITE" },
+            { request_type::randread, "RAND READ" },
+            { request_type::randwrite, "RAND WRITE" },
+            { request_type::append , "APPEND" },
+            { request_type::cpu , "CPU" },
+        }[_config.type];
+    }
+
+    const sstring name() const {
+        return _config.name;
+    }
+
+    request_type req_type() const {
+        return _config.type;
+    }
+
+    sstring think_time() const {
+        if (_config.shard_info.think_time == std::chrono::duration<float>(0)) {
+            return "NO think time";
+        } else {
+            return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count());
+        }
+    }
+
+    size_t req_size() const {
+        return _config.shard_info.request_size;
+    }
+
+    unsigned parallelism() const {
+        return _config.shard_info.parallelism;
+    }
+
+    unsigned shares() const {
+        return _config.shard_info.shares;
+    }
+
+    std::chrono::duration<float> total_duration() const {
+        return _total_duration;
+    }
+
+    uint64_t total_data() const {
+        return _data;
+    }
+
+    uint64_t max_latency() const {
+        return max(_latencies);
+    }
+
+    uint64_t average_latency() const {
+        return mean(_latencies);
+    }
+
+    uint64_t quantile_latency(double q) const {
+        return quantile(_latencies, quantile_probability = q);
+    }
+
+    bool is_sequential() const {
+        return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite);
+    }
+    bool is_random() const {
+        return (req_type() == request_type::randread) || (req_type() == request_type::randwrite);
+    }
+
+    uint64_t get_pos() {
+        uint64_t pos;
+        if (is_random()) {
+            pos = _pos_distribution(random_generator) * req_size();
+        } else {
+            pos = _last_pos + req_size();
+            if (is_sequential() && (pos >= file_data_size)) {
+                pos = 0;
+            }
+        }
+        _last_pos = pos;
+        return pos;
+    }
+
+    void add_result(size_t data, std::chrono::microseconds latency) {
+        _data += data;
+        _latencies(latency.count());
+    }
+
+public:
+    virtual sstring describe_class() = 0;
+    virtual sstring describe_results() = 0;
+};
+
+class io_class_data : public class_data {
+public:
+    io_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+    future<> do_start(sstring dir) override {
+        auto fname = format("{}/test-{}-{:d}", dir, name(), engine().cpu_id());
+        return open_file_dma(fname, open_flags::rw | open_flags::create | open_flags::truncate).then([this, fname] (auto f) {
+            _file = f;
+            // unlink the file right away: the open descriptor keeps it alive,
+            // and it cleans itself up when the tester exits
+            return remove_file(fname);
+        }).then([this, fname] {
+            // fill the test file with printable garbage, 256 kB at a time,
+            // with at most 64 writes in flight
+            return do_with(seastar::semaphore(64), [this] (auto& write_parallelism) mutable {
+                auto bufsize = 256ul << 10;
+                auto pos = boost::irange(0ul, (file_data_size / bufsize) + 1);
+                return parallel_for_each(pos.begin(), pos.end(), [this, bufsize, &write_parallelism] (auto pos) mutable {
+                    return get_units(write_parallelism, 1).then([this, bufsize, pos] (auto perm) mutable {
+                        auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096);
+                        auto buf = bufptr.get();
+                        std::uniform_int_distribution<char> fill('@', '~');
+                        memset(buf, fill(random_generator), bufsize);
+                        pos = pos * bufsize;
+                        return _file.dma_write(pos, buf, bufsize).finally([this, bufsize, bufptr = std::move(bufptr), perm = std::move(perm), pos] {
+                            if ((this->req_type() == request_type::append) && (pos > _last_pos)) {
+                                _last_pos = pos;
+                            }
+                        }).discard_result();
+                    });
+                });
+            });
+        }).then([this] {
+            return _file.flush();
+        });
+    }
+
+    virtual sstring describe_class() override {
+        return fmt::format("{}: {} shares, {}-byte {}, {} concurrent requests, {}", name(), shares(), req_size(), type_str(), parallelism(), think_time());
+    }
+
+    virtual sstring describe_results() override {
+        auto throughput_kbs = (total_data() >> 10) / total_duration().count();
+        sstring result;
+        result += fmt::format("  Throughput         : {:>8} KB/s\n", throughput_kbs);
+        result += fmt::format("  Lat average        : {:>8} usec\n", average_latency());
+        for (auto& q: quantiles) {
+            result += fmt::format("  Lat quantile={:>5} : {:>8} usec\n", q, quantile_latency(q));
+        }
+        result += fmt::format("  Lat max            : {:>8} usec\n", max_latency());
+        return result;
+    }
+};
+
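+// The concrete job types: the two I/O classes below reuse io_class_data's
+// file setup and differ only in which DMA call they issue; cpu_class_data
+// skips file setup entirely and busy-loops for execution_time per simulated
+// request.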
+class read_io_class_data : public io_class_data {
+public:
+    read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+    future<size_t> issue_request(char *buf) override {
+        return _file.dma_read(this->get_pos(), buf, this->req_size(), _iop);
+    }
+};
+
+class write_io_class_data : public io_class_data {
+public:
+    write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+    future<size_t> issue_request(char *buf) override {
+        return _file.dma_write(this->get_pos(), buf, this->req_size(), _iop);
+    }
+};
+
+class cpu_class_data : public class_data {
+public:
+    cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+    future<> do_start(sstring dir) override {
+        return make_ready_future<>();
+    }
+
+    future<size_t> issue_request(char *buf) override {
+        // We do want the execution time to be a busy loop, and not just a bunch of
+        // continuations until our time is up: by doing this we can also simulate the behavior
+        // of I/O continuations in the face of reactor stalls.
+        auto start = std::chrono::steady_clock::now();
+        do {
+        } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time);
+        return make_ready_future<size_t>(1);
+    }
+
+    virtual sstring describe_class() override {
+        auto exec = std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.execution_time);
+        return fmt::format("{}: {} shares, {} us CPU execution time, {} concurrent requests, {}", name(), shares(), exec.count(), parallelism(), think_time());
+    }
+
+    virtual sstring describe_results() override {
+        auto throughput = total_data() / total_duration().count();
+        return fmt::format("  Throughput         : {:>8} continuations/s\n", throughput);
+    }
+};
+
+std::unique_ptr<class_data> job_config::gen_class_data() {
+    if (type == request_type::cpu) {
+        return std::make_unique<cpu_class_data>(*this);
+    } else if ((type == request_type::seqread) || (type == request_type::randread)) {
+        return std::make_unique<read_io_class_data>(*this);
+    } else {
+        return std::make_unique<write_io_class_data>(*this);
+    }
+}
+
+/// YAML parsing functions
+namespace YAML {
+template<>
+struct convert<byte_size> {
+    static bool decode(const Node& node, byte_size& bs) {
+        auto str = node.as<std::string>();
+        unsigned shift = 0;
+        if (str.back() == 'B') {
+            str.pop_back();
+            shift = std::unordered_map<char, unsigned>{
+                { 'k', 10 },
+                { 'M', 20 },
+                { 'G', 30 },
+            }[str.back()];
+            str.pop_back();
+        }
+        bs.size = (boost::lexical_cast<uint64_t>(str) << shift);
+        return bs.size >= 512;
+    }
+};
+
+template<>
+struct convert<duration_time> {
+    static bool decode(const Node& node, duration_time& dt) {
+        auto str = node.as<std::string>();
+        if (str == "0") {
+            dt.time = 0ns;
+            return true;
+        }
+        if (str.back() != 's') {
+            return false;
+        }
+        str.pop_back();
+        std::unordered_map<char, std::chrono::duration<float>> unit = {
+            { 'n', 1ns },
+            { 'u', 1us },
+            { 'm', 1ms },
+        };
+
+        if (unit.count(str.back())) {
+            auto u = str.back();
+            str.pop_back();
+            dt.time = (boost::lexical_cast<size_t>(str) * unit[u]);
+        } else {
+            dt.time = (boost::lexical_cast<size_t>(str) * 1s);
+        }
+        return true;
+    }
+};
+
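+// Examples accepted by the two converters above: reqsize "512" is plain
+// bytes, and "256kB" is 256 << 10 = 262144 bytes (the 'k'/'M'/'G' shift only
+// applies with the trailing 'B'; sizes under 512 bytes are rejected).
+// Durations take an 'ns'/'us'/'ms'/'s' suffix or a bare "0", so conf.yaml's
+// "1000us" is the same duration as "1ms".
+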
+template<>
+struct convert<shard_config> {
+    static bool decode(const Node& node, shard_config& shards) {
+        try {
+            auto str = node.as<std::string>();
+            return (str == "all");
+        } catch (YAML::TypedBadConversion<std::string>& e) {
+            shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>()));
+            return true;
+        }
+        return false;
+    }
+};
+
+template<>
+struct convert<request_type> {
+    static bool decode(const Node& node, request_type& rt) {
+        static std::unordered_map<std::string, request_type> mappings = {
+            { "seqread", request_type::seqread },
+            { "seqwrite", request_type::seqwrite},
+            { "randread", request_type::randread },
+            { "randwrite", request_type::randwrite },
+            { "append", request_type::append},
+            { "cpu", request_type::cpu},
+        };
+        auto reqstr = node.as<std::string>();
+        if (!mappings.count(reqstr)) {
+            return false;
+        }
+        rt = mappings[reqstr];
+        return true;
+    }
+};
+
+template<>
+struct convert<shard_info> {
+    static bool decode(const Node& node, shard_info& sl) {
+        if (node["parallelism"]) {
+            sl.parallelism = node["parallelism"].as<unsigned>();
+        }
+        if (node["shares"]) {
+            sl.shares = node["shares"].as<unsigned>();
+        }
+        if (node["reqsize"]) {
+            sl.request_size = node["reqsize"].as<byte_size>().size;
+        }
+        if (node["think_time"]) {
+            sl.think_time = node["think_time"].as<duration_time>().time;
+        }
+        if (node["execution_time"]) {
+            sl.execution_time = node["execution_time"].as<duration_time>().time;
+        }
+        return true;
+    }
+};
+
+template<>
+struct convert<job_config> {
+    static bool decode(const Node& node, job_config& cl) {
+        cl.name = node["name"].as<std::string>();
+        cl.type = node["type"].as<request_type>();
+        cl.shard_placement = node["shards"].as<shard_config>();
+        if (node["shard_info"]) {
+            cl.shard_info = node["shard_info"].as<shard_info>();
+        }
+        return true;
+    }
+};
+}
+
+/// Each shard has one context, and the context is responsible for creating the classes that should
+/// run in this shard.
+class context {
+    std::vector<std::unique_ptr<class_data>> _cl;
+
+    sstring _dir;
+    std::chrono::seconds _duration;
+
+    semaphore _finished;
+public:
+    context(sstring dir, std::vector<job_config> req_config, unsigned duration)
+        : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config
+            | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(engine().cpu_id()); })
+            | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); })
+        ))
+        , _dir(dir)
+        , _duration(duration)
+        , _finished(0)
+    {}
+
+    future<> stop() { return make_ready_future<>(); }
+
+    future<> start() {
+        return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) {
+            return cl->start(_dir);
+        });
+    }
+
+    future<> issue_requests() {
+        return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) {
+            return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] {
+                _finished.signal(1);
+            });
+        });
+    }
+
+    future<> print_stats() {
+        return _finished.wait(_cl.size()).then([this] {
+            fmt::print("Shard {:>2}\n", engine().cpu_id());
+            auto idx = 0;
+            for (auto& cl: _cl) {
+                fmt::print("Class {:>2} ({})\n", idx++, cl->describe_class());
+                fmt::print("{}\n", cl->describe_results());
+            }
+            return make_ready_future<>();
+        });
+    }
+};
+
+int class_data::idgen() {
+    static thread_local int id = 0;
+    return id++;
+}
+
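+// main()'s flow: parse options; refuse to run unless the test directory is
+// on XFS; load the YAML job list; create one scheduling group per job; then
+// drive the distributed<context> through its phases: create the test files,
+// issue requests until --duration expires, and print per-shard statistics.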
opts["directory"].as(); + + auto fs = file_system_at(directory).get0(); + if (fs != fs_type::xfs) { + throw std::runtime_error(format("This is a performance test. {} is not on XFS", directory)); + } + + auto& duration = opts["duration"].as(); + auto& yaml = opts["conf"].as(); + YAML::Node doc = YAML::LoadFile(yaml); + auto reqs = doc.as>(); + + parallel_for_each(reqs, [] (auto& r) { + return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) { + r.shard_info.scheduling_group = sg; + }); + }).get(); + + ctx.start(directory, reqs, duration).get0(); + engine().at_exit([&ctx] { + return ctx.stop(); + }); + std::cout << "Creating initial files..." << std::endl; + ctx.invoke_on_all([] (auto& c) { + return c.start(); + }).get(); + std::cout << "Starting evaluation..." << std::endl; + ctx.invoke_on_all([] (auto& c) { + return c.issue_requests(); + }).get(); + for (unsigned i = 0; i < smp::count; ++i) { + ctx.invoke_on(i, [] (auto& c) { + return c.print_stats(); + }).get(); + } + }).or_terminate(); + }); +} -- cgit v1.2.3