diff options
Diffstat (limited to 'src/seastar/apps')
-rw-r--r-- | src/seastar/apps/CMakeLists.txt | 56 | ||||
-rw-r--r-- | src/seastar/apps/httpd/CMakeLists.txt | 37 | ||||
-rw-r--r-- | src/seastar/apps/httpd/demo.json | 73 | ||||
-rw-r--r-- | src/seastar/apps/httpd/main.cc | 91 | ||||
-rw-r--r-- | src/seastar/apps/io_tester/CMakeLists.txt | 27 | ||||
-rw-r--r-- | src/seastar/apps/io_tester/conf.yaml | 25 | ||||
-rw-r--r-- | src/seastar/apps/io_tester/io_tester.cc | 612 | ||||
-rw-r--r-- | src/seastar/apps/iotune/CMakeLists.txt | 29 | ||||
-rw-r--r-- | src/seastar/apps/iotune/iotune.cc | 694 | ||||
-rw-r--r-- | src/seastar/apps/memcached/CMakeLists.txt | 49 | ||||
-rw-r--r-- | src/seastar/apps/memcached/ascii.rl | 154 | ||||
-rw-r--r-- | src/seastar/apps/memcached/memcache.cc | 1464 | ||||
-rw-r--r-- | src/seastar/apps/memcached/memcached.hh | 74 | ||||
-rw-r--r-- | src/seastar/apps/memcached/tests/CMakeLists.txt | 75 | ||||
-rwxr-xr-x | src/seastar/apps/memcached/tests/test.py | 49 | ||||
-rw-r--r-- | src/seastar/apps/memcached/tests/test_ascii_parser.cc | 335 | ||||
-rwxr-xr-x | src/seastar/apps/memcached/tests/test_memcached.py | 600 | ||||
-rw-r--r-- | src/seastar/apps/seawreck/CMakeLists.txt | 24 | ||||
-rw-r--r-- | src/seastar/apps/seawreck/seawreck.cc | 225 |
19 files changed, 4693 insertions, 0 deletions
diff --git a/src/seastar/apps/CMakeLists.txt b/src/seastar/apps/CMakeLists.txt new file mode 100644 index 00000000..26e0e03b --- /dev/null +++ b/src/seastar/apps/CMakeLists.txt @@ -0,0 +1,56 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +# Logical target for all applications. +add_custom_target (apps) + +macro (seastar_add_app name) + set (args ${ARGN}) + + cmake_parse_arguments ( + parsed_args + "" + "" + "SOURCES" + ${args}) + + set (target app_${name}) + add_executable (${target} ${parsed_args_SOURCES}) + + target_include_directories (${target} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + + target_link_libraries (${target} + PRIVATE seastar_with_flags) + + set_target_properties (${target} + PROPERTIES + OUTPUT_NAME ${name}) + + add_dependencies (apps ${target}) +endmacro () + +add_subdirectory (httpd) +add_subdirectory (io_tester) +add_subdirectory (iotune) +add_subdirectory (memcached) +add_subdirectory (seawreck) diff --git a/src/seastar/apps/httpd/CMakeLists.txt b/src/seastar/apps/httpd/CMakeLists.txt new file mode 100644 index 00000000..d8c48197 --- /dev/null +++ b/src/seastar/apps/httpd/CMakeLists.txt @@ -0,0 +1,37 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +seastar_generate_swagger ( + TARGET app_httpd_swagger + VAR app_httpd_swagger_file + IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/demo.json + OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/demo.json.hh) + +seastar_add_app (httpd + SOURCES + ${app_httpd_swagger_file} + main.cc) + +target_include_directories (app_httpd + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) + +add_dependencies (app_httpd app_httpd_swagger) diff --git a/src/seastar/apps/httpd/demo.json b/src/seastar/apps/httpd/demo.json new file mode 100644 index 00000000..12261c45 --- /dev/null +++ b/src/seastar/apps/httpd/demo.json @@ -0,0 +1,73 @@ +{ + "apiVersion": "0.0.1", + "swaggerVersion": "1.2", + "basePath": "{{Protocol}}://{{Host}}", + "resourcePath": "/hello", + "produces": [ + "application/json" + ], + "apis": [ + { + "path": "/hello/world/{var1}/{var2}", + "operations": [ + { + "method": "GET", + "summary": "Returns the number of seconds since the system was booted", + "type": "long", + "nickname": "hello_world", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name":"var2", + "description":"Full path of file or directory", + "required":true, + "allowMultiple":true, + "type":"string", + "paramType":"path" + }, + { + "name":"var1", + "description":"Full path of file or directory", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"query_enum", + "description":"The operation to perform", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query", + "enum":["VAL1", "VAL2", "VAL3"] + } + ] + } + ] + } + ], + "models" : { + "my_object": { + "id": "my_object", + "description": "Demonstrate an object", + "properties": { + "var1": { + "type": "string", + "description": "The first parameter in the path" + }, + "var2": { + "type": "string", + "description": "The second parameter in the path" + }, + "enum_var" : { + "type": "string", + "description": "Demonstrate an enum returned, note this is not the same enum type of the request", + "enum":["VAL1", "VAL2", "VAL3"] + } + } + } + } +} diff --git a/src/seastar/apps/httpd/main.cc b/src/seastar/apps/httpd/main.cc new file mode 100644 index 00000000..62af9434 --- /dev/null +++ b/src/seastar/apps/httpd/main.cc @@ -0,0 +1,91 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ + +#include <seastar/http/httpd.hh> +#include <seastar/http/handlers.hh> +#include <seastar/http/function_handlers.hh> +#include <seastar/http/file_handler.hh> +#include "demo.json.hh" +#include <seastar/http/api_docs.hh> + +namespace bpo = boost::program_options; + +using namespace seastar; +using namespace httpd; + +class handl : public httpd::handler_base { +public: + virtual future<std::unique_ptr<reply> > handle(const sstring& path, + std::unique_ptr<request> req, std::unique_ptr<reply> rep) { + rep->_content = "hello"; + rep->done("html"); + return make_ready_future<std::unique_ptr<reply>>(std::move(rep)); + } +}; + +void set_routes(routes& r) { + function_handler* h1 = new function_handler([](const_req req) { + return "hello"; + }); + function_handler* h2 = new function_handler([](std::unique_ptr<request> req) { + return make_ready_future<json::json_return_type>("json-future"); + }); + r.add(operation_type::GET, url("/"), h1); + r.add(operation_type::GET, url("/jf"), h2); + r.add(operation_type::GET, url("/file").remainder("path"), + new directory_handler("/")); + demo_json::hello_world.set(r, [] (const_req req) { + demo_json::my_object obj; + obj.var1 = req.param.at("var1"); + obj.var2 = req.param.at("var2"); + demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum")); + // This demonstrate enum conversion + obj.enum_var = v; + return obj; + }); +} + +int main(int ac, char** av) { + app_template app; + app.add_options()("port", bpo::value<uint16_t>()->default_value(10000), + "HTTP Server port"); + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto server = new http_server_control(); + auto rb = make_shared<api_registry_builder>("apps/httpd/"); + server->start().then([server] { + return server->set_routes(set_routes); + }).then([server, rb]{ + return server->set_routes([rb](routes& r){rb->set_api_doc(r);}); + }).then([server, rb]{ + return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");}); + }).then([server, port] { + return server->listen(port); + }).then([server, port] { + std::cout << "Seastar HTTP server listening on port " << port << " ...\n"; + engine().at_exit([server] { + return server->stop(); + }); + }); + + }); +} diff --git a/src/seastar/apps/io_tester/CMakeLists.txt b/src/seastar/apps/io_tester/CMakeLists.txt new file mode 100644 index 00000000..266ff4e5 --- /dev/null +++ b/src/seastar/apps/io_tester/CMakeLists.txt @@ -0,0 +1,27 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +seastar_add_app (io_tester + SOURCES io_tester.cc) + +target_link_libraries (app_io_tester + PRIVATE yaml-cpp::yaml-cpp) diff --git a/src/seastar/apps/io_tester/conf.yaml b/src/seastar/apps/io_tester/conf.yaml new file mode 100644 index 00000000..e8b8091c --- /dev/null +++ b/src/seastar/apps/io_tester/conf.yaml @@ -0,0 +1,25 @@ +- name: big_writes + shards: all + type: seqwrite + shard_info: + parallelism: 10 + reqsize: 256kB + shares: 10 + think_time: 0 + +- name: latency_reads + shards: [0] + type: randread + shard_info: + parallelism: 1 + reqsize: 512 + shares: 100 + think_time: 1000us + +- name: cpu_hog + shards: [0] + type: cpu + shard_info: + parallelim: 1 + execution_time: 90us + think_time: 10us diff --git a/src/seastar/apps/io_tester/io_tester.cc b/src/seastar/apps/io_tester/io_tester.cc new file mode 100644 index 00000000..16b64d38 --- /dev/null +++ b/src/seastar/apps/io_tester/io_tester.cc @@ -0,0 +1,612 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2017 ScyllaDB + */ +#include <seastar/core/app-template.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/future.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/file.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/align.hh> +#include <seastar/core/timer.hh> +#include <seastar/core/thread.hh> +#include <chrono> +#include <vector> +#include <boost/range/irange.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/accumulators/accumulators.hpp> +#include <boost/accumulators/statistics/stats.hpp> +#include <boost/accumulators/statistics/max.hpp> +#include <boost/accumulators/statistics/mean.hpp> +#include <boost/accumulators/statistics/p_square_quantile.hpp> +#include <boost/accumulators/statistics/extended_p_square.hpp> +#include <boost/accumulators/statistics/extended_p_square_quantile.hpp> +#include <boost/range/adaptor/filtered.hpp> +#include <boost/range/adaptor/map.hpp> +#include <boost/array.hpp> +#include <iomanip> +#include <random> +#include <yaml-cpp/yaml.h> + +using namespace seastar; +using namespace std::chrono_literals; +using namespace boost::accumulators; + +static auto random_seed = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); +static std::default_random_engine random_generator(random_seed); +// size of each individual file. Every class will have its file, so in a normal system with many shards, we'll naturally have many files and +// that will push the data out of the disk's cache. And static sizes per file are simpler. +static constexpr uint64_t file_data_size = 1ull << 30; + +struct context; +enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu }; + +namespace std { + +template <> +struct hash<request_type> { + size_t operator() (const request_type& type) const { + return static_cast<size_t>(type); + } +}; + +} + +struct byte_size { + uint64_t size; +}; + +struct duration_time { + std::chrono::duration<float> time; +}; + +class shard_config { + std::unordered_set<unsigned> _shards; +public: + shard_config() + : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {} + shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {} + + bool is_set(unsigned cpu) const { + return _shards.count(cpu); + } +}; + +struct shard_info { + unsigned parallelism = 10; + unsigned shares = 10; + uint64_t request_size = 4 << 10; + std::chrono::duration<float> think_time = 0ms; + std::chrono::duration<float> execution_time = 1ms; + seastar::scheduling_group scheduling_group = seastar::default_scheduling_group(); +}; + +class class_data; + +struct job_config { + std::string name; + request_type type; + shard_config shard_placement; + ::shard_info shard_info; + std::unique_ptr<class_data> gen_class_data(); +}; + +std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999}; + +class class_data { +protected: + using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>; + + job_config _config; + uint64_t _alignment; + uint64_t _last_pos = 0; + + io_priority_class _iop; + seastar::scheduling_group _sg; + + size_t _data = 0; + std::chrono::duration<float> _total_duration; + + std::chrono::steady_clock::time_point _start = {}; + accumulator_type _latencies; + std::uniform_int_distribution<uint32_t> _pos_distribution; + file _file; + + virtual future<> do_start(sstring dir) = 0; + virtual future<size_t> issue_request(char *buf) = 0; +public: + static int idgen(); + class_data(job_config cfg) + : _config(std::move(cfg)) + , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512) + , _iop(engine().register_one_priority_class(format("test-class-{:d}", idgen()), _config.shard_info.shares)) + , _sg(cfg.shard_info.scheduling_group) + , _latencies(extended_p_square_probabilities = quantiles) + , _pos_distribution(0, file_data_size / _config.shard_info.request_size) + {} + + future<> issue_requests(std::chrono::steady_clock::time_point stop) { + _start = std::chrono::steady_clock::now(); + return with_scheduling_group(_sg, [this, stop] { + return parallel_for_each(boost::irange(0u, parallelism()), [this, stop] (auto dummy) mutable { + auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment); + auto buf = bufptr.get(); + return do_until([this, stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable { + auto start = std::chrono::steady_clock::now(); + return issue_request(buf).then([this, start, stop] (auto size) { + auto now = std::chrono::steady_clock::now(); + if (now < stop) { + this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start)); + } + return think(); + }); + }).finally([bufptr = std::move(bufptr)] {}); + }); + }).then([this] { + _total_duration = std::chrono::steady_clock::now() - _start; + }); + } + + future<> think() { + if (_config.shard_info.think_time > 0us) { + return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time)); + } else { + return make_ready_future<>(); + } + } + // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting + // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate + // this file differently depending on the type: + // + // sequential reads : will read the file from pos = 0 onwards, back to 0 on EOF + // sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF + // random reads : will read the file at random positions, between 0 and EOF + // random writes : will overwrite the file at a random position, between 0 and EOF + // append : will write to the file from pos = EOF onwards, always appending to the end. + // cpu : CPU-only load, file is not created. + future<> start(sstring dir) { + return do_start(dir); + } +protected: + sstring type_str() const { + return std::unordered_map<request_type, sstring>{ + { request_type::seqread, "SEQ READ" }, + { request_type::seqwrite, "SEQ WRITE" }, + { request_type::randread, "RAND READ" }, + { request_type::randwrite, "RAND WRITE" }, + { request_type::append , "APPEND" }, + { request_type::cpu , "CPU" }, + }[_config.type];; + } + + const sstring name() const { + return _config.name; + } + + request_type req_type() const { + return _config.type; + } + + sstring think_time() const { + if (_config.shard_info.think_time == std::chrono::duration<float>(0)) { + return "NO think time"; + } else { + return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count()); + } + } + + size_t req_size() const { + return _config.shard_info.request_size; + } + + unsigned parallelism() const { + return _config.shard_info.parallelism; + } + + unsigned shares() const { + return _config.shard_info.shares; + } + + std::chrono::duration<float> total_duration() const { + return _total_duration; + } + + uint64_t total_data() const { + return _data; + } + + uint64_t max_latency() const { + return max(_latencies); + } + + uint64_t average_latency() const { + return mean(_latencies); + } + + uint64_t quantile_latency(double q) const { + return quantile(_latencies, quantile_probability = q); + } + + bool is_sequential() const { + return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite); + } + bool is_random() const { + return (req_type() == request_type::randread) || (req_type() == request_type::randwrite); + } + + uint64_t get_pos() { + uint64_t pos; + if (is_random()) { + pos = _pos_distribution(random_generator) * req_size(); + } else { + pos = _last_pos + req_size(); + if (is_sequential() && (pos >= file_data_size)) { + pos = 0; + } + } + _last_pos = pos; + return pos; + } + + void add_result(size_t data, std::chrono::microseconds latency) { + _data += data; + _latencies(latency.count()); + } + +public: + virtual sstring describe_class() = 0; + virtual sstring describe_results() = 0; +}; + +class io_class_data : public class_data { +public: + io_class_data(job_config cfg) : class_data(std::move(cfg)) {} + + future<> do_start(sstring dir) override { + auto fname = format("{}/test-{}-{:d}", dir, name(), engine().cpu_id()); + return open_file_dma(fname, open_flags::rw | open_flags::create | open_flags::truncate).then([this, fname] (auto f) { + _file = f; + return remove_file(fname); + }).then([this, fname] { + return do_with(seastar::semaphore(64), [this] (auto& write_parallelism) mutable { + auto bufsize = 256ul << 10; + auto pos = boost::irange(0ul, (file_data_size / bufsize) + 1); + return parallel_for_each(pos.begin(), pos.end(), [this, bufsize, &write_parallelism] (auto pos) mutable { + return get_units(write_parallelism, 1).then([this, bufsize, pos] (auto perm) mutable { + auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096); + auto buf = bufptr.get(); + std::uniform_int_distribution<char> fill('@', '~'); + memset(buf, fill(random_generator), bufsize); + pos = pos * bufsize; + return _file.dma_write(pos, buf, bufsize).finally([this, bufsize, bufptr = std::move(bufptr), perm = std::move(perm), pos] { + if ((this->req_type() == request_type::append) && (pos > _last_pos)) { + _last_pos = pos; + } + }).discard_result(); + }); + }); + }); + }).then([this] { + return _file.flush(); + }); + } + + virtual sstring describe_class() override { + return fmt::format("{}: {} shares, {}-byte {}, {} concurrent requests, {}", name(), shares(), req_size(), type_str(), parallelism(), think_time()); + } + + virtual sstring describe_results() override { + auto throughput_kbs = (total_data() >> 10) / total_duration().count(); + sstring result; + result += fmt::format(" Throughput : {:>8} KB/s\n", throughput_kbs); + result += fmt::format(" Lat average : {:>8} usec\n", average_latency()); + for (auto& q: quantiles) { + result += fmt::format(" Lat quantile={:>5} : {:>8} usec\n", q, quantile_latency(q)); + } + result += fmt::format(" Lat max : {:>8} usec\n", max_latency()); + return result; + } +}; + +class read_io_class_data : public io_class_data { +public: + read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {} + + future<size_t> issue_request(char *buf) override { + return _file.dma_read(this->get_pos(), buf, this->req_size(), _iop); + } +}; + +class write_io_class_data : public io_class_data { +public: + write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {} + + future<size_t> issue_request(char *buf) override { + return _file.dma_write(this->get_pos(), buf, this->req_size(), _iop); + } +}; + +class cpu_class_data : public class_data { +public: + cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {} + + future<> do_start(sstring dir) override { + return make_ready_future<>(); + } + + future<size_t> issue_request(char *buf) override { + // We do want the execution time to be a busy loop, and not just a bunch of + // continuations until our time is up: by doing this we can also simulate the behavior + // of I/O continuations in the face of reactor stalls. + auto start = std::chrono::steady_clock::now(); + do { + } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time); + return make_ready_future<size_t>(1); + } + + virtual sstring describe_class() override { + auto exec = std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.execution_time); + return fmt::format("{}: {} shares, {} us CPU execution time, {} concurrent requests, {}", name(), shares(), exec.count(), parallelism(), think_time()); + } + + virtual sstring describe_results() override { + auto throughput = total_data() / total_duration().count(); + return fmt::format(" Throughput : {:>8} continuations/s\n", throughput); + } +}; + +std::unique_ptr<class_data> job_config::gen_class_data() { + if (type == request_type::cpu) { + return std::make_unique<cpu_class_data>(*this); + } else if ((type == request_type::seqread) || (type == request_type::randread)) { + return std::make_unique<read_io_class_data>(*this); + } else { + return std::make_unique<write_io_class_data>(*this); + } +} + +/// YAML parsing functions +namespace YAML { +template<> +struct convert<byte_size> { + static bool decode(const Node& node, byte_size& bs) { + auto str = node.as<std::string>(); + unsigned shift = 0; + if (str.back() == 'B') { + str.pop_back(); + shift = std::unordered_map<char, unsigned>{ + { 'k', 10 }, + { 'M', 20 }, + { 'G', 30 }, + }[str.back()]; + str.pop_back(); + } + bs.size = (boost::lexical_cast<size_t>(str) << shift); + return bs.size >= 512; + } +}; + +template<> +struct convert<duration_time> { + static bool decode(const Node& node, duration_time& dt) { + auto str = node.as<std::string>(); + if (str == "0") { + dt.time = 0ns; + return true; + } + if (str.back() != 's') { + return false; + } + str.pop_back(); + std::unordered_map<char, std::chrono::duration<float>> unit = { + { 'n', 1ns }, + { 'u', 1us }, + { 'm', 1ms }, + }; + + if (unit.count(str.back())) { + auto u = str.back(); + str.pop_back(); + dt.time = (boost::lexical_cast<size_t>(str) * unit[u]); + } else { + dt.time = (boost::lexical_cast<size_t>(str) * 1s); + } + return true; + } +}; + +template<> +struct convert<shard_config> { + static bool decode(const Node& node, shard_config& shards) { + try { + auto str = node.as<std::string>(); + return (str == "all"); + } catch (YAML::TypedBadConversion<std::string>& e) { + shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>())); + return true; + } + return false; + } +}; + +template<> +struct convert<request_type> { + static bool decode(const Node& node, request_type& rt) { + static std::unordered_map<std::string, request_type> mappings = { + { "seqread", request_type::seqread }, + { "seqwrite", request_type::seqwrite}, + { "randread", request_type::randread }, + { "randwrite", request_type::randwrite }, + { "append", request_type::append}, + { "cpu", request_type::cpu}, + }; + auto reqstr = node.as<std::string>(); + if (!mappings.count(reqstr)) { + return false; + } + rt = mappings[reqstr]; + return true; + } +}; + +template<> +struct convert<shard_info> { + static bool decode(const Node& node, shard_info& sl) { + if (node["parallelism"]) { + sl.parallelism = node["parallelism"].as<unsigned>(); + } + if (node["shares"]) { + sl.shares = node["shares"].as<unsigned>(); + } + if (node["reqsize"]) { + sl.request_size = node["reqsize"].as<byte_size>().size; + } + if (node["think_time"]) { + sl.think_time = node["think_time"].as<duration_time>().time; + } + if (node["execution_time"]) { + sl.execution_time = node["execution_time"].as<duration_time>().time; + } + return true; + } +}; + +template<> +struct convert<job_config> { + static bool decode(const Node& node, job_config& cl) { + cl.name = node["name"].as<std::string>(); + cl.type = node["type"].as<request_type>(); + cl.shard_placement = node["shards"].as<shard_config>(); + if (node["shard_info"]) { + cl.shard_info = node["shard_info"].as<shard_info>(); + } + return true; + } +}; +} + +/// Each shard has one context, and the context is responsible for creating the classes that should +/// run in this shard. +class context { + std::vector<std::unique_ptr<class_data>> _cl; + + sstring _dir; + std::chrono::seconds _duration; + + semaphore _finished; +public: + context(sstring dir, std::vector<job_config> req_config, unsigned duration) + : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config + | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(engine().cpu_id()); }) + | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); }) + )) + , _dir(dir) + , _duration(duration) + , _finished(0) + {} + + future<> stop() { return make_ready_future<>(); } + + future<> start() { + return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) { + return cl->start(_dir); + }); + } + + future<> issue_requests() { + return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) { + return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] { + _finished.signal(1); + }); + }); + } + + future<> print_stats() { + return _finished.wait(_cl.size()).then([this] { + fmt::print("Shard {:>2}\n", engine().cpu_id()); + auto idx = 0; + for (auto& cl: _cl) { + fmt::print("Class {:>2} ({})\n", idx++, cl->describe_class()); + fmt::print("{}\n", cl->describe_results()); + } + return make_ready_future<>(); + }); + } +}; + +int class_data::idgen() { + static thread_local int id = 0; + return id++; +} + +int main(int ac, char** av) { + namespace bpo = boost::program_options; + + app_template app; + auto opt_add = app.add_options(); + opt_add + ("directory", bpo::value<sstring>()->default_value("."), "directory where to execute the test") + ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test") + ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification") + ; + + distributed<context> ctx; + return app.run(ac, av, [&] { + return seastar::async([&] { + auto& opts = app.configuration(); + auto& directory = opts["directory"].as<sstring>(); + + auto fs = file_system_at(directory).get0(); + if (fs != fs_type::xfs) { + throw std::runtime_error(format("This is a performance test. {} is not on XFS", directory)); + } + + auto& duration = opts["duration"].as<unsigned>(); + auto& yaml = opts["conf"].as<sstring>(); + YAML::Node doc = YAML::LoadFile(yaml); + auto reqs = doc.as<std::vector<job_config>>(); + + parallel_for_each(reqs, [] (auto& r) { + return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) { + r.shard_info.scheduling_group = sg; + }); + }).get(); + + ctx.start(directory, reqs, duration).get0(); + engine().at_exit([&ctx] { + return ctx.stop(); + }); + std::cout << "Creating initial files..." << std::endl; + ctx.invoke_on_all([] (auto& c) { + return c.start(); + }).get(); + std::cout << "Starting evaluation..." << std::endl; + ctx.invoke_on_all([] (auto& c) { + return c.issue_requests(); + }).get(); + for (unsigned i = 0; i < smp::count; ++i) { + ctx.invoke_on(i, [] (auto& c) { + return c.print_stats(); + }).get(); + } + }).or_terminate(); + }); +} diff --git a/src/seastar/apps/iotune/CMakeLists.txt b/src/seastar/apps/iotune/CMakeLists.txt new file mode 100644 index 00000000..18788c14 --- /dev/null +++ b/src/seastar/apps/iotune/CMakeLists.txt @@ -0,0 +1,29 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +seastar_add_app (iotune + SOURCES iotune.cc) + +target_link_libraries (app_iotune + PRIVATE + StdFilesystem::filesystem + yaml-cpp::yaml-cpp) diff --git a/src/seastar/apps/iotune/iotune.cc b/src/seastar/apps/iotune/iotune.cc new file mode 100644 index 00000000..32886073 --- /dev/null +++ b/src/seastar/apps/iotune/iotune.cc @@ -0,0 +1,694 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2018 ScyllaDB + * + * The goal of this program is to allow a user to properly configure the Seastar I/O + * scheduler. + */ +#include <iostream> +#include <chrono> +#include <random> +#include <memory> +#include <vector> +#include <cmath> +#include <sys/vfs.h> +#include <sys/sysmacros.h> +#include <boost/filesystem.hpp> +#include <boost/range/irange.hpp> +#include <boost/program_options.hpp> +#include <boost/iterator/counting_iterator.hpp> +#include <fstream> +#include <wordexp.h> +#include <yaml-cpp/yaml.h> +#include <seastar/core/thread.hh> +#include <seastar/core/sstring.hh> +#include <seastar/core/posix.hh> +#include <seastar/core/resource.hh> +#include <seastar/core/aligned_buffer.hh> +#include <seastar/core/sharded.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/fsqual.hh> +#include <seastar/util/defer.hh> +#include <seastar/util/log.hh> +#include <seastar/util/std-compat.hh> +#include <seastar/util/read_first_line.hh> + +using namespace seastar; +using namespace std::chrono_literals; +namespace fs = std::experimental::filesystem; + +logger iotune_logger("iotune"); + +using iotune_clock = std::chrono::steady_clock; +static thread_local std::default_random_engine random_generator(std::chrono::duration_cast<std::chrono::nanoseconds>(iotune_clock::now().time_since_epoch()).count()); + +template <typename Type> +Type read_sys_file_as(fs::path sys_file) { + return boost::lexical_cast<Type>(read_first_line(sys_file)); +} + +void check_device_properties(fs::path dev_sys_file) { + auto sched_file = dev_sys_file / "queue" / "scheduler"; + auto sched_string = read_first_line(sched_file); + auto beg = sched_string.find('['); + size_t len = sched_string.size(); + if (beg == sstring::npos) { + beg = 0; + } else { + auto end = sched_string.find(']'); + if (end != sstring::npos) { + len = end - beg - 1; + } + beg++; + } + auto scheduler = sched_string.substr(beg, len); + if ((scheduler != "noop") && (scheduler != "none")) { + iotune_logger.warn("Scheduler for {} set to {}. It is recommend to set it to noop before evaluation so as not to skew the results.", + sched_file.string(), scheduler); + } + + auto nomerges_file = dev_sys_file / "queue" / "nomerges"; + auto nomerges = read_sys_file_as<unsigned>(nomerges_file); + if (nomerges != 2u) { + iotune_logger.warn("nomerges for {} set to {}. It is recommend to set it to 2 before evaluation so that merges are disabled. Results can be skewed otherwise.", + nomerges_file.string(), nomerges); + } +} + +struct evaluation_directory { + sstring _name; + // We know that if we issue more than this, they will be blocked on linux anyway. + unsigned _max_iodepth = 0; + uint64_t _available_space; + uint64_t _min_data_transfer_size = 512; + unsigned _disks_per_array = 0; + + void scan_device(unsigned dev_maj, unsigned dev_min) { + scan_device(fmt::format("{}:{}", dev_maj, dev_min)); + } + + void scan_device(std::string dev_str) { + scan_device(fs::path("/sys/dev/block") / dev_str); + } + + void scan_device(fs::path sys_file) { + try { + sys_file = fs::canonical(sys_file); + bool is_leaf = true; + if (fs::exists(sys_file / "slaves")) { + for (auto& dev : fs::directory_iterator(sys_file / "slaves")) { + is_leaf = false; + scan_device(read_first_line(dev / "dev")); + } + } + + // our work is done if not leaf. We'll tune the leaves + if (!is_leaf) { + return; + } + + if (fs::exists(sys_file / "partition")) { + scan_device(sys_file.remove_filename()); + } else { + check_device_properties(sys_file); + auto queue_dir = sys_file / "queue"; + auto disk_min_io_size = read_sys_file_as<uint64_t>(queue_dir / "minimum_io_size"); + + _min_data_transfer_size = std::max(_min_data_transfer_size, disk_min_io_size); + _max_iodepth += read_sys_file_as<uint64_t>(queue_dir / "nr_requests"); + _disks_per_array++; + } + } catch (std::system_error& se) { + iotune_logger.error("Error while parsing sysfs. Will continue with guessed values: {}", se.what()); + _max_iodepth = 128; + } + _disks_per_array = std::max(_disks_per_array, 1u); + } +public: + evaluation_directory(sstring name) + : _name(name) + , _available_space(fs::space(fs::path(_name)).available) + {} + + unsigned max_iodepth() const { + return _max_iodepth; + } + + fs::path path() const { + return fs::path(_name); + } + + const sstring& name() const { + return _name; + } + + unsigned disks_per_array() const { + return _disks_per_array; + } + + uint64_t minimum_io_size() const { + return _min_data_transfer_size; + } + + future<> discover_directory() { + return seastar::async([this] { + auto f = open_directory(_name).get0(); + auto st = f.stat().get0(); + f.close().get(); + + scan_device(major(st.st_dev), minor(st.st_dev)); + }); + } + + uint64_t available_space() const { + return _available_space; + } +}; + +struct io_rates { + float bytes_per_sec = 0; + float iops = 0; + io_rates operator+(const io_rates& a) const { + return io_rates{bytes_per_sec + a.bytes_per_sec, iops + a.iops}; + } + + io_rates& operator+=(const io_rates& a) { + bytes_per_sec += a.bytes_per_sec; + iops += a.iops; + return *this; + } +}; + +class invalid_position : public std::exception { +public: + virtual const char* what() const noexcept { + return "file access position invalid"; + } +}; + +struct position_generator { + virtual uint64_t get_pos() = 0; + virtual bool is_sequential() const = 0; + virtual ~position_generator() {} +}; + +class sequential_issuer : public position_generator { + size_t _buffer_size; + uint64_t _position = 0; + uint64_t _size_limit; +public: + sequential_issuer(size_t buffer_size, uint64_t size_limit) + : _buffer_size(buffer_size) + , _size_limit(size_limit) + {} + + virtual bool is_sequential() const { + return true; + } + + virtual uint64_t get_pos() { + if (_position >= _size_limit) { + throw invalid_position(); + } + auto pos = _position; + _position += _buffer_size; + return pos; + } +}; + +class random_issuer : public position_generator { + size_t _buffer_size; + uint64_t _last_position; + std::uniform_int_distribution<uint64_t> _pos_distribution; +public: + random_issuer(size_t buffer_size, uint64_t last_position) + : _buffer_size(buffer_size) + , _last_position(last_position) + , _pos_distribution(0, (last_position / buffer_size) - 1) + {} + + virtual bool is_sequential() const { + return false; + } + + virtual uint64_t get_pos() { + uint64_t pos = _pos_distribution(random_generator) * _buffer_size; + if (pos >= _last_position) { + throw invalid_position(); + } + return pos; + } +}; + +class request_issuer { +public: + virtual future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) = 0; + virtual ~request_issuer() {} +}; + + +class write_request_issuer : public request_issuer { + file _file; +public: + explicit write_request_issuer(file f) : _file(f) {} + future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) override { + return _file.dma_write(pos, buf, size); + } +}; + +class read_request_issuer : public request_issuer { + file _file; +public: + explicit read_request_issuer(file f) : _file(f) {} + future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) override { + return _file.dma_read(pos, buf, size); + } +}; + +class io_worker { + uint64_t _bytes = 0; + unsigned _requests = 0; + size_t _buffer_size; + std::chrono::duration<double> _duration; + std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _start_measuring; + std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_measuring; + std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_load; + // track separately because in the sequential case we may exhaust the file before _duration + std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _last_time_seen; + + std::unique_ptr<position_generator> _pos_impl; + std::unique_ptr<request_issuer> _req_impl; +public: + bool is_sequential() const { + return _pos_impl->is_sequential(); + } + + bool should_stop() const { + return iotune_clock::now() >= _end_load; + } + + io_worker(size_t buffer_size, std::chrono::duration<double> duration, std::unique_ptr<request_issuer> reqs, std::unique_ptr<position_generator> pos) + : _buffer_size(buffer_size) + , _duration(duration) + , _start_measuring(iotune_clock::now() + std::chrono::duration<double>(10ms)) + , _end_measuring(_start_measuring + duration) + , _end_load(_end_measuring + 10ms) + , _last_time_seen(_start_measuring) + , _pos_impl(std::move(pos)) + , _req_impl(std::move(reqs)) + {} + + std::unique_ptr<char[], free_deleter> get_buffer() { + return allocate_aligned_buffer<char>(_buffer_size, _buffer_size); + } + + future<> issue_request(char* buf) { + return _req_impl->issue_request(_pos_impl->get_pos(), buf, _buffer_size).then([this] (size_t size) { + auto now = iotune_clock::now(); + if ((now > _start_measuring) && (now < _end_measuring)) { + _last_time_seen = now; + _bytes += size; + _requests++; + } + }); + } + + uint64_t bytes() const { + return _bytes; + } + + io_rates get_io_rates() const { + io_rates rates; + auto t = _last_time_seen - _start_measuring; + if (!t.count()) { + throw std::runtime_error("No data collected"); + } + rates.bytes_per_sec = _bytes / t.count(); + rates.iops = _requests / t.count(); + return rates; + } +}; + +class test_file { +public: + enum class pattern { sequential, random }; +private: + fs::path _dirpath; + uint64_t _file_size; + file _file; + + std::unique_ptr<position_generator> get_position_generator(size_t buffer_size, pattern access_pattern) { + if (access_pattern == pattern::sequential) { + return std::make_unique<sequential_issuer>(buffer_size, _file_size); + } else { + return std::make_unique<random_issuer>(buffer_size, _file_size); + } + } +public: + test_file(const ::evaluation_directory& dir, uint64_t maximum_size) + : _dirpath(dir.path() / fs::path(fmt::format("ioqueue-discovery-{}", engine().cpu_id()))) + , _file_size(maximum_size) + {} + + future<> create_data_file() { + // XFS likes access in many directories better. + return make_directory(_dirpath.string()).then([this] { + auto testfile = _dirpath / fs::path("testfile"); + file_open_options options; + options.extent_allocation_size_hint = _file_size; + return open_file_dma(testfile.string(), open_flags::rw | open_flags::create, std::move(options)).then([this, testfile] (file file) { + _file = file; + return remove_file(testfile.string()).then([this] { + return remove_file(_dirpath.string()); + }); + }).then([this] { + return _file.truncate(_file_size); + }); + }); + } + + future<io_rates> do_workload(std::unique_ptr<io_worker> worker_ptr, unsigned max_os_concurrency, bool update_file_size = false) { + if (update_file_size) { + _file_size = 0; + } + + auto worker = worker_ptr.get(); + auto concurrency = boost::irange<unsigned, unsigned>(0, max_os_concurrency, 1); + return parallel_for_each(std::move(concurrency), [this, worker] (unsigned idx) { + auto bufptr = worker->get_buffer(); + auto buf = bufptr.get(); + return do_until([worker] { return worker->should_stop(); }, [this, buf, worker, idx] { + return worker->issue_request(buf); + }).finally([this, alive = std::move(bufptr)] {}); + }).then_wrapped([this, worker = std::move(worker_ptr), update_file_size] (future<> f) { + try { + f.get(); + } catch (invalid_position& ip) { + // expected if sequential. Example: reading and the file ended. + if (!worker->is_sequential()) { + throw; + } + } + + if (update_file_size) { + _file_size = worker->bytes(); + } + return make_ready_future<io_rates>(worker->get_io_rates()); + }); + } + + future<io_rates> read_workload(size_t buffer_size, pattern access_pattern, unsigned max_os_concurrency, std::chrono::duration<double> duration) { + buffer_size = std::max(buffer_size, _file.disk_read_dma_alignment()); + auto worker = std::make_unique<io_worker>(buffer_size, duration, std::make_unique<read_request_issuer>(_file), get_position_generator(buffer_size, access_pattern)); + return do_workload(std::move(worker), max_os_concurrency); + } + + future<io_rates> write_workload(size_t buffer_size, pattern access_pattern, unsigned max_os_concurrency, std::chrono::duration<double> duration) { + buffer_size = std::max(buffer_size, _file.disk_write_dma_alignment()); + auto worker = std::make_unique<io_worker>(buffer_size, duration, std::make_unique<write_request_issuer>(_file), get_position_generator(buffer_size, access_pattern)); + bool update_file_size = worker->is_sequential(); + return do_workload(std::move(worker), max_os_concurrency, update_file_size).then([this] (io_rates r) { + return _file.flush().then([r = std::move(r)] () mutable { + return make_ready_future<io_rates>(std::move(r)); + }); + }); + } + + future<> stop() { + return make_ready_future<>(); + } +}; + +class iotune_multi_shard_context { + ::evaluation_directory _test_directory; + + unsigned per_shard_io_depth() const { + auto iodepth = _test_directory.max_iodepth() / smp::count; + if (engine().cpu_id() < _test_directory.max_iodepth() % smp::count) { + iodepth++; + } + return std::min(iodepth, 128u); + } + seastar::sharded<test_file> _iotune_test_file; +public: + future<> stop() { + return _iotune_test_file.stop(); + } + + future<> start() { + return _iotune_test_file.start(_test_directory, _test_directory.available_space() / (2 * smp::count)); + } + + future<> create_data_file() { + return _iotune_test_file.invoke_on_all([this] (test_file& tf) { + return tf.create_data_file(); + }); + } + + future<io_rates> write_sequential_data(unsigned shard, size_t buffer_size, std::chrono::duration<double> duration) { + return _iotune_test_file.invoke_on(shard, [this, buffer_size, duration] (test_file& tf) { + return tf.write_workload(buffer_size, test_file::pattern::sequential, 4 * _test_directory.disks_per_array(), duration); + }); + } + + future<io_rates> read_sequential_data(unsigned shard, size_t buffer_size, std::chrono::duration<double> duration) { + return _iotune_test_file.invoke_on(shard, [this, buffer_size, duration] (test_file& tf) { + return tf.read_workload(buffer_size, test_file::pattern::sequential, 4 * _test_directory.disks_per_array(), duration); + }); + } + + future<io_rates> write_random_data(size_t buffer_size, std::chrono::duration<double> duration) { + return _iotune_test_file.map_reduce0([buffer_size, this, duration] (test_file& tf) { + return tf.write_workload(buffer_size, test_file::pattern::random, per_shard_io_depth(), duration); + }, io_rates(), std::plus<io_rates>()); + } + + future<io_rates> read_random_data(size_t buffer_size, std::chrono::duration<double> duration) { + return _iotune_test_file.map_reduce0([buffer_size, this, duration] (test_file& tf) { + return tf.read_workload(buffer_size, test_file::pattern::random, per_shard_io_depth(), duration); + }, io_rates(), std::plus<io_rates>()); + } + + iotune_multi_shard_context(::evaluation_directory dir) + : _test_directory(dir) + {} +}; + +struct disk_descriptor { + std::string mountpoint; + uint64_t read_iops; + uint64_t read_bw; + uint64_t write_iops; + uint64_t write_bw; +}; + +void string_to_file(sstring conf_file, sstring buf) { + auto f = file_desc::open(conf_file, O_WRONLY | O_CLOEXEC | O_CREAT | O_TRUNC, 0664); + auto ret = f.write(buf.data(), buf.size()); + if (!ret || (*ret != buf.size())) { + throw std::runtime_error(fmt::format("Can't write {}: {}", conf_file, *ret)); + } +} + +void write_configuration_file(sstring conf_file, std::string format, sstring properties_file) { + sstring buf; + if (format == "seastar") { + buf = fmt::format("io-properties-file={}\n", properties_file); + } else { + buf = fmt::format("SEASTAR_IO=\"--io-properties-file={}\"\n", properties_file); + } + string_to_file(conf_file, buf); +} + +void write_property_file(sstring conf_file, struct std::vector<disk_descriptor> disk_descriptors) { + YAML::Emitter out; + out << YAML::BeginMap; + out << YAML::Key << "disks"; + out << YAML::BeginSeq; + for (auto& desc : disk_descriptors) { + out << YAML::BeginMap; + out << YAML::Key << "mountpoint" << YAML::Value << desc.mountpoint; + out << YAML::Key << "read_iops" << YAML::Value << desc.read_iops; + out << YAML::Key << "read_bandwidth" << YAML::Value << desc.read_bw; + out << YAML::Key << "write_iops" << YAML::Value << desc.write_iops; + out << YAML::Key << "write_bandwidth" << YAML::Value << desc.write_bw; + out << YAML::EndMap; + } + out << YAML::EndSeq; + out << YAML::EndMap; + out << YAML::Newline; + + string_to_file(conf_file, sstring(out.c_str(), out.size())); +} + +// Returns the mountpoint of a path. It works by walking backwards from the canonical path +// (absolute, with symlinks resolved), until we find a point that crosses a device ID. +fs::path mountpoint_of(sstring filename) { + fs::path mnt_candidate = fs::canonical(fs::path(filename)); + compat::optional<dev_t> candidate_id = {}; + auto current = mnt_candidate; + do { + auto f = open_directory(current.string()).get0(); + auto st = f.stat().get0(); + if ((candidate_id) && (*candidate_id != st.st_dev)) { + return mnt_candidate; + } + mnt_candidate = current; + candidate_id = st.st_dev; + current = current.parent_path(); + } while (!current.empty()); + + return mnt_candidate; +} + +int main(int ac, char** av) { + namespace bpo = boost::program_options; + bool fs_check = false; + + app_template::config app_cfg; + app_cfg.name = "IOTune"; + + app_template app(std::move(app_cfg)); + auto opt_add = app.add_options(); + opt_add + ("evaluation-directory", bpo::value<std::vector<sstring>>()->required(), "directory where to execute the evaluation") + ("properties-file", bpo::value<sstring>(), "path in which to write the YAML file") + ("options-file", bpo::value<sstring>(), "path in which to write the legacy conf file") + ("duration", bpo::value<unsigned>()->default_value(120), "time, in seconds, for which to run the test") + ("format", bpo::value<sstring>()->default_value("seastar"), "Configuration file format (seastar | envfile)") + ("fs-check", bpo::bool_switch(&fs_check), "perform FS check only") + ; + + return app.run(ac, av, [&] { + return seastar::async([&] { + auto& configuration = app.configuration(); + auto eval_dirs = configuration["evaluation-directory"].as<std::vector<sstring>>(); + auto format = configuration["format"].as<sstring>(); + auto duration = std::chrono::duration<double>(configuration["duration"].as<unsigned>() * 1s); + + struct std::vector<disk_descriptor> disk_descriptors; + std::unordered_map<sstring, sstring> mountpoint_map; + // We want to evaluate once per mountpoint, but we still want to write in one of the + // directories that we were provided - we may not have permissions to write into the + // mountpoint itself. If we are passed more than one directory per mountpoint, we don't + // really care to which one we write, so this simple hash will do. + for (auto& eval_dir : eval_dirs) { + mountpoint_map[mountpoint_of(eval_dir).string()] = eval_dir; + } + for (auto eval: mountpoint_map) { + auto mountpoint = eval.first; + auto eval_dir = eval.second; + + if (filesystem_has_good_aio_support(eval_dir, false) == false) { + iotune_logger.error("Exception when qualifying filesystem at {}", eval_dir); + return 1; + } + + auto rec = 10000000000ULL; + auto avail = fs_avail(eval_dir).get0(); + if (avail < rec) { + uint64_t val; + const char* units; + if (avail >= 1000000000) { + val = (avail + 500000000) / 1000000000; + units = "GB"; + } else if (avail >= 1000000) { + val = (avail + 500000) / 1000000; + units = "MB"; + } else { + val = avail; + units = "bytes"; + } + iotune_logger.warn("Available space on filesystem at {}: {} {}: is less than recommended: {} GB", + eval_dir, val, units, rec / 1000000000ULL); + } + + iotune_logger.info("{} passed sanity checks", eval_dir); + if (fs_check) { + return 0; + } + + // Directory is the same object for all tests. + ::evaluation_directory test_directory(eval_dir); + test_directory.discover_directory().get(); + + ::iotune_multi_shard_context iotune_tests(test_directory); + iotune_tests.start().get(); + iotune_tests.create_data_file().get(); + + auto stop = defer([&iotune_tests] { + iotune_tests.stop().get(); + }); + + fmt::print("Starting Evaluation. This may take a while...\n"); + fmt::print("Measuring sequential write bandwidth: "); + std::cout.flush(); + io_rates write_bw; + size_t sequential_buffer_size = 1 << 20; + for (unsigned shard = 0; shard < smp::count; ++shard) { + write_bw += iotune_tests.write_sequential_data(shard, sequential_buffer_size, duration * 0.70 / smp::count).get0(); + } + write_bw.bytes_per_sec /= smp::count; + fmt::print("{} MB/s\n", uint64_t(write_bw.bytes_per_sec / (1024 * 1024))); + + fmt::print("Measuring sequential read bandwidth: "); + std::cout.flush(); + auto read_bw = iotune_tests.read_sequential_data(0, sequential_buffer_size, duration * 0.1).get0(); + fmt::print("{} MB/s\n", uint64_t(read_bw.bytes_per_sec / (1024 * 1024))); + + fmt::print("Measuring random write IOPS: "); + std::cout.flush(); + auto write_iops = iotune_tests.write_random_data(test_directory.minimum_io_size(), duration * 0.1).get0(); + fmt::print("{} IOPS\n", uint64_t(write_iops.iops)); + + fmt::print("Measuring random read IOPS: "); + std::cout.flush(); + auto read_iops = iotune_tests.read_random_data(test_directory.minimum_io_size(), duration * 0.1).get0(); + fmt::print("{} IOPS\n", uint64_t(read_iops.iops)); + + struct disk_descriptor desc; + desc.mountpoint = mountpoint; + desc.read_iops = read_iops.iops; + desc.read_bw = read_bw.bytes_per_sec; + desc.write_iops = write_iops.iops; + desc.write_bw = write_bw.bytes_per_sec; + disk_descriptors.push_back(std::move(desc)); + } + + auto file = "properties file"; + try { + if (configuration.count("properties-file")) { + fmt::print("Writing result to {}\n", configuration["properties-file"].as<sstring>()); + write_property_file(configuration["properties-file"].as<sstring>(), disk_descriptors); + } + + file = "configuration file"; + if (configuration.count("options-file")) { + fmt::print("Writing result to {}\n", configuration["options-file"].as<sstring>()); + write_configuration_file(configuration["options-file"].as<sstring>(), format, configuration["properties-file"].as<sstring>()); + } + } catch (...) { + iotune_logger.error("Exception when writing {}: {}.\nPlease add the above values manually to your seastar command line.", file, std::current_exception()); + return 1; + } + return 0; + }); + }); +} diff --git a/src/seastar/apps/memcached/CMakeLists.txt b/src/seastar/apps/memcached/CMakeLists.txt new file mode 100644 index 00000000..ec213fd0 --- /dev/null +++ b/src/seastar/apps/memcached/CMakeLists.txt @@ -0,0 +1,49 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +set (Seastar_APP_MEMCACHED_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}) +set (Seastar_APP_MEMCACHED_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}) + +seastar_generate_ragel ( + TARGET app_memcached_ascii + VAR app_memcached_ascii_file + IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/ascii.rl + OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/ascii.hh) + +seastar_add_app (memcached + SOURCES + ${app_memcached_ascii_file} + memcache.cc + memcached.hh) + +target_include_directories (app_memcached + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) + +add_dependencies (app_memcached app_memcached_ascii) + +# +# Tests. +# + +if (Seastar_TESTING) + add_subdirectory (tests) +endif () diff --git a/src/seastar/apps/memcached/ascii.rl b/src/seastar/apps/memcached/ascii.rl new file mode 100644 index 00000000..04d161ff --- /dev/null +++ b/src/seastar/apps/memcached/ascii.rl @@ -0,0 +1,154 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/core/ragel.hh> +#include "memcached.hh" +#include <memory> +#include <algorithm> +#include <functional> + +using namespace seastar; + +%%{ + +machine memcache_ascii_protocol; + +access _fsm_; + +action mark { + g.mark_start(p); +} + +action start_blob { + g.mark_start(p); + _size_left = _size; +} + +action advance_blob { + auto len = std::min((uint32_t)(pe - p), _size_left); + _size_left -= len; + p += len; + if (_size_left == 0) { + _blob = str(); + p--; + fret; + } + p--; +} + +crlf = '\r\n'; +sp = ' '; +u32 = digit+ >{ _u32 = 0; } ${ _u32 *= 10; _u32 += fc - '0'; }; +u64 = digit+ >{ _u64 = 0; } ${ _u64 *= 10; _u64 += fc - '0'; }; +key = [^ ]+ >mark %{ _key = memcache::item_key(str()); }; +flags = digit+ >mark %{ _flags_str = str(); }; +expiration = u32 %{ _expiration = _u32; }; +size = u32 >mark %{ _size = _u32; _size_str = str(); }; +blob := any+ >start_blob $advance_blob; +maybe_noreply = (sp "noreply" @{ _noreply = true; })? >{ _noreply = false; }; +maybe_expiration = (sp expiration)? >{ _expiration = 0; }; +version_field = u64 %{ _version = _u64; }; + +insertion_params = sp key sp flags sp expiration sp size maybe_noreply (crlf @{ fcall blob; } ) crlf; +set = "set" insertion_params @{ _state = state::cmd_set; }; +add = "add" insertion_params @{ _state = state::cmd_add; }; +replace = "replace" insertion_params @{ _state = state::cmd_replace; }; +cas = "cas" sp key sp flags sp expiration sp size sp version_field maybe_noreply (crlf @{ fcall blob; } ) crlf @{ _state = state::cmd_cas; }; +get = "get" (sp key %{ _keys.emplace_back(std::move(_key)); })+ crlf @{ _state = state::cmd_get; }; +gets = "gets" (sp key %{ _keys.emplace_back(std::move(_key)); })+ crlf @{ _state = state::cmd_gets; }; +delete = "delete" sp key maybe_noreply crlf @{ _state = state::cmd_delete; }; +flush = "flush_all" maybe_expiration maybe_noreply crlf @{ _state = state::cmd_flush_all; }; +version = "version" crlf @{ _state = state::cmd_version; }; +stats = "stats" crlf @{ _state = state::cmd_stats; }; +stats_hash = "stats hash" crlf @{ _state = state::cmd_stats_hash; }; +incr = "incr" sp key sp u64 maybe_noreply crlf @{ _state = state::cmd_incr; }; +decr = "decr" sp key sp u64 maybe_noreply crlf @{ _state = state::cmd_decr; }; +main := (add | replace | set | get | gets | delete | flush | version | cas | stats | incr | decr + | stats_hash) >eof{ _state = state::eof; }; + +prepush { + prepush(); +} + +postpop { + postpop(); +} + +}%% + +class memcache_ascii_parser : public ragel_parser_base<memcache_ascii_parser> { + %% write data nofinal noprefix; +public: + enum class state { + error, + eof, + cmd_set, + cmd_cas, + cmd_add, + cmd_replace, + cmd_get, + cmd_gets, + cmd_delete, + cmd_flush_all, + cmd_version, + cmd_stats, + cmd_stats_hash, + cmd_incr, + cmd_decr, + }; + state _state; + uint32_t _u32; + uint64_t _u64; + memcache::item_key _key; + sstring _flags_str; + uint32_t _expiration; + uint32_t _size; + sstring _size_str; + uint32_t _size_left; + uint64_t _version; + sstring _blob; + bool _noreply; + std::vector<memcache::item_key> _keys; +public: + void init() { + init_base(); + _state = state::error; + _keys.clear(); + %% write init; + } + + char* parse(char* p, char* pe, char* eof) { + sstring_builder::guard g(_builder, p, pe); + auto str = [this, &g, &p] { g.mark_end(p); return get_str(); }; + %% write exec; + if (_state != state::error) { + return p; + } + if (p != pe) { + p = pe; + return p; + } + return nullptr; + } + bool eof() const { + return _state == state::eof; + } +}; diff --git a/src/seastar/apps/memcached/memcache.cc b/src/seastar/apps/memcached/memcache.cc new file mode 100644 index 00000000..4cce0e92 --- /dev/null +++ b/src/seastar/apps/memcached/memcache.cc @@ -0,0 +1,1464 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2014-2015 Cloudius Systems + */ + +#include <boost/intrusive/unordered_set.hpp> +#include <boost/intrusive/list.hpp> +#include <boost/intrusive_ptr.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/optional.hpp> +#include <iostream> +#include <iomanip> +#include <sstream> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/timer-set.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/stream.hh> +#include <seastar/core/memory.hh> +#include <seastar/core/units.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/vector-data-sink.hh> +#include <seastar/core/bitops.hh> +#include <seastar/core/slab.hh> +#include <seastar/core/align.hh> +#include <seastar/net/api.hh> +#include <seastar/net/packet-data-source.hh> +#include "ascii.hh" +#include "memcached.hh" +#include <unistd.h> + +#define PLATFORM "seastar" +#define VERSION "v1.0" +#define VERSION_STRING PLATFORM " " VERSION + +using namespace seastar; +using namespace net; + +namespace memcache { + +namespace bi = boost::intrusive; + +static constexpr double default_slab_growth_factor = 1.25; +static constexpr uint64_t default_slab_page_size = 1UL*MB; +static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled. +static __thread slab_allocator<item>* slab; +static thread_local std::unique_ptr<slab_allocator<item>> slab_holder; + +template<typename T> +using optional = boost::optional<T>; + +using clock_type = lowres_clock; + +// +// "Expiration" is a uint32_t value. +// The minimal value of _time is when "expiration" is set to (seconds_in_a_month +// + 1). +// In this case _time will have a value of +// +// (seconds_in_a_month + 1 - Wall_Clock_Time_Since_Epoch) +// +// because lowres_clock now() initialized to zero when the application starts. +// +// We will use a timepoint at LLONG_MIN to represent a "never expire" value +// since it will not collide with the minimum _time value mentioned above for +// about 290 thousand years to come. +// +static constexpr clock_type::time_point never_expire_timepoint = clock_type::time_point(clock_type::duration::min()); + +struct expiration { + using time_point = clock_type::time_point; + using duration = time_point::duration; + + static constexpr uint32_t seconds_in_a_month = 60U * 60 * 24 * 30; + time_point _time = never_expire_timepoint; + + expiration() {} + + expiration(clock_type::duration wc_to_clock_type_delta, uint32_t s) { + using namespace std::chrono; + + static_assert(sizeof(clock_type::duration::rep) >= 8, "clock_type::duration::rep must be at least 8 bytes wide"); + + if (s == 0U) { + return; // means never expire. + } else if (s <= seconds_in_a_month) { + _time = clock_type::now() + seconds(s); // from delta + } else { + // + // seastar::reactor supports only a monotonic clock at the moment + // therefore this may make the elements with the absolute expiration + // time expire at the wrong time if the wall clock has been updated + // during the expiration period. However the original memcached has + // the same weakness. + // + // TODO: Fix this when a support for system_clock-based timers is + // added to the seastar::reactor. + // + _time = time_point(seconds(s) + wc_to_clock_type_delta); // from real time + } + } + + bool ever_expires() { + return _time != never_expire_timepoint; + } + + time_point to_time_point() { + return _time; + } +}; + +class item : public slab_item_base { +public: + using version_type = uint64_t; + using time_point = expiration::time_point; + using duration = expiration::duration; + static constexpr uint8_t field_alignment = alignof(void*); +private: + using hook_type = bi::unordered_set_member_hook<>; + // TODO: align shared data to cache line boundary + version_type _version; + hook_type _cache_link; + bi::list_member_hook<> _timer_link; + size_t _key_hash; + expiration _expiry; + uint32_t _value_size; + uint32_t _slab_page_index; + uint16_t _ref_count; + uint8_t _key_size; + uint8_t _ascii_prefix_size; + char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value. + friend class cache; +public: + item(uint32_t slab_page_index, item_key&& key, sstring&& ascii_prefix, + sstring&& value, expiration expiry, version_type version = 1) + : _version(version) + , _key_hash(key.hash()) + , _expiry(expiry) + , _value_size(value.size()) + , _slab_page_index(slab_page_index) + , _ref_count(0U) + , _key_size(key.key().size()) + , _ascii_prefix_size(ascii_prefix.size()) + { + assert(_key_size <= std::numeric_limits<uint8_t>::max()); + assert(_ascii_prefix_size <= std::numeric_limits<uint8_t>::max()); + // storing key + memcpy(_data, key.key().c_str(), _key_size); + // storing ascii_prefix + memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size); + // storing value + memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment), + value.c_str(), _value_size); + } + + item(const item&) = delete; + item(item&&) = delete; + + clock_type::time_point get_timeout() { + return _expiry.to_time_point(); + } + + version_type version() { + return _version; + } + + const compat::string_view key() const { + return compat::string_view(_data, _key_size); + } + + const compat::string_view ascii_prefix() const { + const char *p = _data + align_up(_key_size, field_alignment); + return compat::string_view(p, _ascii_prefix_size); + } + + const compat::string_view value() const { + const char *p = _data + align_up(_key_size, field_alignment) + + align_up(_ascii_prefix_size, field_alignment); + return compat::string_view(p, _value_size); + } + + size_t key_size() const { + return _key_size; + } + + size_t ascii_prefix_size() const { + return _ascii_prefix_size; + } + + size_t value_size() const { + return _value_size; + } + + optional<uint64_t> data_as_integral() { + auto str = value().data(); + if (str[0] == '-') { + return {}; + } + + auto len = _value_size; + + // Strip trailing space + while (len && str[len - 1] == ' ') { + len--; + } + + try { + return {boost::lexical_cast<uint64_t>(str, len)}; + } catch (const boost::bad_lexical_cast& e) { + return {}; + } + } + + // needed by timer_set + bool cancel() { + return false; + } + + // Methods required by slab allocator. + uint32_t get_slab_page_index() const { + return _slab_page_index; + } + bool is_unlocked() const { + return _ref_count == 1; + } + + friend bool operator==(const item &a, const item &b) { + return (a._key_hash == b._key_hash) && + (a._key_size == b._key_size) && + (memcmp(a._data, b._data, a._key_size) == 0); + } + + friend std::size_t hash_value(const item &i) { + return i._key_hash; + } + + friend inline void intrusive_ptr_add_ref(item* it) { + assert(it->_ref_count >= 0); + ++it->_ref_count; + if (it->_ref_count == 2) { + slab->lock_item(it); + } + } + + friend inline void intrusive_ptr_release(item* it) { + --it->_ref_count; + if (it->_ref_count == 1) { + slab->unlock_item(it); + } else if (it->_ref_count == 0) { + slab->free(it); + } + assert(it->_ref_count >= 0); + } + + friend class item_key_cmp; +}; + +struct item_key_cmp +{ +private: + bool compare(const item_key& key, const item& it) const { + return (it._key_hash == key.hash()) && + (it._key_size == key.key().size()) && + (memcmp(it._data, key.key().c_str(), it._key_size) == 0); + } +public: + bool operator()(const item_key& key, const item& it) const { + return compare(key, it); + } + + bool operator()(const item& it, const item_key& key) const { + return compare(key, it); + } +}; + +using item_ptr = foreign_ptr<boost::intrusive_ptr<item>>; + +struct cache_stats { + size_t _get_hits {}; + size_t _get_misses {}; + size_t _set_adds {}; + size_t _set_replaces {}; + size_t _cas_hits {}; + size_t _cas_misses {}; + size_t _cas_badval {}; + size_t _delete_misses {}; + size_t _delete_hits {}; + size_t _incr_misses {}; + size_t _incr_hits {}; + size_t _decr_misses {}; + size_t _decr_hits {}; + size_t _expired {}; + size_t _evicted {}; + size_t _bytes {}; + size_t _resize_failure {}; + size_t _size {}; + size_t _reclaims{}; + + void operator+=(const cache_stats& o) { + _get_hits += o._get_hits; + _get_misses += o._get_misses; + _set_adds += o._set_adds; + _set_replaces += o._set_replaces; + _cas_hits += o._cas_hits; + _cas_misses += o._cas_misses; + _cas_badval += o._cas_badval; + _delete_misses += o._delete_misses; + _delete_hits += o._delete_hits; + _incr_misses += o._incr_misses; + _incr_hits += o._incr_hits; + _decr_misses += o._decr_misses; + _decr_hits += o._decr_hits; + _expired += o._expired; + _evicted += o._evicted; + _bytes += o._bytes; + _resize_failure += o._resize_failure; + _size += o._size; + _reclaims += o._reclaims; + } +}; + +enum class cas_result { + not_found, stored, bad_version +}; + +struct remote_origin_tag { + template <typename T> + static inline + T move_if_local(T& ref) { + return ref; + } +}; + +struct local_origin_tag { + template <typename T> + static inline + T move_if_local(T& ref) { + return std::move(ref); + } +}; + +struct item_insertion_data { + item_key key; + sstring ascii_prefix; + sstring data; + expiration expiry; +}; + +class cache { +private: + using cache_type = bi::unordered_set<item, + bi::member_hook<item, item::hook_type, &item::_cache_link>, + bi::power_2_buckets<true>, + bi::constant_time_size<true>>; + using cache_iterator = typename cache_type::iterator; + static constexpr size_t initial_bucket_count = 1 << 10; + static constexpr float load_factor = 0.75f; + size_t _resize_up_threshold = load_factor * initial_bucket_count; + std::vector<cache_type::bucket_type> _buckets; + cache_type _cache; + seastar::timer_set<item, &item::_timer_link> _alive; + timer<clock_type> _timer; + // delta in seconds between the current values of a wall clock and a clock_type clock + clock_type::duration _wc_to_clock_type_delta; + cache_stats _stats; + timer<clock_type> _flush_timer; +private: + size_t item_size(item& item_ref) { + constexpr size_t field_alignment = alignof(void*); + return sizeof(item) + + align_up(item_ref.key_size(), field_alignment) + + align_up(item_ref.ascii_prefix_size(), field_alignment) + + item_ref.value_size(); + } + + size_t item_size(item_insertion_data& insertion) { + constexpr size_t field_alignment = alignof(void*); + auto size = sizeof(item) + + align_up(insertion.key.key().size(), field_alignment) + + align_up(insertion.ascii_prefix.size(), field_alignment) + + insertion.data.size(); +#ifdef __DEBUG__ + static bool print_item_footprint = true; + if (print_item_footprint) { + print_item_footprint = false; + std::cout << __FUNCTION__ << ": " << size << "\n"; + std::cout << "sizeof(item) " << sizeof(item) << "\n"; + std::cout << "key.size " << insertion.key.key().size() << "\n"; + std::cout << "value.size " << insertion.data.size() << "\n"; + std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n"; + } +#endif + return size; + } + + template <bool IsInCache = true, bool IsInTimerList = true, bool Release = true> + void erase(item& item_ref) { + if (IsInCache) { + _cache.erase(_cache.iterator_to(item_ref)); + } + if (IsInTimerList) { + if (item_ref._expiry.ever_expires()) { + _alive.remove(item_ref); + } + } + _stats._bytes -= item_size(item_ref); + if (Release) { + // memory used by item shouldn't be freed when slab is replacing it with another item. + intrusive_ptr_release(&item_ref); + } + } + + void expire() { + using namespace std::chrono; + + // + // Adjust the delta on every timer event to minimize an error caused + // by a wall clock adjustment. + // + _wc_to_clock_type_delta = + duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch()); + + auto exp = _alive.expire(clock_type::now()); + while (!exp.empty()) { + auto item = &*exp.begin(); + exp.pop_front(); + erase<true, false>(*item); + _stats._expired++; + } + _timer.arm(_alive.get_next_timeout()); + } + + inline + cache_iterator find(const item_key& key) { + return _cache.find(key, std::hash<item_key>(), item_key_cmp()); + } + + template <typename Origin> + inline + cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) { + auto& old_item = *i; + uint64_t old_item_version = old_item._version; + + erase(old_item); + + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1); + intrusive_ptr_add_ref(new_item); + + auto insert_result = _cache.insert(*new_item); + assert(insert_result.second); + if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) { + _timer.rearm(new_item->get_timeout()); + } + _stats._bytes += size; + return insert_result.first; + } + + template <typename Origin> + inline + void add_new(item_insertion_data& insertion) { + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + Origin::move_if_local(insertion.data), insertion.expiry); + intrusive_ptr_add_ref(new_item); + auto& item_ref = *new_item; + _cache.insert(item_ref); + if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) { + _timer.rearm(item_ref.get_timeout()); + } + _stats._bytes += size; + maybe_rehash(); + } + + void maybe_rehash() { + if (_cache.size() >= _resize_up_threshold) { + auto new_size = _cache.bucket_count() * 2; + std::vector<cache_type::bucket_type> old_buckets; + try { + old_buckets = std::exchange(_buckets, std::vector<cache_type::bucket_type>(new_size)); + } catch (const std::bad_alloc& e) { + _stats._resize_failure++; + return; + } + _cache.rehash(typename cache_type::bucket_traits(_buckets.data(), new_size)); + _resize_up_threshold = _cache.bucket_count() * load_factor; + } + } +public: + cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size) + : _buckets(initial_bucket_count) + , _cache(cache_type::bucket_traits(_buckets.data(), initial_bucket_count)) + { + using namespace std::chrono; + + _wc_to_clock_type_delta = + duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch()); + + _timer.set_callback([this] { expire(); }); + _flush_timer.set_callback([this] { flush_all(); }); + + // initialize per-thread slab allocator. + slab_holder = std::make_unique<slab_allocator<item>>(default_slab_growth_factor, per_cpu_slab_size, slab_page_size, + [this](item& item_ref) { erase<true, true, false>(item_ref); _stats._evicted++; }); + slab = slab_holder.get(); +#ifdef __DEBUG__ + static bool print_slab_classes = true; + if (print_slab_classes) { + print_slab_classes = false; + slab->print_slab_classes(); + } +#endif + } + + ~cache() { + flush_all(); + } + + void flush_all() { + _flush_timer.cancel(); + _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) { + erase<false, true>(*it); + }); + } + + void flush_at(uint32_t time) { + auto expiry = expiration(get_wc_to_clock_type_delta(), time); + _flush_timer.rearm(expiry.to_time_point()); + } + + template <typename Origin = local_origin_tag> + bool set(item_insertion_data& insertion) { + auto i = find(insertion.key); + if (i != _cache.end()) { + add_overriding<Origin>(i, insertion); + _stats._set_replaces++; + return true; + } else { + add_new<Origin>(insertion); + _stats._set_adds++; + return false; + } + } + + template <typename Origin = local_origin_tag> + bool add(item_insertion_data& insertion) { + auto i = find(insertion.key); + if (i != _cache.end()) { + return false; + } + + _stats._set_adds++; + add_new<Origin>(insertion); + return true; + } + + template <typename Origin = local_origin_tag> + bool replace(item_insertion_data& insertion) { + auto i = find(insertion.key); + if (i == _cache.end()) { + return false; + } + + _stats._set_replaces++; + add_overriding<Origin>(i, insertion); + return true; + } + + bool remove(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + _stats._delete_misses++; + return false; + } + _stats._delete_hits++; + auto& item_ref = *i; + erase(item_ref); + return true; + } + + item_ptr get(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + _stats._get_misses++; + return nullptr; + } + _stats._get_hits++; + auto& item_ref = *i; + return item_ptr(&item_ref); + } + + template <typename Origin = local_origin_tag> + cas_result cas(item_insertion_data& insertion, item::version_type version) { + auto i = find(insertion.key); + if (i == _cache.end()) { + _stats._cas_misses++; + return cas_result::not_found; + } + auto& item_ref = *i; + if (item_ref._version != version) { + _stats._cas_badval++; + return cas_result::bad_version; + } + _stats._cas_hits++; + add_overriding<Origin>(i, insertion); + return cas_result::stored; + } + + size_t size() { + return _cache.size(); + } + + size_t bucket_count() { + return _cache.bucket_count(); + } + + cache_stats stats() { + _stats._size = size(); + return _stats; + } + + template <typename Origin = local_origin_tag> + std::pair<item_ptr, bool> incr(item_key& key, uint64_t delta) { + auto i = find(key); + if (i == _cache.end()) { + _stats._incr_misses++; + return {item_ptr{}, false}; + } + auto& item_ref = *i; + _stats._incr_hits++; + auto value = item_ref.data_as_integral(); + if (!value) { + return {boost::intrusive_ptr<item>(&item_ref), false}; + } + item_insertion_data insertion { + .key = Origin::move_if_local(key), + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), + .data = to_sstring(*value + delta), + .expiry = item_ref._expiry + }; + i = add_overriding<local_origin_tag>(i, insertion); + return {boost::intrusive_ptr<item>(&*i), true}; + } + + template <typename Origin = local_origin_tag> + std::pair<item_ptr, bool> decr(item_key& key, uint64_t delta) { + auto i = find(key); + if (i == _cache.end()) { + _stats._decr_misses++; + return {item_ptr{}, false}; + } + auto& item_ref = *i; + _stats._decr_hits++; + auto value = item_ref.data_as_integral(); + if (!value) { + return {boost::intrusive_ptr<item>(&item_ref), false}; + } + item_insertion_data insertion { + .key = Origin::move_if_local(key), + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), + .data = to_sstring(*value - std::min(*value, delta)), + .expiry = item_ref._expiry + }; + i = add_overriding<local_origin_tag>(i, insertion); + return {boost::intrusive_ptr<item>(&*i), true}; + } + + std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> print_hash_stats() { + static constexpr unsigned bits = sizeof(size_t) * 8; + size_t histo[bits + 1] {}; + size_t max_size = 0; + unsigned max_bucket = 0; + + for (size_t i = 0; i < _cache.bucket_count(); i++) { + size_t size = _cache.bucket_size(i); + unsigned bucket; + if (size == 0) { + bucket = 0; + } else { + bucket = bits - count_leading_zeros(size); + } + max_bucket = std::max(max_bucket, bucket); + max_size = std::max(max_size, size); + histo[bucket]++; + } + + std::stringstream ss; + + ss << "size: " << _cache.size() << "\n"; + ss << "buckets: " << _cache.bucket_count() << "\n"; + ss << "load: " << format("{:.2f}", (double)_cache.size() / _cache.bucket_count()) << "\n"; + ss << "max bucket occupancy: " << max_size << "\n"; + ss << "bucket occupancy histogram:\n"; + + for (unsigned i = 0; i < (max_bucket + 2); i++) { + ss << " "; + if (i == 0) { + ss << "0: "; + } else if (i == 1) { + ss << "1: "; + } else { + ss << (1 << (i - 1)) << "+: "; + } + ss << histo[i] << "\n"; + } + return {engine().cpu_id(), make_foreign(make_lw_shared<std::string>(ss.str()))}; + } + + future<> stop() { return make_ready_future<>(); } + clock_type::duration get_wc_to_clock_type_delta() { return _wc_to_clock_type_delta; } +}; + +class sharded_cache { +private: + distributed<cache>& _peers; + + inline + unsigned get_cpu(const item_key& key) { + return std::hash<item_key>()(key) % smp::count; + } +public: + sharded_cache(distributed<cache>& peers) : _peers(peers) {} + + future<> flush_all() { + return _peers.invoke_on_all(&cache::flush_all); + } + + future<> flush_at(uint32_t time) { + return _peers.invoke_on_all(&cache::flush_at, time); + } + + auto get_wc_to_clock_type_delta() { return _peers.local().get_wc_to_clock_type_delta(); } + + // The caller must keep @insertion live until the resulting future resolves. + future<bool> set(item_insertion_data& insertion) { + auto cpu = get_cpu(insertion.key); + if (engine().cpu_id() == cpu) { + return make_ready_future<bool>(_peers.local().set(insertion)); + } + return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion)); + } + + // The caller must keep @insertion live until the resulting future resolves. + future<bool> add(item_insertion_data& insertion) { + auto cpu = get_cpu(insertion.key); + if (engine().cpu_id() == cpu) { + return make_ready_future<bool>(_peers.local().add(insertion)); + } + return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion)); + } + + // The caller must keep @insertion live until the resulting future resolves. + future<bool> replace(item_insertion_data& insertion) { + auto cpu = get_cpu(insertion.key); + if (engine().cpu_id() == cpu) { + return make_ready_future<bool>(_peers.local().replace(insertion)); + } + return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion)); + } + + // The caller must keep @key live until the resulting future resolves. + future<bool> remove(const item_key& key) { + auto cpu = get_cpu(key); + return _peers.invoke_on(cpu, &cache::remove, std::ref(key)); + } + + // The caller must keep @key live until the resulting future resolves. + future<item_ptr> get(const item_key& key) { + auto cpu = get_cpu(key); + return _peers.invoke_on(cpu, &cache::get, std::ref(key)); + } + + // The caller must keep @insertion live until the resulting future resolves. + future<cas_result> cas(item_insertion_data& insertion, item::version_type version) { + auto cpu = get_cpu(insertion.key); + if (engine().cpu_id() == cpu) { + return make_ready_future<cas_result>(_peers.local().cas(insertion, version)); + } + return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version)); + } + + future<cache_stats> stats() { + return _peers.map_reduce(adder<cache_stats>(), &cache::stats); + } + + // The caller must keep @key live until the resulting future resolves. + future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) { + auto cpu = get_cpu(key); + if (engine().cpu_id() == cpu) { + return make_ready_future<std::pair<item_ptr, bool>>( + _peers.local().incr<local_origin_tag>(key, delta)); + } + return _peers.invoke_on(cpu, &cache::incr<remote_origin_tag>, std::ref(key), std::move(delta)); + } + + // The caller must keep @key live until the resulting future resolves. + future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) { + auto cpu = get_cpu(key); + if (engine().cpu_id() == cpu) { + return make_ready_future<std::pair<item_ptr, bool>>( + _peers.local().decr(key, delta)); + } + return _peers.invoke_on(cpu, &cache::decr<remote_origin_tag>, std::ref(key), std::move(delta)); + } + + future<> print_hash_stats(output_stream<char>& out) { + return _peers.map_reduce([&out] (std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> data) mutable { + return out.write("=== CPU " + std::to_string(data.first) + " ===\r\n") + .then([&out, str = std::move(data.second)] { + return out.write(*str); + }); + }, &cache::print_hash_stats); + } +}; + +struct system_stats { + uint32_t _curr_connections {}; + uint32_t _total_connections {}; + uint64_t _cmd_get {}; + uint64_t _cmd_set {}; + uint64_t _cmd_flush {}; + clock_type::time_point _start_time; +public: + system_stats() { + _start_time = clock_type::time_point::max(); + } + system_stats(clock_type::time_point start_time) + : _start_time(start_time) { + } + system_stats self() { + return *this; + } + void operator+=(const system_stats& other) { + _curr_connections += other._curr_connections; + _total_connections += other._total_connections; + _cmd_get += other._cmd_get; + _cmd_set += other._cmd_set; + _cmd_flush += other._cmd_flush; + _start_time = std::min(_start_time, other._start_time); + } + future<> stop() { return make_ready_future<>(); } +}; + +class ascii_protocol { +private: + using this_type = ascii_protocol; + sharded_cache& _cache; + distributed<system_stats>& _system_stats; + memcache_ascii_parser _parser; + item_key _item_key; + item_insertion_data _insertion; + std::vector<item_ptr> _items; +private: + static constexpr const char *msg_crlf = "\r\n"; + static constexpr const char *msg_error = "ERROR\r\n"; + static constexpr const char *msg_stored = "STORED\r\n"; + static constexpr const char *msg_not_stored = "NOT_STORED\r\n"; + static constexpr const char *msg_end = "END\r\n"; + static constexpr const char *msg_value = "VALUE "; + static constexpr const char *msg_deleted = "DELETED\r\n"; + static constexpr const char *msg_not_found = "NOT_FOUND\r\n"; + static constexpr const char *msg_ok = "OK\r\n"; + static constexpr const char *msg_version = "VERSION " VERSION_STRING "\r\n"; + static constexpr const char *msg_exists = "EXISTS\r\n"; + static constexpr const char *msg_stat = "STAT "; + static constexpr const char *msg_out_of_memory = "SERVER_ERROR Out of memory allocating new item\r\n"; + static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"; +private: + template <bool WithVersion> + static void append_item(scattered_message<char>& msg, item_ptr item) { + if (!item) { + return; + } + + msg.append_static("VALUE "); + msg.append_static(item->key()); + msg.append_static(item->ascii_prefix()); + + if (WithVersion) { + msg.append_static(" "); + msg.append(to_sstring(item->version())); + } + + msg.append_static(msg_crlf); + msg.append_static(item->value()); + msg.append_static(msg_crlf); + msg.on_delete([item = std::move(item)] {}); + } + + template <bool WithVersion> + future<> handle_get(output_stream<char>& out) { + _system_stats.local()._cmd_get++; + if (_parser._keys.size() == 1) { + return _cache.get(_parser._keys[0]).then([&out] (auto item) -> future<> { + scattered_message<char> msg; + this_type::append_item<WithVersion>(msg, std::move(item)); + msg.append_static(msg_end); + return out.write(std::move(msg)); + }); + } else { + _items.clear(); + return parallel_for_each(_parser._keys.begin(), _parser._keys.end(), [this] (const auto& key) { + return _cache.get(key).then([this] (auto item) { + _items.emplace_back(std::move(item)); + }); + }).then([this, &out] () { + scattered_message<char> msg; + for (auto& item : _items) { + append_item<WithVersion>(msg, std::move(item)); + } + msg.append_static(msg_end); + return out.write(std::move(msg)); + }); + } + } + + template <typename Value> + static future<> print_stat(output_stream<char>& out, const char* key, Value value) { + return out.write(msg_stat) + .then([&out, key] { return out.write(key); }) + .then([&out] { return out.write(" "); }) + .then([&out, value] { return out.write(to_sstring(value)); }) + .then([&out] { return out.write(msg_crlf); }); + } + + future<> print_stats(output_stream<char>& out) { + return _cache.stats().then([this, &out] (auto stats) { + return _system_stats.map_reduce(adder<system_stats>(), &system_stats::self) + .then([&out, all_cache_stats = std::move(stats)] (auto all_system_stats) -> future<> { + auto now = clock_type::now(); + auto total_items = all_cache_stats._set_replaces + all_cache_stats._set_adds + + all_cache_stats._cas_hits; + return print_stat(out, "pid", getpid()) + .then([&out, uptime = now - all_system_stats._start_time] { + return print_stat(out, "uptime", + std::chrono::duration_cast<std::chrono::seconds>(uptime).count()); + }).then([now, &out] { + return print_stat(out, "time", + std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count()); + }).then([&out] { + return print_stat(out, "version", VERSION_STRING); + }).then([&out] { + return print_stat(out, "pointer_size", sizeof(void*)*8); + }).then([&out, v = all_system_stats._curr_connections] { + return print_stat(out, "curr_connections", v); + }).then([&out, v = all_system_stats._total_connections] { + return print_stat(out, "total_connections", v); + }).then([&out, v = all_system_stats._curr_connections] { + return print_stat(out, "connection_structures", v); + }).then([&out, v = all_system_stats._cmd_get] { + return print_stat(out, "cmd_get", v); + }).then([&out, v = all_system_stats._cmd_set] { + return print_stat(out, "cmd_set", v); + }).then([&out, v = all_system_stats._cmd_flush] { + return print_stat(out, "cmd_flush", v); + }).then([&out] { + return print_stat(out, "cmd_touch", 0); + }).then([&out, v = all_cache_stats._get_hits] { + return print_stat(out, "get_hits", v); + }).then([&out, v = all_cache_stats._get_misses] { + return print_stat(out, "get_misses", v); + }).then([&out, v = all_cache_stats._delete_misses] { + return print_stat(out, "delete_misses", v); + }).then([&out, v = all_cache_stats._delete_hits] { + return print_stat(out, "delete_hits", v); + }).then([&out, v = all_cache_stats._incr_misses] { + return print_stat(out, "incr_misses", v); + }).then([&out, v = all_cache_stats._incr_hits] { + return print_stat(out, "incr_hits", v); + }).then([&out, v = all_cache_stats._decr_misses] { + return print_stat(out, "decr_misses", v); + }).then([&out, v = all_cache_stats._decr_hits] { + return print_stat(out, "decr_hits", v); + }).then([&out, v = all_cache_stats._cas_misses] { + return print_stat(out, "cas_misses", v); + }).then([&out, v = all_cache_stats._cas_hits] { + return print_stat(out, "cas_hits", v); + }).then([&out, v = all_cache_stats._cas_badval] { + return print_stat(out, "cas_badval", v); + }).then([&out] { + return print_stat(out, "touch_hits", 0); + }).then([&out] { + return print_stat(out, "touch_misses", 0); + }).then([&out] { + return print_stat(out, "auth_cmds", 0); + }).then([&out] { + return print_stat(out, "auth_errors", 0); + }).then([&out] { + return print_stat(out, "threads", smp::count); + }).then([&out, v = all_cache_stats._size] { + return print_stat(out, "curr_items", v); + }).then([&out, v = total_items] { + return print_stat(out, "total_items", v); + }).then([&out, v = all_cache_stats._expired] { + return print_stat(out, "seastar.expired", v); + }).then([&out, v = all_cache_stats._resize_failure] { + return print_stat(out, "seastar.resize_failure", v); + }).then([&out, v = all_cache_stats._evicted] { + return print_stat(out, "evictions", v); + }).then([&out, v = all_cache_stats._bytes] { + return print_stat(out, "bytes", v); + }).then([&out] { + return out.write(msg_end); + }); + }); + }); + } +public: + ascii_protocol(sharded_cache& cache, distributed<system_stats>& system_stats) + : _cache(cache) + , _system_stats(system_stats) + {} + + void prepare_insertion() { + _insertion = item_insertion_data{ + .key = std::move(_parser._key), + .ascii_prefix = make_sstring(" ", _parser._flags_str, " ", _parser._size_str), + .data = std::move(_parser._blob), + .expiry = expiration(_cache.get_wc_to_clock_type_delta(), _parser._expiration) + }; + } + + future<> handle(input_stream<char>& in, output_stream<char>& out) { + _parser.init(); + return in.consume(_parser).then([this, &out] () -> future<> { + switch (_parser._state) { + case memcache_ascii_parser::state::eof: + return make_ready_future<>(); + + case memcache_ascii_parser::state::error: + return out.write(msg_error); + + case memcache_ascii_parser::state::cmd_set: + { + _system_stats.local()._cmd_set++; + prepare_insertion(); + auto f = _cache.set(_insertion); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (...) { + return out.write(msg_stored); + }); + } + + case memcache_ascii_parser::state::cmd_cas: + { + _system_stats.local()._cmd_set++; + prepare_insertion(); + auto f = _cache.cas(_insertion, _parser._version); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (auto result) { + switch (result) { + case cas_result::stored: + return out.write(msg_stored); + case cas_result::not_found: + return out.write(msg_not_found); + case cas_result::bad_version: + return out.write(msg_exists); + default: + std::abort(); + } + }); + } + + case memcache_ascii_parser::state::cmd_add: + { + _system_stats.local()._cmd_set++; + prepare_insertion(); + auto f = _cache.add(_insertion); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (bool added) { + return out.write(added ? msg_stored : msg_not_stored); + }); + } + + case memcache_ascii_parser::state::cmd_replace: + { + _system_stats.local()._cmd_set++; + prepare_insertion(); + auto f = _cache.replace(_insertion); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (auto replaced) { + return out.write(replaced ? msg_stored : msg_not_stored); + }); + } + + case memcache_ascii_parser::state::cmd_get: + return handle_get<false>(out); + + case memcache_ascii_parser::state::cmd_gets: + return handle_get<true>(out); + + case memcache_ascii_parser::state::cmd_delete: + { + auto f = _cache.remove(_parser._key); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (bool removed) { + return out.write(removed ? msg_deleted : msg_not_found); + }); + } + + case memcache_ascii_parser::state::cmd_flush_all: + { + _system_stats.local()._cmd_flush++; + if (_parser._expiration) { + auto f = _cache.flush_at(_parser._expiration); + if (_parser._noreply) { + return f; + } + return std::move(f).then([&out] { + return out.write(msg_ok); + }); + } else { + auto f = _cache.flush_all(); + if (_parser._noreply) { + return f; + } + return std::move(f).then([&out] { + return out.write(msg_ok); + }); + } + } + + case memcache_ascii_parser::state::cmd_version: + return out.write(msg_version); + + case memcache_ascii_parser::state::cmd_stats: + return print_stats(out); + + case memcache_ascii_parser::state::cmd_stats_hash: + return _cache.print_hash_stats(out); + + case memcache_ascii_parser::state::cmd_incr: + { + auto f = _cache.incr(_parser._key, _parser._u64); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (auto result) { + auto item = std::move(result.first); + if (!item) { + return out.write(msg_not_found); + } + auto incremented = result.second; + if (!incremented) { + return out.write(msg_error_non_numeric_value); + } + return out.write(item->value().data(), item->value_size()).then([&out] { + return out.write(msg_crlf); + }); + }); + } + + case memcache_ascii_parser::state::cmd_decr: + { + auto f = _cache.decr(_parser._key, _parser._u64); + if (_parser._noreply) { + return std::move(f).discard_result(); + } + return std::move(f).then([&out] (auto result) { + auto item = std::move(result.first); + if (!item) { + return out.write(msg_not_found); + } + auto decremented = result.second; + if (!decremented) { + return out.write(msg_error_non_numeric_value); + } + return out.write(item->value().data(), item->value_size()).then([&out] { + return out.write(msg_crlf); + }); + }); + } + }; + std::abort(); + }).then_wrapped([this, &out] (auto&& f) -> future<> { + // FIXME: then_wrapped() being scheduled even though no exception was triggered has a + // performance cost of about 2.6%. Not using it means maintainability penalty. + try { + f.get(); + } catch (std::bad_alloc& e) { + if (_parser._noreply) { + return make_ready_future<>(); + } + return out.write(msg_out_of_memory); + } + return make_ready_future<>(); + }); + }; +}; + +class udp_server { +public: + static const size_t default_max_datagram_size = 1400; +private: + sharded_cache& _cache; + distributed<system_stats>& _system_stats; + udp_channel _chan; + uint16_t _port; + size_t _max_datagram_size = default_max_datagram_size; + + struct header { + packed<uint16_t> _request_id; + packed<uint16_t> _sequence_number; + packed<uint16_t> _n; + packed<uint16_t> _reserved; + + template<typename Adjuster> + auto adjust_endianness(Adjuster a) { + return a(_request_id, _sequence_number, _n); + } + } __attribute__((packed)); + + struct connection { + ipv4_addr _src; + uint16_t _request_id; + input_stream<char> _in; + output_stream<char> _out; + std::vector<packet> _out_bufs; + ascii_protocol _proto; + + connection(ipv4_addr src, uint16_t request_id, input_stream<char>&& in, size_t out_size, + sharded_cache& c, distributed<system_stats>& system_stats) + : _src(src) + , _request_id(request_id) + , _in(std::move(in)) + , _out(output_stream<char>(data_sink(std::make_unique<vector_data_sink>(_out_bufs)), out_size, true)) + , _proto(c, system_stats) + {} + + future<> respond(udp_channel& chan) { + int i = 0; + return do_for_each(_out_bufs.begin(), _out_bufs.end(), [this, i, &chan] (packet& p) mutable { + header* out_hdr = p.prepend_header<header>(0); + out_hdr->_request_id = _request_id; + out_hdr->_sequence_number = i++; + out_hdr->_n = _out_bufs.size(); + *out_hdr = hton(*out_hdr); + return chan.send(_src, std::move(p)); + }); + } + }; + +public: + udp_server(sharded_cache& c, distributed<system_stats>& system_stats, uint16_t port = 11211) + : _cache(c) + , _system_stats(system_stats) + , _port(port) + {} + + void set_max_datagram_size(size_t max_datagram_size) { + _max_datagram_size = max_datagram_size; + } + + void start() { + _chan = engine().net().make_udp_channel({_port}); + keep_doing([this] { + return _chan.receive().then([this](udp_datagram dgram) { + packet& p = dgram.get_data(); + if (p.len() < sizeof(header)) { + // dropping invalid packet + return make_ready_future<>(); + } + + header hdr = ntoh(*p.get_header<header>()); + p.trim_front(sizeof(hdr)); + + auto request_id = hdr._request_id; + auto in = as_input_stream(std::move(p)); + auto conn = make_lw_shared<connection>(dgram.get_src(), request_id, std::move(in), + _max_datagram_size - sizeof(header), _cache, _system_stats); + + if (hdr._n != 1 || hdr._sequence_number != 0) { + return conn->_out.write("CLIENT_ERROR only single-datagram requests supported\r\n").then([this, conn] { + return conn->_out.flush().then([this, conn] { + return conn->respond(_chan).then([conn] {}); + }); + }); + } + + return conn->_proto.handle(conn->_in, conn->_out).then([this, conn]() mutable { + return conn->_out.flush().then([this, conn] { + return conn->respond(_chan).then([conn] {}); + }); + }); + }); + }).or_terminate(); + }; + + future<> stop() { return make_ready_future<>(); } +}; + +class tcp_server { +private: + lw_shared_ptr<server_socket> _listener; + sharded_cache& _cache; + distributed<system_stats>& _system_stats; + uint16_t _port; + struct connection { + connected_socket _socket; + socket_address _addr; + input_stream<char> _in; + output_stream<char> _out; + ascii_protocol _proto; + distributed<system_stats>& _system_stats; + connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed<system_stats>& system_stats) + : _socket(std::move(socket)) + , _addr(addr) + , _in(_socket.input()) + , _out(_socket.output()) + , _proto(c, system_stats) + , _system_stats(system_stats) + { + _system_stats.local()._curr_connections++; + _system_stats.local()._total_connections++; + } + ~connection() { + _system_stats.local()._curr_connections--; + } + }; +public: + tcp_server(sharded_cache& cache, distributed<system_stats>& system_stats, uint16_t port = 11211) + : _cache(cache) + , _system_stats(system_stats) + , _port(port) + {} + + void start() { + listen_options lo; + lo.reuse_address = true; + _listener = engine().listen(make_ipv4_address({_port}), lo); + keep_doing([this] { + return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable { + auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats); + do_until([conn] { return conn->_in.eof(); }, [conn] { + return conn->_proto.handle(conn->_in, conn->_out).then([conn] { + return conn->_out.flush(); + }); + }).finally([conn] { + return conn->_out.close().finally([conn]{}); + }); + }); + }).or_terminate(); + } + + future<> stop() { return make_ready_future<>(); } +}; + +class stats_printer { +private: + timer<> _timer; + sharded_cache& _cache; +public: + stats_printer(sharded_cache& cache) + : _cache(cache) {} + + void start() { + _timer.set_callback([this] { + _cache.stats().then([] (auto stats) { + auto gets_total = stats._get_hits + stats._get_misses; + auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0; + auto sets_total = stats._set_adds + stats._set_replaces; + auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0; + std::cout << "items: " << stats._size << " " + << std::setprecision(2) << std::fixed + << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) " + << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%)"; + std::cout << std::endl; + }); + }); + _timer.arm_periodic(std::chrono::seconds(1)); + } + + future<> stop() { return make_ready_future<>(); } +}; + +} /* namespace memcache */ + +int main(int ac, char** av) { + distributed<memcache::cache> cache_peers; + memcache::sharded_cache cache(cache_peers); + distributed<memcache::system_stats> system_stats; + distributed<memcache::udp_server> udp_server; + distributed<memcache::tcp_server> tcp_server; + memcache::stats_printer stats(cache); + + namespace bpo = boost::program_options; + app_template app; + app.add_options() + ("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size), + "Maximum size of UDP datagram") + ("max-slab-size", bpo::value<uint64_t>()->default_value(memcache::default_per_cpu_slab_size/MB), + "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)") + ("slab-page-size", bpo::value<uint64_t>()->default_value(memcache::default_slab_page_size/MB), + "Size of slab page (value in megabytes)") + ("stats", + "Print basic statistics periodically (every second)") + ("port", bpo::value<uint16_t>()->default_value(11211), + "Specify UDP and TCP ports for memcached server to listen on") + ; + + return app.run_deprecated(ac, av, [&] { + engine().at_exit([&] { return tcp_server.stop(); }); + engine().at_exit([&] { return udp_server.stop(); }); + engine().at_exit([&] { return cache_peers.stop(); }); + engine().at_exit([&] { return system_stats.stop(); }); + + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + uint64_t per_cpu_slab_size = config["max-slab-size"].as<uint64_t>() * MB; + uint64_t slab_page_size = config["slab-page-size"].as<uint64_t>() * MB; + return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] { + return system_stats.start(memcache::clock_type::now()); + }).then([&] { + std::cout << PLATFORM << " memcached " << VERSION << "\n"; + return make_ready_future<>(); + }).then([&, port] { + return tcp_server.start(std::ref(cache), std::ref(system_stats), port); + }).then([&tcp_server] { + return tcp_server.invoke_on_all(&memcache::tcp_server::start); + }).then([&, port] { + if (engine().net().has_per_core_namespace()) { + return udp_server.start(std::ref(cache), std::ref(system_stats), port); + } else { + return udp_server.start_single(std::ref(cache), std::ref(system_stats), port); + } + }).then([&] { + return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size, + (size_t)config["max-datagram-size"].as<int>()); + }).then([&] { + return udp_server.invoke_on_all(&memcache::udp_server::start); + }).then([&stats, start_stats = config.count("stats")] { + if (start_stats) { + stats.start(); + } + }); + }); +} diff --git a/src/seastar/apps/memcached/memcached.hh b/src/seastar/apps/memcached/memcached.hh new file mode 100644 index 00000000..9a587578 --- /dev/null +++ b/src/seastar/apps/memcached/memcached.hh @@ -0,0 +1,74 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <seastar/core/sstring.hh> + +namespace memcache { + +using namespace seastar; + +class item; +class cache; + +class item_key { +private: + sstring _key; + size_t _hash; +public: + item_key() = default; + item_key(item_key&) = default; + item_key(sstring key) + : _key(key) + , _hash(std::hash<sstring>()(key)) + {} + item_key(item_key&& other) + : _key(std::move(other._key)) + , _hash(other._hash) + { + other._hash = 0; + } + size_t hash() const { + return _hash; + } + const sstring& key() const { + return _key; + } + bool operator==(const item_key& other) const { + return other._hash == _hash && other._key == _key; + } + void operator=(item_key&& other) { + _key = std::move(other._key); + _hash = other._hash; + other._hash = 0; + } +}; + +} + +namespace std { + +template <> +struct hash<memcache::item_key> { + size_t operator()(const memcache::item_key& key) { + return key.hash(); + } +}; + +} /* namespace std */ diff --git a/src/seastar/apps/memcached/tests/CMakeLists.txt b/src/seastar/apps/memcached/tests/CMakeLists.txt new file mode 100644 index 00000000..9301cea7 --- /dev/null +++ b/src/seastar/apps/memcached/tests/CMakeLists.txt @@ -0,0 +1,75 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +if (Seastar_EXECUTE_ONLY_FAST_TESTS) + set (memcached_test_args --fast) +else () + set (memcached_test_args "") +endif () + +add_custom_target (app_memcached_test_memcached_run + DEPENDS + ${memcached_app} + ${CMAKE_CURRENT_SOURCE_DIR}/test.py + ${CMAKE_CURRENT_SOURCE_DIR}/test_memcached.py + COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/test.py --memcached $<TARGET_FILE:app_memcached> ${memcached_test_args} + USES_TERMINAL) + +add_test ( + NAME Seastar.app.memcached.memcached + COMMAND ${CMAKE_COMMAND} --build ${Seastar_BINARY_DIR} --target app_memcached_test_memcached_run) + +set_tests_properties (Seastar.app.memcached.memcached + PROPERTIES + TIMEOUT ${Seastar_TEST_TIMEOUT}) + +add_executable (app_memcached_test_ascii + test_ascii_parser.cc) + +add_dependencies (app_memcached_test_ascii app_memcached) + +target_include_directories (app_memcached_test_ascii + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${Seastar_APP_MEMCACHED_BINARY_DIR} + ${Seastar_APP_MEMCACHED_SOURCE_DIR}) + +target_compile_definitions (app_memcached_test_ascii + PRIVATE SEASTAR_TESTING_MAIN) + +target_link_libraries (app_memcached_test_ascii + PRIVATE + seastar_with_flags + seastar_testing) + +add_custom_target (app_memcached_test_ascii_run + DEPENDS app_memcached_test_ascii + COMMAND app_memcached_test_ascii -- -c 2 + USES_TERMINAL) + +add_test ( + NAME Seastar.app.memcached.ascii + COMMAND ${CMAKE_COMMAND} --build ${Seastar_BINARY_DIR} --target app_memcached_test_ascii_run) + +set_tests_properties (Seastar.app.memcached.ascii + PROPERTIES + TIMEOUT ${Seastar_TEST_TIMEOUT}) diff --git a/src/seastar/apps/memcached/tests/test.py b/src/seastar/apps/memcached/tests/test.py new file mode 100755 index 00000000..c2f2b80c --- /dev/null +++ b/src/seastar/apps/memcached/tests/test.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import time +import sys +import os +import argparse +import subprocess + +DIR_PATH = os.path.dirname(os.path.realpath(__file__)) + +def run(args, cmd): + mc = subprocess.Popen([args.memcached, '--smp=2']) + print('Memcached started.') + try: + cmdline = [DIR_PATH + '/test_memcached.py'] + cmd + if args.fast: + cmdline.append('--fast') + print('Running: ' + ' '.join(cmdline)) + subprocess.check_call(cmdline) + finally: + print('Killing memcached...') + mc.terminate(); + mc.wait() + print('Memcached killed.') + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Seastar test runner") + parser.add_argument('--fast', action="store_true", help="Run only fast tests") + parser.add_argument('--memcached', required=True, help='Path of the memcached executable') + args = parser.parse_args() + + run(args, []) + run(args, ['-U']) diff --git a/src/seastar/apps/memcached/tests/test_ascii_parser.cc b/src/seastar/apps/memcached/tests/test_ascii_parser.cc new file mode 100644 index 00000000..596d193e --- /dev/null +++ b/src/seastar/apps/memcached/tests/test_ascii_parser.cc @@ -0,0 +1,335 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <iostream> +#include <limits> +#include <seastar/testing/test_case.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/net/packet-data-source.hh> +#include "ascii.hh" +#include <seastar/core/future-util.hh> + +using namespace seastar; +using namespace net; +using namespace memcache; + +using parser_type = memcache_ascii_parser; + +static packet make_packet(std::vector<std::string> chunks, size_t buffer_size) { + packet p; + for (auto&& chunk : chunks) { + size_t size = chunk.size(); + for (size_t pos = 0; pos < size; pos += buffer_size) { + auto now = std::min(pos + buffer_size, chunk.size()) - pos; + p.append(packet(chunk.data() + pos, now)); + } + } + return p; +} + +static auto make_input_stream(packet&& p) { + return input_stream<char>(data_source( + std::make_unique<packet_data_source>(std::move(p)))); +} + +static auto parse(packet&& p) { + auto is = make_lw_shared<input_stream<char>>(make_input_stream(std::move(p))); + auto parser = make_lw_shared<parser_type>(); + parser->init(); + return is->consume(*parser).then([is, parser] { + return make_ready_future<lw_shared_ptr<parser_type>>(parser); + }); +} + +auto for_each_fragment_size = [] (auto&& func) { + auto buffer_sizes = { 100000, 1000, 100, 10, 5, 2, 1 }; + return do_for_each(buffer_sizes.begin(), buffer_sizes.end(), [func] (size_t buffer_size) { + return func([buffer_size] (std::vector<std::string> chunks) { + return make_packet(chunks, buffer_size); + }); + }); +}; + +SEASTAR_TEST_CASE(test_set_command_is_parsed) { + return for_each_fragment_size([] (auto make_packet) { + return parse(make_packet({"set key 1 2 3\r\nabc\r\n"})).then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags_str == "1"); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_blob == "abc"); + }); + }); +} + +SEASTAR_TEST_CASE(test_empty_data_is_parsed) { + return for_each_fragment_size([] (auto make_packet) { + return parse(make_packet({"set key 1 2 0\r\n\r\n"})).then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags_str == "1"); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 0); + BOOST_REQUIRE(p->_size_str == "0"); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_blob == ""); + }); + }); +} + +SEASTAR_TEST_CASE(test_superflous_data_is_an_error) { + return for_each_fragment_size([] (auto make_packet) { + return parse(make_packet({"set key 0 0 0\r\nasd\r\n"})).then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }); +} + +SEASTAR_TEST_CASE(test_not_enough_data_is_an_error) { + return for_each_fragment_size([] (auto make_packet) { + return parse(make_packet({"set key 0 0 3\r\n"})).then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }); +} + +SEASTAR_TEST_CASE(test_u32_parsing) { + return for_each_fragment_size([] (auto make_packet) { + return make_ready_future<>().then([make_packet] { + return parse(make_packet({"set key 0 0 0\r\n\r\n"})).then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags_str == "0"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 12345 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags_str == "12345"); + }); + }).then([make_packet] { + return parse(make_packet({"set key -1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([make_packet] { + return parse(make_packet({"set key 1-1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([make_packet] { + return parse(make_packet({"set key " + std::to_string(std::numeric_limits<uint32_t>::max()) + " 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags_str == to_sstring(std::numeric_limits<uint32_t>::max())); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_parsing_of_split_data) { + return for_each_fragment_size([] (auto make_packet) { + return make_ready_future<>() + .then([make_packet] { + return parse(make_packet({"set key 11", "1 222 3\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 11", "1 22", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set k", "ey 11", "1 2", "2", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 111 222 3\r\n", "asd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 111 222 3\r\na", "sd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 111 222 3\r\nasd", "\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([make_packet] { + return parse(make_packet({"set key 111 222 3\r\nasd\r", "\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key"); + BOOST_REQUIRE(p->_flags_str == "111"); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_size_str == "3"); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }); + }); +} + +static std::vector<sstring> as_strings(std::vector<item_key>& keys) { + std::vector<sstring> v; + for (auto&& key : keys) { + v.push_back(key.key()); + } + return v; +} + +SEASTAR_TEST_CASE(test_get_parsing) { + return for_each_fragment_size([] (auto make_packet) { + return make_ready_future<>() + .then([make_packet] { + return parse(make_packet({"get key1\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1"})); + }); + }).then([make_packet] { + return parse(make_packet({"get key1 key2\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1", "key2"})); + }); + }).then([make_packet] { + return parse(make_packet({"get key1 key2 key3\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1", "key2", "key3"})); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_catches_errors_in_get) { + return for_each_fragment_size([] (auto make_packet) { + return make_ready_future<>() + .then([make_packet] { + return parse(make_packet({"get\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_parser_returns_eof_state_when_no_command_follows) { + return for_each_fragment_size([] (auto make_packet) { + auto p = make_shared<parser_type>(); + auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"get key\r\n"}))); + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + }).then([is, p] { + p->init(); + return is->consume(*p).then([p, is] { + BOOST_REQUIRE(p->_state == parser_type::state::eof); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_incomplete_command_is_an_error) { + return for_each_fragment_size([] (auto make_packet) { + auto p = make_shared<parser_type>(); + auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"get"}))); + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }).then([is, p] { + p->init(); + return is->consume(*p).then([p, is] { + BOOST_REQUIRE(p->_state == parser_type::state::eof); + }); + }); + }); +} + +SEASTAR_TEST_CASE(test_multiple_requests_in_one_stream) { + return for_each_fragment_size([] (auto make_packet) { + auto p = make_shared<parser_type>(); + auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"set key1 1 1 5\r\ndata1\r\nset key2 2 2 6\r\ndata2+\r\n"}))); + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key1"); + BOOST_REQUIRE(p->_flags_str == "1"); + BOOST_REQUIRE(p->_expiration == 1); + BOOST_REQUIRE(p->_size == 5); + BOOST_REQUIRE(p->_size_str == "5"); + BOOST_REQUIRE(p->_blob == "data1"); + }).then([is, p] { + p->init(); + return is->consume(*p).then([p, is] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key.key() == "key2"); + BOOST_REQUIRE(p->_flags_str == "2"); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 6); + BOOST_REQUIRE(p->_size_str == "6"); + BOOST_REQUIRE(p->_blob == "data2+"); + }); + }); + }); +} diff --git a/src/seastar/apps/memcached/tests/test_memcached.py b/src/seastar/apps/memcached/tests/test_memcached.py new file mode 100755 index 00000000..4aca858e --- /dev/null +++ b/src/seastar/apps/memcached/tests/test_memcached.py @@ -0,0 +1,600 @@ +#!/usr/bin/env python3 +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from contextlib import contextmanager +import socket +import struct +import sys +import random +import argparse +import time +import re +import unittest + +server_addr = None +call = None +args = None + +class TimeoutError(Exception): + pass + +@contextmanager +def tcp_connection(timeout=1): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) + s.connect(server_addr) + def call(msg): + s.send(msg.encode()) + return s.recv(16*1024) + yield call + s.close() + +def slow(f): + def wrapper(self): + if args.fast: + raise unittest.SkipTest('Slow') + return f(self) + return wrapper + +def recv_all(s): + m = b'' + while True: + data = s.recv(1024) + if not data: + break + m += data + return m + +def tcp_call(msg, timeout=1): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) + s.connect(server_addr) + s.send(msg.encode()) + s.shutdown(socket.SHUT_WR) + data = recv_all(s) + s.close() + return data + +def udp_call_for_fragments(msg, timeout=1): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + this_req_id = random.randint(-32768, 32767) + + datagram = struct.pack(">hhhh", this_req_id, 0, 1, 0) + msg.encode() + sock.sendto(datagram, server_addr) + + messages = {} + n_determined = None + while True: + data, addr = sock.recvfrom(1500) + req_id, seq, n, res = struct.unpack_from(">hhhh", data) + content = data[8:] + + if n_determined and n_determined != n: + raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined, n)) + n_determined = n + + if req_id != this_req_id: + raise Exception('Invalid request id: ' + req_id + ', expected ' + this_req_id) + + if seq in messages: + raise Exception('Duplicate message for seq=' + seq) + + messages[seq] = content + if len(messages) == n: + break + + for k, v in sorted(messages.items(), key=lambda e: e[0]): + yield v + + sock.close() + +def udp_call(msg, **kwargs): + return b''.join(udp_call_for_fragments(msg, **kwargs)) + +class MemcacheTest(unittest.TestCase): + def set(self, key, value, flags=0, expiry=0): + self.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n') + + def delete(self, key): + self.assertEqual(call('delete %s\r\n' % key), b'DELETED\r\n') + + def assertHasKey(self, key): + resp = call('get %s\r\n' % key) + if not resp.startswith(('VALUE %s' % key).encode()): + self.fail('Key \'%s\' should be present, but got: %s' % (key, resp.decode())) + + def assertNoKey(self, key): + resp = call('get %s\r\n' % key) + if resp != b'END\r\n': + self.fail('Key \'%s\' should not be present, but got: %s' % (key, resp.decode())) + + def setKey(self, key): + self.set(key, 'some value') + + def getItemVersion(self, key): + m = re.match(r'VALUE %s \d+ \d+ (?P<version>\d+)' % key, call('gets %s\r\n' % key).decode()) + return int(m.group('version')) + + def getStat(self, name, call_fn=None): + if not call_fn: call_fn = call + resp = call_fn('stats\r\n').decode() + m = re.search(r'STAT %s (?P<value>.+)' % re.escape(name), resp, re.MULTILINE) + return m.group('value') + + def flush(self): + self.assertEqual(call('flush_all\r\n'), b'OK\r\n') + + def tearDown(self): + self.flush() + +class TcpSpecificTests(MemcacheTest): + def test_recovers_from_errors_in_the_stream(self): + with tcp_connection() as conn: + self.assertEqual(conn('get\r\n'), b'ERROR\r\n') + self.assertEqual(conn('get key\r\n'), b'END\r\n') + + def test_incomplete_command_results_in_error(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(server_addr) + s.send(b'get') + s.shutdown(socket.SHUT_WR) + self.assertEqual(recv_all(s), b'ERROR\r\n') + s.close() + + def test_stream_closed_results_in_error(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(server_addr) + s.shutdown(socket.SHUT_WR) + self.assertEqual(recv_all(s), b'') + s.close() + + def test_unsuccesful_parsing_does_not_leave_data_behind(self): + with tcp_connection() as conn: + self.assertEqual(conn('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + self.assertRegex(conn('delete a b c\r\n'), b'^(CLIENT_)?ERROR.*\r\n$') + self.assertEqual(conn('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + self.assertEqual(conn('delete key\r\n'), b'DELETED\r\n') + + def test_flush_all_no_reply(self): + self.assertEqual(call('flush_all noreply\r\n'), b'') + + def test_set_no_reply(self): + self.assertEqual(call('set key 0 0 5 noreply\r\nhello\r\nget key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + self.delete('key') + + def test_delete_no_reply(self): + self.setKey('key') + self.assertEqual(call('delete key noreply\r\nget key\r\n'), b'END\r\n') + + def test_add_no_reply(self): + self.assertEqual(call('add key 0 0 1 noreply\r\na\r\nget key\r\n'), b'VALUE key 0 1\r\na\r\nEND\r\n') + self.delete('key') + + def test_replace_no_reply(self): + self.assertEqual(call('set key 0 0 1\r\na\r\n'), b'STORED\r\n') + self.assertEqual(call('replace key 0 0 1 noreply\r\nb\r\nget key\r\n'), b'VALUE key 0 1\r\nb\r\nEND\r\n') + self.delete('key') + + def test_cas_noreply(self): + self.assertNoKey('key') + self.assertEqual(call('cas key 0 0 1 1 noreply\r\na\r\n'), b'') + self.assertNoKey('key') + + self.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + version = self.getItemVersion('key') + + self.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version + 1)), b'') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + + self.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version)), b'') + self.assertEqual(call('get key\r\n'), b'VALUE key 1 5\r\naloha\r\nEND\r\n') + + self.delete('key') + + @slow + def test_connection_statistics(self): + with tcp_connection() as conn: + curr_connections = int(self.getStat('curr_connections', call_fn=conn)) + total_connections = int(self.getStat('total_connections', call_fn=conn)) + with tcp_connection() as conn2: + self.assertEqual(curr_connections + 1, int(self.getStat('curr_connections', call_fn=conn))) + self.assertEqual(total_connections + 1, int(self.getStat('total_connections', call_fn=conn))) + self.assertEqual(total_connections + 1, int(self.getStat('total_connections', call_fn=conn))) + time.sleep(0.1) + self.assertEqual(curr_connections, int(self.getStat('curr_connections', call_fn=conn))) + +class UdpSpecificTests(MemcacheTest): + def test_large_response_is_split_into_mtu_chunks(self): + max_datagram_size = 1400 + data = '1' * (max_datagram_size*3) + self.set('key', data) + + chunks = list(udp_call_for_fragments('get key\r\n')) + + for chunk in chunks: + self.assertLessEqual(len(chunk), max_datagram_size) + + self.assertEqual(b''.join(chunks).decode(), + 'VALUE key 0 %d\r\n%s\r\n' \ + 'END\r\n' % (len(data), data)) + + self.delete('key') + +class TestCommands(MemcacheTest): + def test_basic_commands(self): + self.assertEqual(call('get key\r\n'), b'END\r\n') + self.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + self.assertEqual(call('delete key\r\n'), b'DELETED\r\n') + self.assertEqual(call('delete key\r\n'), b'NOT_FOUND\r\n') + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_error_handling(self): + self.assertEqual(call('get\r\n'), b'ERROR\r\n') + + @slow + def test_expiry(self): + self.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(2) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + @slow + def test_expiry_at_epoch_time(self): + expiry = int(time.time()) + 1 + self.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(2) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_multiple_keys_in_get(self): + self.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b'STORED\r\n') + self.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b'STORED\r\n') + resp = call('get key1 key\r\n') + self.assertRegex(resp, b'^(VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n)|(VALUE key 0 2\r\nv2\r\nVALUE key1 0 2\r\nv1\r\nEND\r\n)$') + self.delete("key") + self.delete("key1") + + def test_flush_all(self): + self.set('key', 'value') + self.assertEqual(call('flush_all\r\n'), b'OK\r\n') + self.assertNoKey('key') + + def test_keys_set_after_flush_remain(self): + self.assertEqual(call('flush_all\r\n'), b'OK\r\n') + self.setKey('key') + self.assertHasKey('key') + self.delete('key') + + @slow + def test_flush_all_with_timeout_flushes_all_keys_even_those_set_after_flush(self): + self.setKey('key') + self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n') + self.assertHasKey('key') + self.setKey('key2') + time.sleep(3) + self.assertNoKey('key') + self.assertNoKey('key2') + + @slow + def test_subsequent_flush_is_merged(self): + self.setKey('key') + self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n') # Can flush in anything between 1-2 + self.assertEqual(call('flush_all 4\r\n'), b'OK\r\n') # Can flush in anything between 3-4 + time.sleep(3) + self.assertHasKey('key') + self.setKey('key2') + time.sleep(4) + self.assertNoKey('key') + self.assertNoKey('key2') + + @slow + def test_immediate_flush_cancels_delayed_flush(self): + self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n') + self.assertEqual(call('flush_all\r\n'), b'OK\r\n') + self.setKey('key') + time.sleep(1) + self.assertHasKey('key') + self.delete('key') + + @slow + def test_flushing_in_the_past(self): + self.setKey('key1') + time.sleep(1) + self.setKey('key2') + key2_time = int(time.time()) + self.assertEqual(call('flush_all %d\r\n' % (key2_time - 1)), b'OK\r\n') + time.sleep(1) + self.assertNoKey("key1") + self.assertNoKey("key2") + + @slow + def test_memcache_does_not_crash_when_flushing_with_already_expred_items(self): + self.assertEqual(call('set key1 0 2 5\r\nhello\r\n'), b'STORED\r\n') + time.sleep(1) + self.assertEqual(call('flush_all\r\n'), b'OK\r\n') + + def test_response_spanning_many_datagrams(self): + key1_data = '1' * 1000 + key2_data = '2' * 1000 + key3_data = '3' * 1000 + self.set('key1', key1_data) + self.set('key2', key2_data) + self.set('key3', key3_data) + + resp = call('get key1 key2 key3\r\n').decode() + + pattern = '^VALUE (?P<v1>.*?\r\n.*?)\r\nVALUE (?P<v2>.*?\r\n.*?)\r\nVALUE (?P<v3>.*?\r\n.*?)\r\nEND\r\n$' + self.assertRegex(resp, pattern) + + m = re.match(pattern, resp) + self.assertEqual(set([m.group('v1'), m.group('v2'), m.group('v3')]), + set(['key1 0 %d\r\n%s' % (len(key1_data), key1_data), + 'key2 0 %d\r\n%s' % (len(key2_data), key2_data), + 'key3 0 %d\r\n%s' % (len(key3_data), key3_data)])) + + self.delete('key1') + self.delete('key2') + self.delete('key3') + + def test_version(self): + self.assertRegex(call('version\r\n'), b'^VERSION .*\r\n$') + + def test_add(self): + self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'STORED\r\n') + self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'NOT_STORED\r\n') + self.delete('key') + + def test_replace(self): + self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'STORED\r\n') + self.assertEqual(call('replace key 0 0 1\r\na\r\n'), b'STORED\r\n') + self.delete('key') + self.assertEqual(call('replace key 0 0 1\r\na\r\n'), b'NOT_STORED\r\n') + + def test_cas_and_gets(self): + self.assertEqual(call('cas key 0 0 1 1\r\na\r\n'), b'NOT_FOUND\r\n') + self.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + version = self.getItemVersion('key') + + self.assertEqual(call('set key 1 0 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('gets key\r\n').decode(), 'VALUE key 1 5 %d\r\nhello\r\nEND\r\n' % (version + 1)) + + self.assertEqual(call('cas key 0 0 5 %d\r\nhello\r\n' % (version)), b'EXISTS\r\n') + self.assertEqual(call('cas key 0 0 5 %d\r\naloha\r\n' % (version + 1)), b'STORED\r\n') + self.assertEqual(call('gets key\r\n').decode(), 'VALUE key 0 5 %d\r\naloha\r\nEND\r\n' % (version + 2)) + + self.delete('key') + + def test_curr_items_stat(self): + self.assertEqual(0, int(self.getStat('curr_items'))) + self.setKey('key') + self.assertEqual(1, int(self.getStat('curr_items'))) + self.delete('key') + self.assertEqual(0, int(self.getStat('curr_items'))) + + def test_how_stats_change_with_different_commands(self): + get_count = int(self.getStat('cmd_get')) + set_count = int(self.getStat('cmd_set')) + flush_count = int(self.getStat('cmd_flush')) + total_items = int(self.getStat('total_items')) + get_misses = int(self.getStat('get_misses')) + get_hits = int(self.getStat('get_hits')) + cas_hits = int(self.getStat('cas_hits')) + cas_badval = int(self.getStat('cas_badval')) + cas_misses = int(self.getStat('cas_misses')) + delete_misses = int(self.getStat('delete_misses')) + delete_hits = int(self.getStat('delete_hits')) + curr_connections = int(self.getStat('curr_connections')) + incr_hits = int(self.getStat('incr_hits')) + incr_misses = int(self.getStat('incr_misses')) + decr_hits = int(self.getStat('decr_hits')) + decr_misses = int(self.getStat('decr_misses')) + + call('get key\r\n') + get_count += 1 + get_misses += 1 + + call('gets key\r\n') + get_count += 1 + get_misses += 1 + + call('set key1 0 0 1\r\na\r\n') + set_count += 1 + total_items += 1 + + call('get key1\r\n') + get_count += 1 + get_hits += 1 + + call('add key1 0 0 1\r\na\r\n') + set_count += 1 + + call('add key2 0 0 1\r\na\r\n') + set_count += 1 + total_items += 1 + + call('replace key1 0 0 1\r\na\r\n') + set_count += 1 + total_items += 1 + + call('replace key3 0 0 1\r\na\r\n') + set_count += 1 + + call('cas key4 0 0 1 1\r\na\r\n') + set_count += 1 + cas_misses += 1 + + call('cas key1 0 0 1 %d\r\na\r\n' % self.getItemVersion('key1')) + set_count += 1 + get_count += 1 + get_hits += 1 + cas_hits += 1 + total_items += 1 + + call('cas key1 0 0 1 %d\r\na\r\n' % (self.getItemVersion('key1') + 1)) + set_count += 1 + get_count += 1 + get_hits += 1 + cas_badval += 1 + + call('delete key1\r\n') + delete_hits += 1 + + call('delete key1\r\n') + delete_misses += 1 + + call('incr num 1\r\n') + incr_misses += 1 + call('decr num 1\r\n') + decr_misses += 1 + + call('set num 0 0 1\r\n0\r\n') + set_count += 1 + total_items += 1 + + call('incr num 1\r\n') + incr_hits += 1 + call('decr num 1\r\n') + decr_hits += 1 + + self.flush() + flush_count += 1 + + self.assertEqual(get_count, int(self.getStat('cmd_get'))) + self.assertEqual(set_count, int(self.getStat('cmd_set'))) + self.assertEqual(flush_count, int(self.getStat('cmd_flush'))) + self.assertEqual(total_items, int(self.getStat('total_items'))) + self.assertEqual(get_hits, int(self.getStat('get_hits'))) + self.assertEqual(get_misses, int(self.getStat('get_misses'))) + self.assertEqual(cas_misses, int(self.getStat('cas_misses'))) + self.assertEqual(cas_hits, int(self.getStat('cas_hits'))) + self.assertEqual(cas_badval, int(self.getStat('cas_badval'))) + self.assertEqual(delete_misses, int(self.getStat('delete_misses'))) + self.assertEqual(delete_hits, int(self.getStat('delete_hits'))) + self.assertEqual(0, int(self.getStat('curr_items'))) + self.assertEqual(curr_connections, int(self.getStat('curr_connections'))) + self.assertEqual(incr_misses, int(self.getStat('incr_misses'))) + self.assertEqual(incr_hits, int(self.getStat('incr_hits'))) + self.assertEqual(decr_misses, int(self.getStat('decr_misses'))) + self.assertEqual(decr_hits, int(self.getStat('decr_hits'))) + + def test_incr(self): + self.assertEqual(call('incr key 0\r\n'), b'NOT_FOUND\r\n') + + self.assertEqual(call('set key 0 0 1\r\n0\r\n'), b'STORED\r\n') + self.assertEqual(call('incr key 0\r\n'), b'0\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 1\r\n0\r\nEND\r\n') + + self.assertEqual(call('incr key 1\r\n'), b'1\r\n') + self.assertEqual(call('incr key 2\r\n'), b'3\r\n') + self.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 1)), b'2\r\n') + self.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 3)), b'18446744073709551615\r\n') + self.assertRegex(call('incr key 1\r\n').decode(), r'0(\w+)?\r\n') + + self.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b'STORED\r\n') + self.assertEqual(call('incr key 1\r\n'), b'2\r\n') + + self.assertEqual(call('set key 0 0 2\r\n09\r\n'), b'STORED\r\n') + self.assertEqual(call('incr key 1\r\n'), b'10\r\n') + + def test_decr(self): + self.assertEqual(call('decr key 0\r\n'), b'NOT_FOUND\r\n') + + self.assertEqual(call('set key 0 0 1\r\n7\r\n'), b'STORED\r\n') + self.assertEqual(call('decr key 1\r\n'), b'6\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 1\r\n6\r\nEND\r\n') + + self.assertEqual(call('decr key 6\r\n'), b'0\r\n') + self.assertEqual(call('decr key 2\r\n'), b'0\r\n') + + self.assertEqual(call('set key 0 0 2\r\n20\r\n'), b'STORED\r\n') + self.assertRegex(call('decr key 11\r\n').decode(), r'^9( )?\r\n$') + + self.assertEqual(call('set key 0 0 3\r\n100\r\n'), b'STORED\r\n') + self.assertRegex(call('decr key 91\r\n').decode(), r'^9( )?\r\n$') + + self.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b'STORED\r\n') + self.assertEqual(call('decr key 1\r\n'), b'0\r\n') + + self.assertEqual(call('set key 0 0 2\r\n09\r\n'), b'STORED\r\n') + self.assertEqual(call('decr key 1\r\n'), b'8\r\n') + + def test_incr_and_decr_on_invalid_input(self): + error_msg = b'CLIENT_ERROR cannot increment or decrement non-numeric value\r\n' + for cmd in ['incr', 'decr']: + for value in ['', '-1', 'a', '0x1', '18446744073709551616']: + self.assertEqual(call('set key 0 0 %d\r\n%s\r\n' % (len(value), value)), b'STORED\r\n') + prev = call('get key\r\n') + self.assertEqual(call(cmd + ' key 1\r\n'), error_msg, "cmd=%s, value=%s" % (cmd, value)) + self.assertEqual(call('get key\r\n'), prev) + self.delete('key') + +def wait_for_memcache_tcp(timeout=4): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + timeout_at = time.time() + timeout + while True: + if time.time() >= timeout_at: + raise TimeoutError() + try: + s.connect(server_addr) + s.close() + break + except ConnectionRefusedError: + time.sleep(0.1) + + +def wait_for_memcache_udp(timeout=4): + timeout_at = time.time() + timeout + while True: + if time.time() >= timeout_at: + raise TimeoutError() + try: + udp_call('version\r\n', timeout=0.2) + break + except socket.timeout: + pass + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="memcache protocol tests") + parser.add_argument('--server', '-s', action="store", help="server adddress in <host>:<port> format", default="localhost:11211") + parser.add_argument('--udp', '-U', action="store_true", help="Use UDP protocol") + parser.add_argument('--fast', action="store_true", help="Run only fast tests") + args = parser.parse_args() + + host, port = args.server.split(':') + server_addr = (host, int(port)) + + if args.udp: + call = udp_call + wait_for_memcache_udp() + else: + call = tcp_call + wait_for_memcache_tcp() + + runner = unittest.TextTestRunner() + loader = unittest.TestLoader() + suite = unittest.TestSuite() + suite.addTest(loader.loadTestsFromTestCase(TestCommands)) + if args.udp: + suite.addTest(loader.loadTestsFromTestCase(UdpSpecificTests)) + else: + suite.addTest(loader.loadTestsFromTestCase(TcpSpecificTests)) + result = runner.run(suite) + if not result.wasSuccessful(): + sys.exit(1) diff --git a/src/seastar/apps/seawreck/CMakeLists.txt b/src/seastar/apps/seawreck/CMakeLists.txt new file mode 100644 index 00000000..f9ffd357 --- /dev/null +++ b/src/seastar/apps/seawreck/CMakeLists.txt @@ -0,0 +1,24 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +seastar_add_app (seawreck + SOURCES seawreck.cc) diff --git a/src/seastar/apps/seawreck/seawreck.cc b/src/seastar/apps/seawreck/seawreck.cc new file mode 100644 index 00000000..61066956 --- /dev/null +++ b/src/seastar/apps/seawreck/seawreck.cc @@ -0,0 +1,225 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include <seastar/http/response_parser.hh> +#include <seastar/core/print.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/semaphore.hh> +#include <seastar/core/future-util.hh> +#include <chrono> + +using namespace seastar; + +template <typename... Args> +void http_debug(const char* fmt, Args&&... args) { +#if HTTP_DEBUG + print(fmt, std::forward<Args>(args)...); +#endif +} + +class http_client { +private: + unsigned _duration; + unsigned _conn_per_core; + unsigned _reqs_per_conn; + std::vector<connected_socket> _sockets; + semaphore _conn_connected{0}; + semaphore _conn_finished{0}; + timer<> _run_timer; + bool _timer_based; + bool _timer_done{false}; + uint64_t _total_reqs{0}; +public: + http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn) + : _duration(duration) + , _conn_per_core(total_conn / smp::count) + , _reqs_per_conn(reqs_per_conn) + , _run_timer([this] { _timer_done = true; }) + , _timer_based(reqs_per_conn == 0) { + } + + class connection { + private: + connected_socket _fd; + input_stream<char> _read_buf; + output_stream<char> _write_buf; + http_response_parser _parser; + http_client* _http_client; + uint64_t _nr_done{0}; + public: + connection(connected_socket&& fd, http_client* client) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) + , _http_client(client){ + } + + uint64_t nr_done() { + return _nr_done; + } + + future<> do_req() { + return _write_buf.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] { + return _write_buf.flush(); + }).then([this] { + _parser.init(); + return _read_buf.consume(_parser).then([this] { + // Read HTTP response header first + if (_parser.eof()) { + return make_ready_future<>(); + } + auto _rsp = _parser.get_parsed_response(); + auto it = _rsp->_headers.find("Content-Length"); + if (it == _rsp->_headers.end()) { + fmt::print("Error: HTTP response does not contain: Content-Length\n"); + return make_ready_future<>(); + } + auto content_len = std::stoi(it->second); + http_debug("Content-Length = %d\n", content_len); + // Read HTTP response body + return _read_buf.read_exactly(content_len).then([this] (temporary_buffer<char> buf) { + _nr_done++; + http_debug("%s\n", buf.get()); + if (_http_client->done(_nr_done)) { + return make_ready_future(); + } else { + return do_req(); + } + }); + }); + }); + } + }; + + future<uint64_t> total_reqs() { + fmt::print("Requests on cpu {:2d}: {:d}\n", engine().cpu_id(), _total_reqs); + return make_ready_future<uint64_t>(_total_reqs); + } + + bool done(uint64_t nr_done) { + if (_timer_based) { + return _timer_done; + } else { + return nr_done >= _reqs_per_conn; + } + } + + future<> connect(ipv4_addr server_addr) { + // Establish all the TCP connections first + for (unsigned i = 0; i < _conn_per_core; i++) { + engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) { + _sockets.push_back(std::move(fd)); + http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id()); + _conn_connected.signal(); + }).or_terminate(); + } + return _conn_connected.wait(_conn_per_core); + } + + future<> run() { + // All connected, start HTTP request + http_debug("Established all %6d tcp connections on cpu %3d\n", _conn_per_core, engine().cpu_id()); + if (_timer_based) { + _run_timer.arm(std::chrono::seconds(_duration)); + } + for (auto&& fd : _sockets) { + auto conn = new connection(std::move(fd), this); + conn->do_req().then_wrapped([this, conn] (auto&& f) { + http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id()); + _total_reqs += conn->nr_done(); + _conn_finished.signal(); + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + fmt::print("http request error: {}\n", ex.what()); + } + }); + } + + // All finished + return _conn_finished.wait(_conn_per_core); + } + future<> stop() { + return make_ready_future(); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("server,s", bpo::value<std::string>()->default_value("192.168.66.100:10000"), "Server address") + ("conn,c", bpo::value<unsigned>()->default_value(100), "total connections") + ("reqs,r", bpo::value<unsigned>()->default_value(0), "reqs per connection") + ("duration,d", bpo::value<unsigned>()->default_value(10), "duration of the test in seconds)"); + + return app.run(ac, av, [&app] () -> future<int> { + auto& config = app.configuration(); + auto server = config["server"].as<std::string>(); + auto reqs_per_conn = config["reqs"].as<unsigned>(); + auto total_conn= config["conn"].as<unsigned>(); + auto duration = config["duration"].as<unsigned>(); + + if (total_conn % smp::count != 0) { + fmt::print("Error: conn needs to be n * cpu_nr\n"); + return make_ready_future<int>(-1); + } + + auto http_clients = new distributed<http_client>; + + // Start http requests on all the cores + auto started = steady_clock_type::now(); + fmt::print("========== http_client ============\n"); + fmt::print("Server: {}\n", server); + fmt::print("Connections: {:d}\n", total_conn); + fmt::print("Requests/connection: {}\n", reqs_per_conn == 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn)); + return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn)).then([http_clients, server] { + return http_clients->invoke_on_all(&http_client::connect, ipv4_addr{server}); + }).then([http_clients] { + return http_clients->invoke_on_all(&http_client::run); + }).then([http_clients] { + return http_clients->map_reduce(adder<uint64_t>(), &http_client::total_reqs); + }).then([http_clients, started] (auto total_reqs) { + // All the http requests are finished + auto finished = steady_clock_type::now(); + auto elapsed = finished - started; + auto secs = static_cast<double>(elapsed.count() / 1000000000.0); + fmt::print("Total cpus: {:d}\n", smp::count); + fmt::print("Total requests: {:d}\n", total_reqs); + fmt::print("Total time: {:f}\n", secs); + fmt::print("Requests/sec: {:f}\n", static_cast<double>(total_reqs) / secs); + fmt::print("========== done ============\n"); + return http_clients->stop().then([http_clients] { + // FIXME: If we call engine().exit(0) here to exit when + // requests are done. The tcp connection will not be closed + // properly, becasue we exit too earily and the FIN packets are + // not exchanged. + delete http_clients; + return make_ready_future<int>(0); + }); + }); + }); +} |