summaryrefslogtreecommitdiffstats
path: root/src/seastar/apps
diff options
context:
space:
mode:
Diffstat (limited to 'src/seastar/apps')
-rw-r--r--src/seastar/apps/CMakeLists.txt56
-rw-r--r--src/seastar/apps/httpd/CMakeLists.txt37
-rw-r--r--src/seastar/apps/httpd/demo.json73
-rw-r--r--src/seastar/apps/httpd/main.cc91
-rw-r--r--src/seastar/apps/io_tester/CMakeLists.txt27
-rw-r--r--src/seastar/apps/io_tester/conf.yaml25
-rw-r--r--src/seastar/apps/io_tester/io_tester.cc612
-rw-r--r--src/seastar/apps/iotune/CMakeLists.txt29
-rw-r--r--src/seastar/apps/iotune/iotune.cc694
-rw-r--r--src/seastar/apps/memcached/CMakeLists.txt49
-rw-r--r--src/seastar/apps/memcached/ascii.rl154
-rw-r--r--src/seastar/apps/memcached/memcache.cc1464
-rw-r--r--src/seastar/apps/memcached/memcached.hh74
-rw-r--r--src/seastar/apps/memcached/tests/CMakeLists.txt75
-rwxr-xr-xsrc/seastar/apps/memcached/tests/test.py49
-rw-r--r--src/seastar/apps/memcached/tests/test_ascii_parser.cc335
-rwxr-xr-xsrc/seastar/apps/memcached/tests/test_memcached.py600
-rw-r--r--src/seastar/apps/seawreck/CMakeLists.txt24
-rw-r--r--src/seastar/apps/seawreck/seawreck.cc225
19 files changed, 4693 insertions, 0 deletions
diff --git a/src/seastar/apps/CMakeLists.txt b/src/seastar/apps/CMakeLists.txt
new file mode 100644
index 00000000..26e0e03b
--- /dev/null
+++ b/src/seastar/apps/CMakeLists.txt
@@ -0,0 +1,56 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+# Logical target for all applications.
+add_custom_target (apps)
+
+macro (seastar_add_app name)
+ set (args ${ARGN})
+
+ cmake_parse_arguments (
+ parsed_args
+ ""
+ ""
+ "SOURCES"
+ ${args})
+
+ set (target app_${name})
+ add_executable (${target} ${parsed_args_SOURCES})
+
+ target_include_directories (${target}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
+
+ target_link_libraries (${target}
+ PRIVATE seastar_with_flags)
+
+ set_target_properties (${target}
+ PROPERTIES
+ OUTPUT_NAME ${name})
+
+ add_dependencies (apps ${target})
+endmacro ()
+
+add_subdirectory (httpd)
+add_subdirectory (io_tester)
+add_subdirectory (iotune)
+add_subdirectory (memcached)
+add_subdirectory (seawreck)
diff --git a/src/seastar/apps/httpd/CMakeLists.txt b/src/seastar/apps/httpd/CMakeLists.txt
new file mode 100644
index 00000000..d8c48197
--- /dev/null
+++ b/src/seastar/apps/httpd/CMakeLists.txt
@@ -0,0 +1,37 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_generate_swagger (
+ TARGET app_httpd_swagger
+ VAR app_httpd_swagger_file
+ IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/demo.json
+ OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/demo.json.hh)
+
+seastar_add_app (httpd
+ SOURCES
+ ${app_httpd_swagger_file}
+ main.cc)
+
+target_include_directories (app_httpd
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
+
+add_dependencies (app_httpd app_httpd_swagger)
diff --git a/src/seastar/apps/httpd/demo.json b/src/seastar/apps/httpd/demo.json
new file mode 100644
index 00000000..12261c45
--- /dev/null
+++ b/src/seastar/apps/httpd/demo.json
@@ -0,0 +1,73 @@
+{
+ "apiVersion": "0.0.1",
+ "swaggerVersion": "1.2",
+ "basePath": "{{Protocol}}://{{Host}}",
+ "resourcePath": "/hello",
+ "produces": [
+ "application/json"
+ ],
+ "apis": [
+ {
+ "path": "/hello/world/{var1}/{var2}",
+ "operations": [
+ {
+ "method": "GET",
+ "summary": "Returns the number of seconds since the system was booted",
+ "type": "long",
+ "nickname": "hello_world",
+ "produces": [
+ "application/json"
+ ],
+ "parameters": [
+ {
+ "name":"var2",
+ "description":"Full path of file or directory",
+ "required":true,
+ "allowMultiple":true,
+ "type":"string",
+ "paramType":"path"
+ },
+ {
+ "name":"var1",
+ "description":"Full path of file or directory",
+ "required":true,
+ "allowMultiple":false,
+ "type":"string",
+ "paramType":"path"
+ },
+ {
+ "name":"query_enum",
+ "description":"The operation to perform",
+ "required":true,
+ "allowMultiple":false,
+ "type":"string",
+ "paramType":"query",
+ "enum":["VAL1", "VAL2", "VAL3"]
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "models" : {
+ "my_object": {
+ "id": "my_object",
+ "description": "Demonstrate an object",
+ "properties": {
+ "var1": {
+ "type": "string",
+ "description": "The first parameter in the path"
+ },
+ "var2": {
+ "type": "string",
+ "description": "The second parameter in the path"
+ },
+ "enum_var" : {
+ "type": "string",
+ "description": "Demonstrate an enum returned, note this is not the same enum type of the request",
+ "enum":["VAL1", "VAL2", "VAL3"]
+ }
+ }
+ }
+ }
+}
diff --git a/src/seastar/apps/httpd/main.cc b/src/seastar/apps/httpd/main.cc
new file mode 100644
index 00000000..62af9434
--- /dev/null
+++ b/src/seastar/apps/httpd/main.cc
@@ -0,0 +1,91 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+
+#include <seastar/http/httpd.hh>
+#include <seastar/http/handlers.hh>
+#include <seastar/http/function_handlers.hh>
+#include <seastar/http/file_handler.hh>
+#include "demo.json.hh"
+#include <seastar/http/api_docs.hh>
+
+namespace bpo = boost::program_options;
+
+using namespace seastar;
+using namespace httpd;
+
+class handl : public httpd::handler_base {
+public:
+ virtual future<std::unique_ptr<reply> > handle(const sstring& path,
+ std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
+ rep->_content = "hello";
+ rep->done("html");
+ return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
+ }
+};
+
+void set_routes(routes& r) {
+ function_handler* h1 = new function_handler([](const_req req) {
+ return "hello";
+ });
+ function_handler* h2 = new function_handler([](std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>("json-future");
+ });
+ r.add(operation_type::GET, url("/"), h1);
+ r.add(operation_type::GET, url("/jf"), h2);
+ r.add(operation_type::GET, url("/file").remainder("path"),
+ new directory_handler("/"));
+ demo_json::hello_world.set(r, [] (const_req req) {
+ demo_json::my_object obj;
+ obj.var1 = req.param.at("var1");
+ obj.var2 = req.param.at("var2");
+ demo_json::ns_hello_world::query_enum v = demo_json::ns_hello_world::str2query_enum(req.query_parameters.at("query_enum"));
+ // This demonstrate enum conversion
+ obj.enum_var = v;
+ return obj;
+ });
+}
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()("port", bpo::value<uint16_t>()->default_value(10000),
+ "HTTP Server port");
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto server = new http_server_control();
+ auto rb = make_shared<api_registry_builder>("apps/httpd/");
+ server->start().then([server] {
+ return server->set_routes(set_routes);
+ }).then([server, rb]{
+ return server->set_routes([rb](routes& r){rb->set_api_doc(r);});
+ }).then([server, rb]{
+ return server->set_routes([rb](routes& r) {rb->register_function(r, "demo", "hello world application");});
+ }).then([server, port] {
+ return server->listen(port);
+ }).then([server, port] {
+ std::cout << "Seastar HTTP server listening on port " << port << " ...\n";
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ });
+
+ });
+}
diff --git a/src/seastar/apps/io_tester/CMakeLists.txt b/src/seastar/apps/io_tester/CMakeLists.txt
new file mode 100644
index 00000000..266ff4e5
--- /dev/null
+++ b/src/seastar/apps/io_tester/CMakeLists.txt
@@ -0,0 +1,27 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_add_app (io_tester
+ SOURCES io_tester.cc)
+
+target_link_libraries (app_io_tester
+ PRIVATE yaml-cpp::yaml-cpp)
diff --git a/src/seastar/apps/io_tester/conf.yaml b/src/seastar/apps/io_tester/conf.yaml
new file mode 100644
index 00000000..e8b8091c
--- /dev/null
+++ b/src/seastar/apps/io_tester/conf.yaml
@@ -0,0 +1,25 @@
+- name: big_writes
+ shards: all
+ type: seqwrite
+ shard_info:
+ parallelism: 10
+ reqsize: 256kB
+ shares: 10
+ think_time: 0
+
+- name: latency_reads
+ shards: [0]
+ type: randread
+ shard_info:
+ parallelism: 1
+ reqsize: 512
+ shares: 100
+ think_time: 1000us
+
+- name: cpu_hog
+ shards: [0]
+ type: cpu
+ shard_info:
+ parallelim: 1
+ execution_time: 90us
+ think_time: 10us
diff --git a/src/seastar/apps/io_tester/io_tester.cc b/src/seastar/apps/io_tester/io_tester.cc
new file mode 100644
index 00000000..16b64d38
--- /dev/null
+++ b/src/seastar/apps/io_tester/io_tester.cc
@@ -0,0 +1,612 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+#include <seastar/core/app-template.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/align.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/thread.hh>
+#include <chrono>
+#include <vector>
+#include <boost/range/irange.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/p_square_quantile.hpp>
+#include <boost/accumulators/statistics/extended_p_square.hpp>
+#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
+#include <boost/range/adaptor/filtered.hpp>
+#include <boost/range/adaptor/map.hpp>
+#include <boost/array.hpp>
+#include <iomanip>
+#include <random>
+#include <yaml-cpp/yaml.h>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+using namespace boost::accumulators;
+
+static auto random_seed = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+static std::default_random_engine random_generator(random_seed);
+// size of each individual file. Every class will have its file, so in a normal system with many shards, we'll naturally have many files and
+// that will push the data out of the disk's cache. And static sizes per file are simpler.
+static constexpr uint64_t file_data_size = 1ull << 30;
+
+struct context;
+enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu };
+
+namespace std {
+
+template <>
+struct hash<request_type> {
+ size_t operator() (const request_type& type) const {
+ return static_cast<size_t>(type);
+ }
+};
+
+}
+
+struct byte_size {
+ uint64_t size;
+};
+
+struct duration_time {
+ std::chrono::duration<float> time;
+};
+
+class shard_config {
+ std::unordered_set<unsigned> _shards;
+public:
+ shard_config()
+ : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {}
+ shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {}
+
+ bool is_set(unsigned cpu) const {
+ return _shards.count(cpu);
+ }
+};
+
+struct shard_info {
+ unsigned parallelism = 10;
+ unsigned shares = 10;
+ uint64_t request_size = 4 << 10;
+ std::chrono::duration<float> think_time = 0ms;
+ std::chrono::duration<float> execution_time = 1ms;
+ seastar::scheduling_group scheduling_group = seastar::default_scheduling_group();
+};
+
+class class_data;
+
+struct job_config {
+ std::string name;
+ request_type type;
+ shard_config shard_placement;
+ ::shard_info shard_info;
+ std::unique_ptr<class_data> gen_class_data();
+};
+
+std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999};
+
+class class_data {
+protected:
+ using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>;
+
+ job_config _config;
+ uint64_t _alignment;
+ uint64_t _last_pos = 0;
+
+ io_priority_class _iop;
+ seastar::scheduling_group _sg;
+
+ size_t _data = 0;
+ std::chrono::duration<float> _total_duration;
+
+ std::chrono::steady_clock::time_point _start = {};
+ accumulator_type _latencies;
+ std::uniform_int_distribution<uint32_t> _pos_distribution;
+ file _file;
+
+ virtual future<> do_start(sstring dir) = 0;
+ virtual future<size_t> issue_request(char *buf) = 0;
+public:
+ static int idgen();
+ class_data(job_config cfg)
+ : _config(std::move(cfg))
+ , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
+ , _iop(engine().register_one_priority_class(format("test-class-{:d}", idgen()), _config.shard_info.shares))
+ , _sg(cfg.shard_info.scheduling_group)
+ , _latencies(extended_p_square_probabilities = quantiles)
+ , _pos_distribution(0, file_data_size / _config.shard_info.request_size)
+ {}
+
+ future<> issue_requests(std::chrono::steady_clock::time_point stop) {
+ _start = std::chrono::steady_clock::now();
+ return with_scheduling_group(_sg, [this, stop] {
+ return parallel_for_each(boost::irange(0u, parallelism()), [this, stop] (auto dummy) mutable {
+ auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
+ auto buf = bufptr.get();
+ return do_until([this, stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable {
+ auto start = std::chrono::steady_clock::now();
+ return issue_request(buf).then([this, start, stop] (auto size) {
+ auto now = std::chrono::steady_clock::now();
+ if (now < stop) {
+ this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
+ }
+ return think();
+ });
+ }).finally([bufptr = std::move(bufptr)] {});
+ });
+ }).then([this] {
+ _total_duration = std::chrono::steady_clock::now() - _start;
+ });
+ }
+
+ future<> think() {
+ if (_config.shard_info.think_time > 0us) {
+ return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
+ } else {
+ return make_ready_future<>();
+ }
+ }
+ // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting
+ // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate
+ // this file differently depending on the type:
+ //
+ // sequential reads : will read the file from pos = 0 onwards, back to 0 on EOF
+ // sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF
+ // random reads : will read the file at random positions, between 0 and EOF
+ // random writes : will overwrite the file at a random position, between 0 and EOF
+ // append : will write to the file from pos = EOF onwards, always appending to the end.
+ // cpu : CPU-only load, file is not created.
+ future<> start(sstring dir) {
+ return do_start(dir);
+ }
+protected:
+ sstring type_str() const {
+ return std::unordered_map<request_type, sstring>{
+ { request_type::seqread, "SEQ READ" },
+ { request_type::seqwrite, "SEQ WRITE" },
+ { request_type::randread, "RAND READ" },
+ { request_type::randwrite, "RAND WRITE" },
+ { request_type::append , "APPEND" },
+ { request_type::cpu , "CPU" },
+ }[_config.type];;
+ }
+
+ const sstring name() const {
+ return _config.name;
+ }
+
+ request_type req_type() const {
+ return _config.type;
+ }
+
+ sstring think_time() const {
+ if (_config.shard_info.think_time == std::chrono::duration<float>(0)) {
+ return "NO think time";
+ } else {
+ return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count());
+ }
+ }
+
+ size_t req_size() const {
+ return _config.shard_info.request_size;
+ }
+
+ unsigned parallelism() const {
+ return _config.shard_info.parallelism;
+ }
+
+ unsigned shares() const {
+ return _config.shard_info.shares;
+ }
+
+ std::chrono::duration<float> total_duration() const {
+ return _total_duration;
+ }
+
+ uint64_t total_data() const {
+ return _data;
+ }
+
+ uint64_t max_latency() const {
+ return max(_latencies);
+ }
+
+ uint64_t average_latency() const {
+ return mean(_latencies);
+ }
+
+ uint64_t quantile_latency(double q) const {
+ return quantile(_latencies, quantile_probability = q);
+ }
+
+ bool is_sequential() const {
+ return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite);
+ }
+ bool is_random() const {
+ return (req_type() == request_type::randread) || (req_type() == request_type::randwrite);
+ }
+
+ uint64_t get_pos() {
+ uint64_t pos;
+ if (is_random()) {
+ pos = _pos_distribution(random_generator) * req_size();
+ } else {
+ pos = _last_pos + req_size();
+ if (is_sequential() && (pos >= file_data_size)) {
+ pos = 0;
+ }
+ }
+ _last_pos = pos;
+ return pos;
+ }
+
+ void add_result(size_t data, std::chrono::microseconds latency) {
+ _data += data;
+ _latencies(latency.count());
+ }
+
+public:
+ virtual sstring describe_class() = 0;
+ virtual sstring describe_results() = 0;
+};
+
+class io_class_data : public class_data {
+public:
+ io_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+ future<> do_start(sstring dir) override {
+ auto fname = format("{}/test-{}-{:d}", dir, name(), engine().cpu_id());
+ return open_file_dma(fname, open_flags::rw | open_flags::create | open_flags::truncate).then([this, fname] (auto f) {
+ _file = f;
+ return remove_file(fname);
+ }).then([this, fname] {
+ return do_with(seastar::semaphore(64), [this] (auto& write_parallelism) mutable {
+ auto bufsize = 256ul << 10;
+ auto pos = boost::irange(0ul, (file_data_size / bufsize) + 1);
+ return parallel_for_each(pos.begin(), pos.end(), [this, bufsize, &write_parallelism] (auto pos) mutable {
+ return get_units(write_parallelism, 1).then([this, bufsize, pos] (auto perm) mutable {
+ auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096);
+ auto buf = bufptr.get();
+ std::uniform_int_distribution<char> fill('@', '~');
+ memset(buf, fill(random_generator), bufsize);
+ pos = pos * bufsize;
+ return _file.dma_write(pos, buf, bufsize).finally([this, bufsize, bufptr = std::move(bufptr), perm = std::move(perm), pos] {
+ if ((this->req_type() == request_type::append) && (pos > _last_pos)) {
+ _last_pos = pos;
+ }
+ }).discard_result();
+ });
+ });
+ });
+ }).then([this] {
+ return _file.flush();
+ });
+ }
+
+ virtual sstring describe_class() override {
+ return fmt::format("{}: {} shares, {}-byte {}, {} concurrent requests, {}", name(), shares(), req_size(), type_str(), parallelism(), think_time());
+ }
+
+ virtual sstring describe_results() override {
+ auto throughput_kbs = (total_data() >> 10) / total_duration().count();
+ sstring result;
+ result += fmt::format(" Throughput : {:>8} KB/s\n", throughput_kbs);
+ result += fmt::format(" Lat average : {:>8} usec\n", average_latency());
+ for (auto& q: quantiles) {
+ result += fmt::format(" Lat quantile={:>5} : {:>8} usec\n", q, quantile_latency(q));
+ }
+ result += fmt::format(" Lat max : {:>8} usec\n", max_latency());
+ return result;
+ }
+};
+
+class read_io_class_data : public io_class_data {
+public:
+ read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+ future<size_t> issue_request(char *buf) override {
+ return _file.dma_read(this->get_pos(), buf, this->req_size(), _iop);
+ }
+};
+
+class write_io_class_data : public io_class_data {
+public:
+ write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
+
+ future<size_t> issue_request(char *buf) override {
+ return _file.dma_write(this->get_pos(), buf, this->req_size(), _iop);
+ }
+};
+
+class cpu_class_data : public class_data {
+public:
+ cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {}
+
+ future<> do_start(sstring dir) override {
+ return make_ready_future<>();
+ }
+
+ future<size_t> issue_request(char *buf) override {
+ // We do want the execution time to be a busy loop, and not just a bunch of
+ // continuations until our time is up: by doing this we can also simulate the behavior
+ // of I/O continuations in the face of reactor stalls.
+ auto start = std::chrono::steady_clock::now();
+ do {
+ } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time);
+ return make_ready_future<size_t>(1);
+ }
+
+ virtual sstring describe_class() override {
+ auto exec = std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.execution_time);
+ return fmt::format("{}: {} shares, {} us CPU execution time, {} concurrent requests, {}", name(), shares(), exec.count(), parallelism(), think_time());
+ }
+
+ virtual sstring describe_results() override {
+ auto throughput = total_data() / total_duration().count();
+ return fmt::format(" Throughput : {:>8} continuations/s\n", throughput);
+ }
+};
+
+std::unique_ptr<class_data> job_config::gen_class_data() {
+ if (type == request_type::cpu) {
+ return std::make_unique<cpu_class_data>(*this);
+ } else if ((type == request_type::seqread) || (type == request_type::randread)) {
+ return std::make_unique<read_io_class_data>(*this);
+ } else {
+ return std::make_unique<write_io_class_data>(*this);
+ }
+}
+
+/// YAML parsing functions
+namespace YAML {
+template<>
+struct convert<byte_size> {
+ static bool decode(const Node& node, byte_size& bs) {
+ auto str = node.as<std::string>();
+ unsigned shift = 0;
+ if (str.back() == 'B') {
+ str.pop_back();
+ shift = std::unordered_map<char, unsigned>{
+ { 'k', 10 },
+ { 'M', 20 },
+ { 'G', 30 },
+ }[str.back()];
+ str.pop_back();
+ }
+ bs.size = (boost::lexical_cast<size_t>(str) << shift);
+ return bs.size >= 512;
+ }
+};
+
+template<>
+struct convert<duration_time> {
+ static bool decode(const Node& node, duration_time& dt) {
+ auto str = node.as<std::string>();
+ if (str == "0") {
+ dt.time = 0ns;
+ return true;
+ }
+ if (str.back() != 's') {
+ return false;
+ }
+ str.pop_back();
+ std::unordered_map<char, std::chrono::duration<float>> unit = {
+ { 'n', 1ns },
+ { 'u', 1us },
+ { 'm', 1ms },
+ };
+
+ if (unit.count(str.back())) {
+ auto u = str.back();
+ str.pop_back();
+ dt.time = (boost::lexical_cast<size_t>(str) * unit[u]);
+ } else {
+ dt.time = (boost::lexical_cast<size_t>(str) * 1s);
+ }
+ return true;
+ }
+};
+
+template<>
+struct convert<shard_config> {
+ static bool decode(const Node& node, shard_config& shards) {
+ try {
+ auto str = node.as<std::string>();
+ return (str == "all");
+ } catch (YAML::TypedBadConversion<std::string>& e) {
+ shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>()));
+ return true;
+ }
+ return false;
+ }
+};
+
+template<>
+struct convert<request_type> {
+ static bool decode(const Node& node, request_type& rt) {
+ static std::unordered_map<std::string, request_type> mappings = {
+ { "seqread", request_type::seqread },
+ { "seqwrite", request_type::seqwrite},
+ { "randread", request_type::randread },
+ { "randwrite", request_type::randwrite },
+ { "append", request_type::append},
+ { "cpu", request_type::cpu},
+ };
+ auto reqstr = node.as<std::string>();
+ if (!mappings.count(reqstr)) {
+ return false;
+ }
+ rt = mappings[reqstr];
+ return true;
+ }
+};
+
+template<>
+struct convert<shard_info> {
+ static bool decode(const Node& node, shard_info& sl) {
+ if (node["parallelism"]) {
+ sl.parallelism = node["parallelism"].as<unsigned>();
+ }
+ if (node["shares"]) {
+ sl.shares = node["shares"].as<unsigned>();
+ }
+ if (node["reqsize"]) {
+ sl.request_size = node["reqsize"].as<byte_size>().size;
+ }
+ if (node["think_time"]) {
+ sl.think_time = node["think_time"].as<duration_time>().time;
+ }
+ if (node["execution_time"]) {
+ sl.execution_time = node["execution_time"].as<duration_time>().time;
+ }
+ return true;
+ }
+};
+
+template<>
+struct convert<job_config> {
+ static bool decode(const Node& node, job_config& cl) {
+ cl.name = node["name"].as<std::string>();
+ cl.type = node["type"].as<request_type>();
+ cl.shard_placement = node["shards"].as<shard_config>();
+ if (node["shard_info"]) {
+ cl.shard_info = node["shard_info"].as<shard_info>();
+ }
+ return true;
+ }
+};
+}
+
+/// Each shard has one context, and the context is responsible for creating the classes that should
+/// run in this shard.
+class context {
+ std::vector<std::unique_ptr<class_data>> _cl;
+
+ sstring _dir;
+ std::chrono::seconds _duration;
+
+ semaphore _finished;
+public:
+ context(sstring dir, std::vector<job_config> req_config, unsigned duration)
+ : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config
+ | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(engine().cpu_id()); })
+ | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); })
+ ))
+ , _dir(dir)
+ , _duration(duration)
+ , _finished(0)
+ {}
+
+ future<> stop() { return make_ready_future<>(); }
+
+ future<> start() {
+ return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) {
+ return cl->start(_dir);
+ });
+ }
+
+ future<> issue_requests() {
+ return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) {
+ return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] {
+ _finished.signal(1);
+ });
+ });
+ }
+
+ future<> print_stats() {
+ return _finished.wait(_cl.size()).then([this] {
+ fmt::print("Shard {:>2}\n", engine().cpu_id());
+ auto idx = 0;
+ for (auto& cl: _cl) {
+ fmt::print("Class {:>2} ({})\n", idx++, cl->describe_class());
+ fmt::print("{}\n", cl->describe_results());
+ }
+ return make_ready_future<>();
+ });
+ }
+};
+
+int class_data::idgen() {
+ static thread_local int id = 0;
+ return id++;
+}
+
+int main(int ac, char** av) {
+ namespace bpo = boost::program_options;
+
+ app_template app;
+ auto opt_add = app.add_options();
+ opt_add
+ ("directory", bpo::value<sstring>()->default_value("."), "directory where to execute the test")
+ ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
+ ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification")
+ ;
+
+ distributed<context> ctx;
+ return app.run(ac, av, [&] {
+ return seastar::async([&] {
+ auto& opts = app.configuration();
+ auto& directory = opts["directory"].as<sstring>();
+
+ auto fs = file_system_at(directory).get0();
+ if (fs != fs_type::xfs) {
+ throw std::runtime_error(format("This is a performance test. {} is not on XFS", directory));
+ }
+
+ auto& duration = opts["duration"].as<unsigned>();
+ auto& yaml = opts["conf"].as<sstring>();
+ YAML::Node doc = YAML::LoadFile(yaml);
+ auto reqs = doc.as<std::vector<job_config>>();
+
+ parallel_for_each(reqs, [] (auto& r) {
+ return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) {
+ r.shard_info.scheduling_group = sg;
+ });
+ }).get();
+
+ ctx.start(directory, reqs, duration).get0();
+ engine().at_exit([&ctx] {
+ return ctx.stop();
+ });
+ std::cout << "Creating initial files..." << std::endl;
+ ctx.invoke_on_all([] (auto& c) {
+ return c.start();
+ }).get();
+ std::cout << "Starting evaluation..." << std::endl;
+ ctx.invoke_on_all([] (auto& c) {
+ return c.issue_requests();
+ }).get();
+ for (unsigned i = 0; i < smp::count; ++i) {
+ ctx.invoke_on(i, [] (auto& c) {
+ return c.print_stats();
+ }).get();
+ }
+ }).or_terminate();
+ });
+}
diff --git a/src/seastar/apps/iotune/CMakeLists.txt b/src/seastar/apps/iotune/CMakeLists.txt
new file mode 100644
index 00000000..18788c14
--- /dev/null
+++ b/src/seastar/apps/iotune/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_add_app (iotune
+ SOURCES iotune.cc)
+
+target_link_libraries (app_iotune
+ PRIVATE
+ StdFilesystem::filesystem
+ yaml-cpp::yaml-cpp)
diff --git a/src/seastar/apps/iotune/iotune.cc b/src/seastar/apps/iotune/iotune.cc
new file mode 100644
index 00000000..32886073
--- /dev/null
+++ b/src/seastar/apps/iotune/iotune.cc
@@ -0,0 +1,694 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2018 ScyllaDB
+ *
+ * The goal of this program is to allow a user to properly configure the Seastar I/O
+ * scheduler.
+ */
+#include <iostream>
+#include <chrono>
+#include <random>
+#include <memory>
+#include <vector>
+#include <cmath>
+#include <sys/vfs.h>
+#include <sys/sysmacros.h>
+#include <boost/filesystem.hpp>
+#include <boost/range/irange.hpp>
+#include <boost/program_options.hpp>
+#include <boost/iterator/counting_iterator.hpp>
+#include <fstream>
+#include <wordexp.h>
+#include <yaml-cpp/yaml.h>
+#include <seastar/core/thread.hh>
+#include <seastar/core/sstring.hh>
+#include <seastar/core/posix.hh>
+#include <seastar/core/resource.hh>
+#include <seastar/core/aligned_buffer.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/fsqual.hh>
+#include <seastar/util/defer.hh>
+#include <seastar/util/log.hh>
+#include <seastar/util/std-compat.hh>
+#include <seastar/util/read_first_line.hh>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+namespace fs = std::experimental::filesystem;
+
+logger iotune_logger("iotune");
+
+using iotune_clock = std::chrono::steady_clock;
+static thread_local std::default_random_engine random_generator(std::chrono::duration_cast<std::chrono::nanoseconds>(iotune_clock::now().time_since_epoch()).count());
+
+template <typename Type>
+Type read_sys_file_as(fs::path sys_file) {
+ return boost::lexical_cast<Type>(read_first_line(sys_file));
+}
+
+void check_device_properties(fs::path dev_sys_file) {
+ auto sched_file = dev_sys_file / "queue" / "scheduler";
+ auto sched_string = read_first_line(sched_file);
+ auto beg = sched_string.find('[');
+ size_t len = sched_string.size();
+ if (beg == sstring::npos) {
+ beg = 0;
+ } else {
+ auto end = sched_string.find(']');
+ if (end != sstring::npos) {
+ len = end - beg - 1;
+ }
+ beg++;
+ }
+ auto scheduler = sched_string.substr(beg, len);
+ if ((scheduler != "noop") && (scheduler != "none")) {
+ iotune_logger.warn("Scheduler for {} set to {}. It is recommend to set it to noop before evaluation so as not to skew the results.",
+ sched_file.string(), scheduler);
+ }
+
+ auto nomerges_file = dev_sys_file / "queue" / "nomerges";
+ auto nomerges = read_sys_file_as<unsigned>(nomerges_file);
+ if (nomerges != 2u) {
+ iotune_logger.warn("nomerges for {} set to {}. It is recommend to set it to 2 before evaluation so that merges are disabled. Results can be skewed otherwise.",
+ nomerges_file.string(), nomerges);
+ }
+}
+
+struct evaluation_directory {
+ sstring _name;
+ // We know that if we issue more than this, they will be blocked on linux anyway.
+ unsigned _max_iodepth = 0;
+ uint64_t _available_space;
+ uint64_t _min_data_transfer_size = 512;
+ unsigned _disks_per_array = 0;
+
+ void scan_device(unsigned dev_maj, unsigned dev_min) {
+ scan_device(fmt::format("{}:{}", dev_maj, dev_min));
+ }
+
+ void scan_device(std::string dev_str) {
+ scan_device(fs::path("/sys/dev/block") / dev_str);
+ }
+
+ void scan_device(fs::path sys_file) {
+ try {
+ sys_file = fs::canonical(sys_file);
+ bool is_leaf = true;
+ if (fs::exists(sys_file / "slaves")) {
+ for (auto& dev : fs::directory_iterator(sys_file / "slaves")) {
+ is_leaf = false;
+ scan_device(read_first_line(dev / "dev"));
+ }
+ }
+
+ // our work is done if not leaf. We'll tune the leaves
+ if (!is_leaf) {
+ return;
+ }
+
+ if (fs::exists(sys_file / "partition")) {
+ scan_device(sys_file.remove_filename());
+ } else {
+ check_device_properties(sys_file);
+ auto queue_dir = sys_file / "queue";
+ auto disk_min_io_size = read_sys_file_as<uint64_t>(queue_dir / "minimum_io_size");
+
+ _min_data_transfer_size = std::max(_min_data_transfer_size, disk_min_io_size);
+ _max_iodepth += read_sys_file_as<uint64_t>(queue_dir / "nr_requests");
+ _disks_per_array++;
+ }
+ } catch (std::system_error& se) {
+ iotune_logger.error("Error while parsing sysfs. Will continue with guessed values: {}", se.what());
+ _max_iodepth = 128;
+ }
+ _disks_per_array = std::max(_disks_per_array, 1u);
+ }
+public:
+ evaluation_directory(sstring name)
+ : _name(name)
+ , _available_space(fs::space(fs::path(_name)).available)
+ {}
+
+ unsigned max_iodepth() const {
+ return _max_iodepth;
+ }
+
+ fs::path path() const {
+ return fs::path(_name);
+ }
+
+ const sstring& name() const {
+ return _name;
+ }
+
+ unsigned disks_per_array() const {
+ return _disks_per_array;
+ }
+
+ uint64_t minimum_io_size() const {
+ return _min_data_transfer_size;
+ }
+
+ future<> discover_directory() {
+ return seastar::async([this] {
+ auto f = open_directory(_name).get0();
+ auto st = f.stat().get0();
+ f.close().get();
+
+ scan_device(major(st.st_dev), minor(st.st_dev));
+ });
+ }
+
+ uint64_t available_space() const {
+ return _available_space;
+ }
+};
+
+struct io_rates {
+ float bytes_per_sec = 0;
+ float iops = 0;
+ io_rates operator+(const io_rates& a) const {
+ return io_rates{bytes_per_sec + a.bytes_per_sec, iops + a.iops};
+ }
+
+ io_rates& operator+=(const io_rates& a) {
+ bytes_per_sec += a.bytes_per_sec;
+ iops += a.iops;
+ return *this;
+ }
+};
+
+class invalid_position : public std::exception {
+public:
+ virtual const char* what() const noexcept {
+ return "file access position invalid";
+ }
+};
+
+struct position_generator {
+ virtual uint64_t get_pos() = 0;
+ virtual bool is_sequential() const = 0;
+ virtual ~position_generator() {}
+};
+
+class sequential_issuer : public position_generator {
+ size_t _buffer_size;
+ uint64_t _position = 0;
+ uint64_t _size_limit;
+public:
+ sequential_issuer(size_t buffer_size, uint64_t size_limit)
+ : _buffer_size(buffer_size)
+ , _size_limit(size_limit)
+ {}
+
+ virtual bool is_sequential() const {
+ return true;
+ }
+
+ virtual uint64_t get_pos() {
+ if (_position >= _size_limit) {
+ throw invalid_position();
+ }
+ auto pos = _position;
+ _position += _buffer_size;
+ return pos;
+ }
+};
+
+class random_issuer : public position_generator {
+ size_t _buffer_size;
+ uint64_t _last_position;
+ std::uniform_int_distribution<uint64_t> _pos_distribution;
+public:
+ random_issuer(size_t buffer_size, uint64_t last_position)
+ : _buffer_size(buffer_size)
+ , _last_position(last_position)
+ , _pos_distribution(0, (last_position / buffer_size) - 1)
+ {}
+
+ virtual bool is_sequential() const {
+ return false;
+ }
+
+ virtual uint64_t get_pos() {
+ uint64_t pos = _pos_distribution(random_generator) * _buffer_size;
+ if (pos >= _last_position) {
+ throw invalid_position();
+ }
+ return pos;
+ }
+};
+
+class request_issuer {
+public:
+ virtual future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) = 0;
+ virtual ~request_issuer() {}
+};
+
+
+class write_request_issuer : public request_issuer {
+ file _file;
+public:
+ explicit write_request_issuer(file f) : _file(f) {}
+ future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) override {
+ return _file.dma_write(pos, buf, size);
+ }
+};
+
+class read_request_issuer : public request_issuer {
+ file _file;
+public:
+ explicit read_request_issuer(file f) : _file(f) {}
+ future<size_t> issue_request(uint64_t pos, char* buf, uint64_t size) override {
+ return _file.dma_read(pos, buf, size);
+ }
+};
+
+class io_worker {
+ uint64_t _bytes = 0;
+ unsigned _requests = 0;
+ size_t _buffer_size;
+ std::chrono::duration<double> _duration;
+ std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _start_measuring;
+ std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_measuring;
+ std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_load;
+ // track separately because in the sequential case we may exhaust the file before _duration
+ std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _last_time_seen;
+
+ std::unique_ptr<position_generator> _pos_impl;
+ std::unique_ptr<request_issuer> _req_impl;
+public:
+ bool is_sequential() const {
+ return _pos_impl->is_sequential();
+ }
+
+ bool should_stop() const {
+ return iotune_clock::now() >= _end_load;
+ }
+
+ io_worker(size_t buffer_size, std::chrono::duration<double> duration, std::unique_ptr<request_issuer> reqs, std::unique_ptr<position_generator> pos)
+ : _buffer_size(buffer_size)
+ , _duration(duration)
+ , _start_measuring(iotune_clock::now() + std::chrono::duration<double>(10ms))
+ , _end_measuring(_start_measuring + duration)
+ , _end_load(_end_measuring + 10ms)
+ , _last_time_seen(_start_measuring)
+ , _pos_impl(std::move(pos))
+ , _req_impl(std::move(reqs))
+ {}
+
+ std::unique_ptr<char[], free_deleter> get_buffer() {
+ return allocate_aligned_buffer<char>(_buffer_size, _buffer_size);
+ }
+
+ future<> issue_request(char* buf) {
+ return _req_impl->issue_request(_pos_impl->get_pos(), buf, _buffer_size).then([this] (size_t size) {
+ auto now = iotune_clock::now();
+ if ((now > _start_measuring) && (now < _end_measuring)) {
+ _last_time_seen = now;
+ _bytes += size;
+ _requests++;
+ }
+ });
+ }
+
+ uint64_t bytes() const {
+ return _bytes;
+ }
+
+ io_rates get_io_rates() const {
+ io_rates rates;
+ auto t = _last_time_seen - _start_measuring;
+ if (!t.count()) {
+ throw std::runtime_error("No data collected");
+ }
+ rates.bytes_per_sec = _bytes / t.count();
+ rates.iops = _requests / t.count();
+ return rates;
+ }
+};
+
+class test_file {
+public:
+ enum class pattern { sequential, random };
+private:
+ fs::path _dirpath;
+ uint64_t _file_size;
+ file _file;
+
+ std::unique_ptr<position_generator> get_position_generator(size_t buffer_size, pattern access_pattern) {
+ if (access_pattern == pattern::sequential) {
+ return std::make_unique<sequential_issuer>(buffer_size, _file_size);
+ } else {
+ return std::make_unique<random_issuer>(buffer_size, _file_size);
+ }
+ }
+public:
+ test_file(const ::evaluation_directory& dir, uint64_t maximum_size)
+ : _dirpath(dir.path() / fs::path(fmt::format("ioqueue-discovery-{}", engine().cpu_id())))
+ , _file_size(maximum_size)
+ {}
+
+ future<> create_data_file() {
+ // XFS likes access in many directories better.
+ return make_directory(_dirpath.string()).then([this] {
+ auto testfile = _dirpath / fs::path("testfile");
+ file_open_options options;
+ options.extent_allocation_size_hint = _file_size;
+ return open_file_dma(testfile.string(), open_flags::rw | open_flags::create, std::move(options)).then([this, testfile] (file file) {
+ _file = file;
+ return remove_file(testfile.string()).then([this] {
+ return remove_file(_dirpath.string());
+ });
+ }).then([this] {
+ return _file.truncate(_file_size);
+ });
+ });
+ }
+
+ future<io_rates> do_workload(std::unique_ptr<io_worker> worker_ptr, unsigned max_os_concurrency, bool update_file_size = false) {
+ if (update_file_size) {
+ _file_size = 0;
+ }
+
+ auto worker = worker_ptr.get();
+ auto concurrency = boost::irange<unsigned, unsigned>(0, max_os_concurrency, 1);
+ return parallel_for_each(std::move(concurrency), [this, worker] (unsigned idx) {
+ auto bufptr = worker->get_buffer();
+ auto buf = bufptr.get();
+ return do_until([worker] { return worker->should_stop(); }, [this, buf, worker, idx] {
+ return worker->issue_request(buf);
+ }).finally([this, alive = std::move(bufptr)] {});
+ }).then_wrapped([this, worker = std::move(worker_ptr), update_file_size] (future<> f) {
+ try {
+ f.get();
+ } catch (invalid_position& ip) {
+ // expected if sequential. Example: reading and the file ended.
+ if (!worker->is_sequential()) {
+ throw;
+ }
+ }
+
+ if (update_file_size) {
+ _file_size = worker->bytes();
+ }
+ return make_ready_future<io_rates>(worker->get_io_rates());
+ });
+ }
+
+ future<io_rates> read_workload(size_t buffer_size, pattern access_pattern, unsigned max_os_concurrency, std::chrono::duration<double> duration) {
+ buffer_size = std::max(buffer_size, _file.disk_read_dma_alignment());
+ auto worker = std::make_unique<io_worker>(buffer_size, duration, std::make_unique<read_request_issuer>(_file), get_position_generator(buffer_size, access_pattern));
+ return do_workload(std::move(worker), max_os_concurrency);
+ }
+
+ future<io_rates> write_workload(size_t buffer_size, pattern access_pattern, unsigned max_os_concurrency, std::chrono::duration<double> duration) {
+ buffer_size = std::max(buffer_size, _file.disk_write_dma_alignment());
+ auto worker = std::make_unique<io_worker>(buffer_size, duration, std::make_unique<write_request_issuer>(_file), get_position_generator(buffer_size, access_pattern));
+ bool update_file_size = worker->is_sequential();
+ return do_workload(std::move(worker), max_os_concurrency, update_file_size).then([this] (io_rates r) {
+ return _file.flush().then([r = std::move(r)] () mutable {
+ return make_ready_future<io_rates>(std::move(r));
+ });
+ });
+ }
+
+ future<> stop() {
+ return make_ready_future<>();
+ }
+};
+
+class iotune_multi_shard_context {
+ ::evaluation_directory _test_directory;
+
+ unsigned per_shard_io_depth() const {
+ auto iodepth = _test_directory.max_iodepth() / smp::count;
+ if (engine().cpu_id() < _test_directory.max_iodepth() % smp::count) {
+ iodepth++;
+ }
+ return std::min(iodepth, 128u);
+ }
+ seastar::sharded<test_file> _iotune_test_file;
+public:
+ future<> stop() {
+ return _iotune_test_file.stop();
+ }
+
+ future<> start() {
+ return _iotune_test_file.start(_test_directory, _test_directory.available_space() / (2 * smp::count));
+ }
+
+ future<> create_data_file() {
+ return _iotune_test_file.invoke_on_all([this] (test_file& tf) {
+ return tf.create_data_file();
+ });
+ }
+
+ future<io_rates> write_sequential_data(unsigned shard, size_t buffer_size, std::chrono::duration<double> duration) {
+ return _iotune_test_file.invoke_on(shard, [this, buffer_size, duration] (test_file& tf) {
+ return tf.write_workload(buffer_size, test_file::pattern::sequential, 4 * _test_directory.disks_per_array(), duration);
+ });
+ }
+
+ future<io_rates> read_sequential_data(unsigned shard, size_t buffer_size, std::chrono::duration<double> duration) {
+ return _iotune_test_file.invoke_on(shard, [this, buffer_size, duration] (test_file& tf) {
+ return tf.read_workload(buffer_size, test_file::pattern::sequential, 4 * _test_directory.disks_per_array(), duration);
+ });
+ }
+
+ future<io_rates> write_random_data(size_t buffer_size, std::chrono::duration<double> duration) {
+ return _iotune_test_file.map_reduce0([buffer_size, this, duration] (test_file& tf) {
+ return tf.write_workload(buffer_size, test_file::pattern::random, per_shard_io_depth(), duration);
+ }, io_rates(), std::plus<io_rates>());
+ }
+
+ future<io_rates> read_random_data(size_t buffer_size, std::chrono::duration<double> duration) {
+ return _iotune_test_file.map_reduce0([buffer_size, this, duration] (test_file& tf) {
+ return tf.read_workload(buffer_size, test_file::pattern::random, per_shard_io_depth(), duration);
+ }, io_rates(), std::plus<io_rates>());
+ }
+
+ iotune_multi_shard_context(::evaluation_directory dir)
+ : _test_directory(dir)
+ {}
+};
+
+struct disk_descriptor {
+ std::string mountpoint;
+ uint64_t read_iops;
+ uint64_t read_bw;
+ uint64_t write_iops;
+ uint64_t write_bw;
+};
+
+void string_to_file(sstring conf_file, sstring buf) {
+ auto f = file_desc::open(conf_file, O_WRONLY | O_CLOEXEC | O_CREAT | O_TRUNC, 0664);
+ auto ret = f.write(buf.data(), buf.size());
+ if (!ret || (*ret != buf.size())) {
+ throw std::runtime_error(fmt::format("Can't write {}: {}", conf_file, *ret));
+ }
+}
+
+void write_configuration_file(sstring conf_file, std::string format, sstring properties_file) {
+ sstring buf;
+ if (format == "seastar") {
+ buf = fmt::format("io-properties-file={}\n", properties_file);
+ } else {
+ buf = fmt::format("SEASTAR_IO=\"--io-properties-file={}\"\n", properties_file);
+ }
+ string_to_file(conf_file, buf);
+}
+
+void write_property_file(sstring conf_file, struct std::vector<disk_descriptor> disk_descriptors) {
+ YAML::Emitter out;
+ out << YAML::BeginMap;
+ out << YAML::Key << "disks";
+ out << YAML::BeginSeq;
+ for (auto& desc : disk_descriptors) {
+ out << YAML::BeginMap;
+ out << YAML::Key << "mountpoint" << YAML::Value << desc.mountpoint;
+ out << YAML::Key << "read_iops" << YAML::Value << desc.read_iops;
+ out << YAML::Key << "read_bandwidth" << YAML::Value << desc.read_bw;
+ out << YAML::Key << "write_iops" << YAML::Value << desc.write_iops;
+ out << YAML::Key << "write_bandwidth" << YAML::Value << desc.write_bw;
+ out << YAML::EndMap;
+ }
+ out << YAML::EndSeq;
+ out << YAML::EndMap;
+ out << YAML::Newline;
+
+ string_to_file(conf_file, sstring(out.c_str(), out.size()));
+}
+
+// Returns the mountpoint of a path. It works by walking backwards from the canonical path
+// (absolute, with symlinks resolved), until we find a point that crosses a device ID.
+fs::path mountpoint_of(sstring filename) {
+ fs::path mnt_candidate = fs::canonical(fs::path(filename));
+ compat::optional<dev_t> candidate_id = {};
+ auto current = mnt_candidate;
+ do {
+ auto f = open_directory(current.string()).get0();
+ auto st = f.stat().get0();
+ if ((candidate_id) && (*candidate_id != st.st_dev)) {
+ return mnt_candidate;
+ }
+ mnt_candidate = current;
+ candidate_id = st.st_dev;
+ current = current.parent_path();
+ } while (!current.empty());
+
+ return mnt_candidate;
+}
+
+int main(int ac, char** av) {
+ namespace bpo = boost::program_options;
+ bool fs_check = false;
+
+ app_template::config app_cfg;
+ app_cfg.name = "IOTune";
+
+ app_template app(std::move(app_cfg));
+ auto opt_add = app.add_options();
+ opt_add
+ ("evaluation-directory", bpo::value<std::vector<sstring>>()->required(), "directory where to execute the evaluation")
+ ("properties-file", bpo::value<sstring>(), "path in which to write the YAML file")
+ ("options-file", bpo::value<sstring>(), "path in which to write the legacy conf file")
+ ("duration", bpo::value<unsigned>()->default_value(120), "time, in seconds, for which to run the test")
+ ("format", bpo::value<sstring>()->default_value("seastar"), "Configuration file format (seastar | envfile)")
+ ("fs-check", bpo::bool_switch(&fs_check), "perform FS check only")
+ ;
+
+ return app.run(ac, av, [&] {
+ return seastar::async([&] {
+ auto& configuration = app.configuration();
+ auto eval_dirs = configuration["evaluation-directory"].as<std::vector<sstring>>();
+ auto format = configuration["format"].as<sstring>();
+ auto duration = std::chrono::duration<double>(configuration["duration"].as<unsigned>() * 1s);
+
+ struct std::vector<disk_descriptor> disk_descriptors;
+ std::unordered_map<sstring, sstring> mountpoint_map;
+ // We want to evaluate once per mountpoint, but we still want to write in one of the
+ // directories that we were provided - we may not have permissions to write into the
+ // mountpoint itself. If we are passed more than one directory per mountpoint, we don't
+ // really care to which one we write, so this simple hash will do.
+ for (auto& eval_dir : eval_dirs) {
+ mountpoint_map[mountpoint_of(eval_dir).string()] = eval_dir;
+ }
+ for (auto eval: mountpoint_map) {
+ auto mountpoint = eval.first;
+ auto eval_dir = eval.second;
+
+ if (filesystem_has_good_aio_support(eval_dir, false) == false) {
+ iotune_logger.error("Exception when qualifying filesystem at {}", eval_dir);
+ return 1;
+ }
+
+ auto rec = 10000000000ULL;
+ auto avail = fs_avail(eval_dir).get0();
+ if (avail < rec) {
+ uint64_t val;
+ const char* units;
+ if (avail >= 1000000000) {
+ val = (avail + 500000000) / 1000000000;
+ units = "GB";
+ } else if (avail >= 1000000) {
+ val = (avail + 500000) / 1000000;
+ units = "MB";
+ } else {
+ val = avail;
+ units = "bytes";
+ }
+ iotune_logger.warn("Available space on filesystem at {}: {} {}: is less than recommended: {} GB",
+ eval_dir, val, units, rec / 1000000000ULL);
+ }
+
+ iotune_logger.info("{} passed sanity checks", eval_dir);
+ if (fs_check) {
+ return 0;
+ }
+
+ // Directory is the same object for all tests.
+ ::evaluation_directory test_directory(eval_dir);
+ test_directory.discover_directory().get();
+
+ ::iotune_multi_shard_context iotune_tests(test_directory);
+ iotune_tests.start().get();
+ iotune_tests.create_data_file().get();
+
+ auto stop = defer([&iotune_tests] {
+ iotune_tests.stop().get();
+ });
+
+ fmt::print("Starting Evaluation. This may take a while...\n");
+ fmt::print("Measuring sequential write bandwidth: ");
+ std::cout.flush();
+ io_rates write_bw;
+ size_t sequential_buffer_size = 1 << 20;
+ for (unsigned shard = 0; shard < smp::count; ++shard) {
+ write_bw += iotune_tests.write_sequential_data(shard, sequential_buffer_size, duration * 0.70 / smp::count).get0();
+ }
+ write_bw.bytes_per_sec /= smp::count;
+ fmt::print("{} MB/s\n", uint64_t(write_bw.bytes_per_sec / (1024 * 1024)));
+
+ fmt::print("Measuring sequential read bandwidth: ");
+ std::cout.flush();
+ auto read_bw = iotune_tests.read_sequential_data(0, sequential_buffer_size, duration * 0.1).get0();
+ fmt::print("{} MB/s\n", uint64_t(read_bw.bytes_per_sec / (1024 * 1024)));
+
+ fmt::print("Measuring random write IOPS: ");
+ std::cout.flush();
+ auto write_iops = iotune_tests.write_random_data(test_directory.minimum_io_size(), duration * 0.1).get0();
+ fmt::print("{} IOPS\n", uint64_t(write_iops.iops));
+
+ fmt::print("Measuring random read IOPS: ");
+ std::cout.flush();
+ auto read_iops = iotune_tests.read_random_data(test_directory.minimum_io_size(), duration * 0.1).get0();
+ fmt::print("{} IOPS\n", uint64_t(read_iops.iops));
+
+ struct disk_descriptor desc;
+ desc.mountpoint = mountpoint;
+ desc.read_iops = read_iops.iops;
+ desc.read_bw = read_bw.bytes_per_sec;
+ desc.write_iops = write_iops.iops;
+ desc.write_bw = write_bw.bytes_per_sec;
+ disk_descriptors.push_back(std::move(desc));
+ }
+
+ auto file = "properties file";
+ try {
+ if (configuration.count("properties-file")) {
+ fmt::print("Writing result to {}\n", configuration["properties-file"].as<sstring>());
+ write_property_file(configuration["properties-file"].as<sstring>(), disk_descriptors);
+ }
+
+ file = "configuration file";
+ if (configuration.count("options-file")) {
+ fmt::print("Writing result to {}\n", configuration["options-file"].as<sstring>());
+ write_configuration_file(configuration["options-file"].as<sstring>(), format, configuration["properties-file"].as<sstring>());
+ }
+ } catch (...) {
+ iotune_logger.error("Exception when writing {}: {}.\nPlease add the above values manually to your seastar command line.", file, std::current_exception());
+ return 1;
+ }
+ return 0;
+ });
+ });
+}
diff --git a/src/seastar/apps/memcached/CMakeLists.txt b/src/seastar/apps/memcached/CMakeLists.txt
new file mode 100644
index 00000000..ec213fd0
--- /dev/null
+++ b/src/seastar/apps/memcached/CMakeLists.txt
@@ -0,0 +1,49 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+set (Seastar_APP_MEMCACHED_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR})
+set (Seastar_APP_MEMCACHED_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR})
+
+seastar_generate_ragel (
+ TARGET app_memcached_ascii
+ VAR app_memcached_ascii_file
+ IN_FILE ${CMAKE_CURRENT_SOURCE_DIR}/ascii.rl
+ OUT_FILE ${CMAKE_CURRENT_BINARY_DIR}/ascii.hh)
+
+seastar_add_app (memcached
+ SOURCES
+ ${app_memcached_ascii_file}
+ memcache.cc
+ memcached.hh)
+
+target_include_directories (app_memcached
+ PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
+
+add_dependencies (app_memcached app_memcached_ascii)
+
+#
+# Tests.
+#
+
+if (Seastar_TESTING)
+ add_subdirectory (tests)
+endif ()
diff --git a/src/seastar/apps/memcached/ascii.rl b/src/seastar/apps/memcached/ascii.rl
new file mode 100644
index 00000000..04d161ff
--- /dev/null
+++ b/src/seastar/apps/memcached/ascii.rl
@@ -0,0 +1,154 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/ragel.hh>
+#include "memcached.hh"
+#include <memory>
+#include <algorithm>
+#include <functional>
+
+using namespace seastar;
+
+%%{
+
+machine memcache_ascii_protocol;
+
+access _fsm_;
+
+action mark {
+ g.mark_start(p);
+}
+
+action start_blob {
+ g.mark_start(p);
+ _size_left = _size;
+}
+
+action advance_blob {
+ auto len = std::min((uint32_t)(pe - p), _size_left);
+ _size_left -= len;
+ p += len;
+ if (_size_left == 0) {
+ _blob = str();
+ p--;
+ fret;
+ }
+ p--;
+}
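+
+# advance_blob consumes up to _size_left bytes of the raw payload in one step.
+# The p-- before leaving compensates for the p++ that Ragel performs after
+# every executed action, and fret returns control to the machine that entered
+# the blob via fcall.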
+
+crlf = '\r\n';
+sp = ' ';
+u32 = digit+ >{ _u32 = 0; } ${ _u32 *= 10; _u32 += fc - '0'; };
+u64 = digit+ >{ _u64 = 0; } ${ _u64 *= 10; _u64 += fc - '0'; };
+key = [^ ]+ >mark %{ _key = memcache::item_key(str()); };
+flags = digit+ >mark %{ _flags_str = str(); };
+expiration = u32 %{ _expiration = _u32; };
+size = u32 >mark %{ _size = _u32; _size_str = str(); };
+blob := any+ >start_blob $advance_blob;
+maybe_noreply = (sp "noreply" @{ _noreply = true; })? >{ _noreply = false; };
+maybe_expiration = (sp expiration)? >{ _expiration = 0; };
+version_field = u64 %{ _version = _u64; };
+
+insertion_params = sp key sp flags sp expiration sp size maybe_noreply (crlf @{ fcall blob; } ) crlf;
+set = "set" insertion_params @{ _state = state::cmd_set; };
+add = "add" insertion_params @{ _state = state::cmd_add; };
+replace = "replace" insertion_params @{ _state = state::cmd_replace; };
+cas = "cas" sp key sp flags sp expiration sp size sp version_field maybe_noreply (crlf @{ fcall blob; } ) crlf @{ _state = state::cmd_cas; };
+get = "get" (sp key %{ _keys.emplace_back(std::move(_key)); })+ crlf @{ _state = state::cmd_get; };
+gets = "gets" (sp key %{ _keys.emplace_back(std::move(_key)); })+ crlf @{ _state = state::cmd_gets; };
+delete = "delete" sp key maybe_noreply crlf @{ _state = state::cmd_delete; };
+flush = "flush_all" maybe_expiration maybe_noreply crlf @{ _state = state::cmd_flush_all; };
+version = "version" crlf @{ _state = state::cmd_version; };
+stats = "stats" crlf @{ _state = state::cmd_stats; };
+stats_hash = "stats hash" crlf @{ _state = state::cmd_stats_hash; };
+incr = "incr" sp key sp u64 maybe_noreply crlf @{ _state = state::cmd_incr; };
+decr = "decr" sp key sp u64 maybe_noreply crlf @{ _state = state::cmd_decr; };
+main := (add | replace | set | get | gets | delete | flush | version | cas | stats | incr | decr
+ | stats_hash) >eof{ _state = state::eof; };
+
+prepush {
+ prepush();
+}
+
+postpop {
+ postpop();
+}
+
+}%%
+
+class memcache_ascii_parser : public ragel_parser_base<memcache_ascii_parser> {
+ %% write data nofinal noprefix;
+public:
+ enum class state {
+ error,
+ eof,
+ cmd_set,
+ cmd_cas,
+ cmd_add,
+ cmd_replace,
+ cmd_get,
+ cmd_gets,
+ cmd_delete,
+ cmd_flush_all,
+ cmd_version,
+ cmd_stats,
+ cmd_stats_hash,
+ cmd_incr,
+ cmd_decr,
+ };
+ state _state;
+ uint32_t _u32;
+ uint64_t _u64;
+ memcache::item_key _key;
+ sstring _flags_str;
+ uint32_t _expiration;
+ uint32_t _size;
+ sstring _size_str;
+ uint32_t _size_left;
+ uint64_t _version;
+ sstring _blob;
+ bool _noreply;
+ std::vector<memcache::item_key> _keys;
+public:
+ void init() {
+ init_base();
+ _state = state::error;
+ _keys.clear();
+ %% write init;
+ }
+
+ char* parse(char* p, char* pe, char* eof) {
+ sstring_builder::guard g(_builder, p, pe);
+ auto str = [this, &g, &p] { g.mark_end(p); return get_str(); };
+ %% write exec;
+ if (_state != state::error) {
+ return p;
+ }
+ if (p != pe) {
+ p = pe;
+ return p;
+ }
+ return nullptr;
+ }
+ bool eof() const {
+ return _state == state::eof;
+ }
+};
diff --git a/src/seastar/apps/memcached/memcache.cc b/src/seastar/apps/memcached/memcache.cc
new file mode 100644
index 00000000..4cce0e92
--- /dev/null
+++ b/src/seastar/apps/memcached/memcache.cc
@@ -0,0 +1,1464 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2014-2015 Cloudius Systems
+ */
+
+#include <boost/intrusive/unordered_set.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/optional.hpp>
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/timer-set.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/stream.hh>
+#include <seastar/core/memory.hh>
+#include <seastar/core/units.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/vector-data-sink.hh>
+#include <seastar/core/bitops.hh>
+#include <seastar/core/slab.hh>
+#include <seastar/core/align.hh>
+#include <seastar/net/api.hh>
+#include <seastar/net/packet-data-source.hh>
+#include "ascii.hh"
+#include "memcached.hh"
+#include <unistd.h>
+
+#define PLATFORM "seastar"
+#define VERSION "v1.0"
+#define VERSION_STRING PLATFORM " " VERSION
+
+using namespace seastar;
+using namespace net;
+
+namespace memcache {
+
+namespace bi = boost::intrusive;
+
+static constexpr double default_slab_growth_factor = 1.25;
+static constexpr uint64_t default_slab_page_size = 1UL*MB;
+static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled.
+static __thread slab_allocator<item>* slab;
+static thread_local std::unique_ptr<slab_allocator<item>> slab_holder;
+
+template<typename T>
+using optional = boost::optional<T>;
+
+using clock_type = lowres_clock;
+
+//
+// "Expiration" is a uint32_t value.
+// The minimal value of _time occurs when "expiration" is set to
+// (seconds_in_a_month + 1).
+// In this case _time will have a value of
+//
+// (seconds_in_a_month + 1 - Wall_Clock_Time_Since_Epoch)
+//
+// because lowres_clock::now() is initialized to zero when the application starts.
+//
+// We will use a timepoint at LLONG_MIN to represent a "never expire" value
+// since it will not collide with the minimum _time value mentioned above for
+// about 290 thousand years to come.
+//
+static constexpr clock_type::time_point never_expire_timepoint = clock_type::time_point(clock_type::duration::min());
+
+struct expiration {
+ using time_point = clock_type::time_point;
+ using duration = time_point::duration;
+
+ static constexpr uint32_t seconds_in_a_month = 60U * 60 * 24 * 30;
+ time_point _time = never_expire_timepoint;
+
+ expiration() {}
+
+ expiration(clock_type::duration wc_to_clock_type_delta, uint32_t s) {
+ using namespace std::chrono;
+
+ static_assert(sizeof(clock_type::duration::rep) >= 8, "clock_type::duration::rep must be at least 8 bytes wide");
+
+ if (s == 0U) {
+ return; // means never expire.
+ } else if (s <= seconds_in_a_month) {
+ _time = clock_type::now() + seconds(s); // from delta
+ } else {
+ //
+ // seastar::reactor supports only a monotonic clock at the moment
+ // therefore this may make the elements with the absolute expiration
+ // time expire at the wrong time if the wall clock has been updated
+ // during the expiration period. However the original memcached has
+ // the same weakness.
+ //
+ // TODO: Fix this when a support for system_clock-based timers is
+ // added to the seastar::reactor.
+ //
+ _time = time_point(seconds(s) + wc_to_clock_type_delta); // from real time
+ }
+ }
+
+ bool ever_expires() {
+ return _time != never_expire_timepoint;
+ }
+
+ time_point to_time_point() {
+ return _time;
+ }
+};
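+
+// A minimal usage sketch of the conventions above (illustrative only; "delta"
+// stands for the wall-clock to clock_type delta maintained by the cache):
+//
+//     expiration in_a_minute(delta, 60);          // relative: expires at now() + 60s
+//     expiration at_some_date(delta, 1500000000); // > seconds_in_a_month: absolute wall-clock time
+//     expiration never(delta, 0);                 // 0 means never expire
+//     assert(!never.ever_expires());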
+
+class item : public slab_item_base {
+public:
+ using version_type = uint64_t;
+ using time_point = expiration::time_point;
+ using duration = expiration::duration;
+ static constexpr uint8_t field_alignment = alignof(void*);
+private:
+ using hook_type = bi::unordered_set_member_hook<>;
+ // TODO: align shared data to cache line boundary
+ version_type _version;
+ hook_type _cache_link;
+ bi::list_member_hook<> _timer_link;
+ size_t _key_hash;
+ expiration _expiry;
+ uint32_t _value_size;
+ uint32_t _slab_page_index;
+ uint16_t _ref_count;
+ uint8_t _key_size;
+ uint8_t _ascii_prefix_size;
+ char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value.
+ friend class cache;
+public:
+ item(uint32_t slab_page_index, item_key&& key, sstring&& ascii_prefix,
+ sstring&& value, expiration expiry, version_type version = 1)
+ : _version(version)
+ , _key_hash(key.hash())
+ , _expiry(expiry)
+ , _value_size(value.size())
+ , _slab_page_index(slab_page_index)
+ , _ref_count(0U)
+ , _key_size(key.key().size())
+ , _ascii_prefix_size(ascii_prefix.size())
+ {
+ assert(_key_size <= std::numeric_limits<uint8_t>::max());
+ assert(_ascii_prefix_size <= std::numeric_limits<uint8_t>::max());
+ // storing key
+ memcpy(_data, key.key().c_str(), _key_size);
+ // storing ascii_prefix
+ memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size);
+ // storing value
+ memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment),
+ value.c_str(), _value_size);
+ }
+
+ item(const item&) = delete;
+ item(item&&) = delete;
+
+ clock_type::time_point get_timeout() {
+ return _expiry.to_time_point();
+ }
+
+ version_type version() {
+ return _version;
+ }
+
+ const compat::string_view key() const {
+ return compat::string_view(_data, _key_size);
+ }
+
+ const compat::string_view ascii_prefix() const {
+ const char *p = _data + align_up(_key_size, field_alignment);
+ return compat::string_view(p, _ascii_prefix_size);
+ }
+
+ const compat::string_view value() const {
+ const char *p = _data + align_up(_key_size, field_alignment) +
+ align_up(_ascii_prefix_size, field_alignment);
+ return compat::string_view(p, _value_size);
+ }
+
+ size_t key_size() const {
+ return _key_size;
+ }
+
+ size_t ascii_prefix_size() const {
+ return _ascii_prefix_size;
+ }
+
+ size_t value_size() const {
+ return _value_size;
+ }
+
+ optional<uint64_t> data_as_integral() {
+ auto str = value().data();
+ if (str[0] == '-') {
+ return {};
+ }
+
+ auto len = _value_size;
+
+ // Strip trailing space
+ while (len && str[len - 1] == ' ') {
+ len--;
+ }
+
+ try {
+ return {boost::lexical_cast<uint64_t>(str, len)};
+ } catch (const boost::bad_lexical_cast& e) {
+ return {};
+ }
+ }
+
+ // needed by timer_set
+ bool cancel() {
+ return false;
+ }
+
+ // Methods required by slab allocator.
+ uint32_t get_slab_page_index() const {
+ return _slab_page_index;
+ }
+ bool is_unlocked() const {
+ return _ref_count == 1;
+ }
+
+ friend bool operator==(const item &a, const item &b) {
+ return (a._key_hash == b._key_hash) &&
+ (a._key_size == b._key_size) &&
+ (memcmp(a._data, b._data, a._key_size) == 0);
+ }
+
+ friend std::size_t hash_value(const item &i) {
+ return i._key_hash;
+ }
+
+ friend inline void intrusive_ptr_add_ref(item* it) {
+ assert(it->_ref_count >= 0);
+ ++it->_ref_count;
+ if (it->_ref_count == 2) {
+ slab->lock_item(it);
+ }
+ }
+
+    friend inline void intrusive_ptr_release(item* it) {
+        // _ref_count is unsigned, so assert before the decrement; checking
+        // after a potential free would also touch freed memory.
+        assert(it->_ref_count > 0);
+        --it->_ref_count;
+        if (it->_ref_count == 1) {
+            slab->unlock_item(it);
+        } else if (it->_ref_count == 0) {
+            slab->free(it);
+        }
+    }
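+
+    // Reference-count protocol implied above: a count of 1 means only the
+    // cache itself references the item, so the slab may evict it; reaching 2
+    // locks it in the slab, dropping back to 1 unlocks it, and 0 frees the
+    // slab storage.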
+
+ friend class item_key_cmp;
+};
+
+struct item_key_cmp
+{
+private:
+ bool compare(const item_key& key, const item& it) const {
+ return (it._key_hash == key.hash()) &&
+ (it._key_size == key.key().size()) &&
+ (memcmp(it._data, key.key().c_str(), it._key_size) == 0);
+ }
+public:
+ bool operator()(const item_key& key, const item& it) const {
+ return compare(key, it);
+ }
+
+ bool operator()(const item& it, const item_key& key) const {
+ return compare(key, it);
+ }
+};
+
+using item_ptr = foreign_ptr<boost::intrusive_ptr<item>>;
+
+struct cache_stats {
+ size_t _get_hits {};
+ size_t _get_misses {};
+ size_t _set_adds {};
+ size_t _set_replaces {};
+ size_t _cas_hits {};
+ size_t _cas_misses {};
+ size_t _cas_badval {};
+ size_t _delete_misses {};
+ size_t _delete_hits {};
+ size_t _incr_misses {};
+ size_t _incr_hits {};
+ size_t _decr_misses {};
+ size_t _decr_hits {};
+ size_t _expired {};
+ size_t _evicted {};
+ size_t _bytes {};
+ size_t _resize_failure {};
+ size_t _size {};
+ size_t _reclaims{};
+
+ void operator+=(const cache_stats& o) {
+ _get_hits += o._get_hits;
+ _get_misses += o._get_misses;
+ _set_adds += o._set_adds;
+ _set_replaces += o._set_replaces;
+ _cas_hits += o._cas_hits;
+ _cas_misses += o._cas_misses;
+ _cas_badval += o._cas_badval;
+ _delete_misses += o._delete_misses;
+ _delete_hits += o._delete_hits;
+ _incr_misses += o._incr_misses;
+ _incr_hits += o._incr_hits;
+ _decr_misses += o._decr_misses;
+ _decr_hits += o._decr_hits;
+ _expired += o._expired;
+ _evicted += o._evicted;
+ _bytes += o._bytes;
+ _resize_failure += o._resize_failure;
+ _size += o._size;
+ _reclaims += o._reclaims;
+ }
+};
+
+enum class cas_result {
+ not_found, stored, bad_version
+};
+
+struct remote_origin_tag {
+ template <typename T>
+ static inline
+ T move_if_local(T& ref) {
+ return ref;
+ }
+};
+
+struct local_origin_tag {
+ template <typename T>
+ static inline
+ T move_if_local(T& ref) {
+ return std::move(ref);
+ }
+};
+
+struct item_insertion_data {
+ item_key key;
+ sstring ascii_prefix;
+ sstring data;
+ expiration expiry;
+};
+
+class cache {
+private:
+ using cache_type = bi::unordered_set<item,
+ bi::member_hook<item, item::hook_type, &item::_cache_link>,
+ bi::power_2_buckets<true>,
+ bi::constant_time_size<true>>;
+ using cache_iterator = typename cache_type::iterator;
+ static constexpr size_t initial_bucket_count = 1 << 10;
+ static constexpr float load_factor = 0.75f;
+ size_t _resize_up_threshold = load_factor * initial_bucket_count;
+ std::vector<cache_type::bucket_type> _buckets;
+ cache_type _cache;
+ seastar::timer_set<item, &item::_timer_link> _alive;
+ timer<clock_type> _timer;
+    // delta between the current values of the wall clock and the clock_type clock
+ clock_type::duration _wc_to_clock_type_delta;
+ cache_stats _stats;
+ timer<clock_type> _flush_timer;
+private:
+ size_t item_size(item& item_ref) {
+ constexpr size_t field_alignment = alignof(void*);
+ return sizeof(item) +
+ align_up(item_ref.key_size(), field_alignment) +
+ align_up(item_ref.ascii_prefix_size(), field_alignment) +
+ item_ref.value_size();
+ }
+
+ size_t item_size(item_insertion_data& insertion) {
+ constexpr size_t field_alignment = alignof(void*);
+ auto size = sizeof(item) +
+ align_up(insertion.key.key().size(), field_alignment) +
+ align_up(insertion.ascii_prefix.size(), field_alignment) +
+ insertion.data.size();
+#ifdef __DEBUG__
+ static bool print_item_footprint = true;
+ if (print_item_footprint) {
+ print_item_footprint = false;
+ std::cout << __FUNCTION__ << ": " << size << "\n";
+ std::cout << "sizeof(item) " << sizeof(item) << "\n";
+ std::cout << "key.size " << insertion.key.key().size() << "\n";
+ std::cout << "value.size " << insertion.data.size() << "\n";
+ std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n";
+ }
+#endif
+ return size;
+ }
+
+ template <bool IsInCache = true, bool IsInTimerList = true, bool Release = true>
+ void erase(item& item_ref) {
+ if (IsInCache) {
+ _cache.erase(_cache.iterator_to(item_ref));
+ }
+ if (IsInTimerList) {
+ if (item_ref._expiry.ever_expires()) {
+ _alive.remove(item_ref);
+ }
+ }
+ _stats._bytes -= item_size(item_ref);
+ if (Release) {
+            // When Release is false the slab is replacing this item in place, so its memory must not be freed here.
+ intrusive_ptr_release(&item_ref);
+ }
+ }
+
+ void expire() {
+ using namespace std::chrono;
+
+ //
+ // Adjust the delta on every timer event to minimize an error caused
+ // by a wall clock adjustment.
+ //
+ _wc_to_clock_type_delta =
+ duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
+
+ auto exp = _alive.expire(clock_type::now());
+ while (!exp.empty()) {
+ auto item = &*exp.begin();
+ exp.pop_front();
+ erase<true, false>(*item);
+ _stats._expired++;
+ }
+ _timer.arm(_alive.get_next_timeout());
+ }
+
+ inline
+ cache_iterator find(const item_key& key) {
+ return _cache.find(key, std::hash<item_key>(), item_key_cmp());
+ }
+
+ template <typename Origin>
+ inline
+ cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) {
+ auto& old_item = *i;
+ uint64_t old_item_version = old_item._version;
+
+ erase(old_item);
+
+ size_t size = item_size(insertion);
+ auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
+ Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1);
+ intrusive_ptr_add_ref(new_item);
+
+ auto insert_result = _cache.insert(*new_item);
+ assert(insert_result.second);
+ if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) {
+ _timer.rearm(new_item->get_timeout());
+ }
+ _stats._bytes += size;
+ return insert_result.first;
+ }
+
+ template <typename Origin>
+ inline
+ void add_new(item_insertion_data& insertion) {
+ size_t size = item_size(insertion);
+ auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
+ Origin::move_if_local(insertion.data), insertion.expiry);
+ intrusive_ptr_add_ref(new_item);
+ auto& item_ref = *new_item;
+ _cache.insert(item_ref);
+ if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) {
+ _timer.rearm(item_ref.get_timeout());
+ }
+ _stats._bytes += size;
+ maybe_rehash();
+ }
+
+ void maybe_rehash() {
+ if (_cache.size() >= _resize_up_threshold) {
+ auto new_size = _cache.bucket_count() * 2;
+ std::vector<cache_type::bucket_type> old_buckets;
+ try {
+ old_buckets = std::exchange(_buckets, std::vector<cache_type::bucket_type>(new_size));
+ } catch (const std::bad_alloc& e) {
+ _stats._resize_failure++;
+ return;
+ }
+ _cache.rehash(typename cache_type::bucket_traits(_buckets.data(), new_size));
+ _resize_up_threshold = _cache.bucket_count() * load_factor;
+ }
+ }
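+
+    // Rehashing doubles the bucket array (bi::power_2_buckets lets lookups
+    // mask instead of divide); if allocating the new array fails, the old
+    // buckets are kept and the failure is merely counted.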
+public:
+ cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size)
+ : _buckets(initial_bucket_count)
+ , _cache(cache_type::bucket_traits(_buckets.data(), initial_bucket_count))
+ {
+ using namespace std::chrono;
+
+ _wc_to_clock_type_delta =
+ duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
+
+ _timer.set_callback([this] { expire(); });
+ _flush_timer.set_callback([this] { flush_all(); });
+
+ // initialize per-thread slab allocator.
+ slab_holder = std::make_unique<slab_allocator<item>>(default_slab_growth_factor, per_cpu_slab_size, slab_page_size,
+ [this](item& item_ref) { erase<true, true, false>(item_ref); _stats._evicted++; });
+ slab = slab_holder.get();
+#ifdef __DEBUG__
+ static bool print_slab_classes = true;
+ if (print_slab_classes) {
+ print_slab_classes = false;
+ slab->print_slab_classes();
+ }
+#endif
+ }
+
+ ~cache() {
+ flush_all();
+ }
+
+ void flush_all() {
+ _flush_timer.cancel();
+ _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) {
+ erase<false, true>(*it);
+ });
+ }
+
+ void flush_at(uint32_t time) {
+ auto expiry = expiration(get_wc_to_clock_type_delta(), time);
+ _flush_timer.rearm(expiry.to_time_point());
+ }
+
+ template <typename Origin = local_origin_tag>
+ bool set(item_insertion_data& insertion) {
+ auto i = find(insertion.key);
+ if (i != _cache.end()) {
+ add_overriding<Origin>(i, insertion);
+ _stats._set_replaces++;
+ return true;
+ } else {
+ add_new<Origin>(insertion);
+ _stats._set_adds++;
+ return false;
+ }
+ }
+
+ template <typename Origin = local_origin_tag>
+ bool add(item_insertion_data& insertion) {
+ auto i = find(insertion.key);
+ if (i != _cache.end()) {
+ return false;
+ }
+
+ _stats._set_adds++;
+ add_new<Origin>(insertion);
+ return true;
+ }
+
+ template <typename Origin = local_origin_tag>
+ bool replace(item_insertion_data& insertion) {
+ auto i = find(insertion.key);
+ if (i == _cache.end()) {
+ return false;
+ }
+
+ _stats._set_replaces++;
+ add_overriding<Origin>(i, insertion);
+ return true;
+ }
+
+ bool remove(const item_key& key) {
+ auto i = find(key);
+ if (i == _cache.end()) {
+ _stats._delete_misses++;
+ return false;
+ }
+ _stats._delete_hits++;
+ auto& item_ref = *i;
+ erase(item_ref);
+ return true;
+ }
+
+ item_ptr get(const item_key& key) {
+ auto i = find(key);
+ if (i == _cache.end()) {
+ _stats._get_misses++;
+ return nullptr;
+ }
+ _stats._get_hits++;
+ auto& item_ref = *i;
+ return item_ptr(&item_ref);
+ }
+
+ template <typename Origin = local_origin_tag>
+ cas_result cas(item_insertion_data& insertion, item::version_type version) {
+ auto i = find(insertion.key);
+ if (i == _cache.end()) {
+ _stats._cas_misses++;
+ return cas_result::not_found;
+ }
+ auto& item_ref = *i;
+ if (item_ref._version != version) {
+ _stats._cas_badval++;
+ return cas_result::bad_version;
+ }
+ _stats._cas_hits++;
+ add_overriding<Origin>(i, insertion);
+ return cas_result::stored;
+ }
+
+ size_t size() {
+ return _cache.size();
+ }
+
+ size_t bucket_count() {
+ return _cache.bucket_count();
+ }
+
+ cache_stats stats() {
+ _stats._size = size();
+ return _stats;
+ }
+
+ template <typename Origin = local_origin_tag>
+ std::pair<item_ptr, bool> incr(item_key& key, uint64_t delta) {
+ auto i = find(key);
+ if (i == _cache.end()) {
+ _stats._incr_misses++;
+ return {item_ptr{}, false};
+ }
+ auto& item_ref = *i;
+ _stats._incr_hits++;
+ auto value = item_ref.data_as_integral();
+ if (!value) {
+ return {boost::intrusive_ptr<item>(&item_ref), false};
+ }
+ item_insertion_data insertion {
+ .key = Origin::move_if_local(key),
+ .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .data = to_sstring(*value + delta),
+ .expiry = item_ref._expiry
+ };
+ i = add_overriding<local_origin_tag>(i, insertion);
+ return {boost::intrusive_ptr<item>(&*i), true};
+ }
+
+ template <typename Origin = local_origin_tag>
+ std::pair<item_ptr, bool> decr(item_key& key, uint64_t delta) {
+ auto i = find(key);
+ if (i == _cache.end()) {
+ _stats._decr_misses++;
+ return {item_ptr{}, false};
+ }
+ auto& item_ref = *i;
+ _stats._decr_hits++;
+ auto value = item_ref.data_as_integral();
+ if (!value) {
+ return {boost::intrusive_ptr<item>(&item_ref), false};
+ }
+ item_insertion_data insertion {
+ .key = Origin::move_if_local(key),
+ .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
+ .data = to_sstring(*value - std::min(*value, delta)),
+ .expiry = item_ref._expiry
+ };
+ i = add_overriding<local_origin_tag>(i, insertion);
+ return {boost::intrusive_ptr<item>(&*i), true};
+ }
+
+ std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> print_hash_stats() {
+ static constexpr unsigned bits = sizeof(size_t) * 8;
+ size_t histo[bits + 1] {};
+ size_t max_size = 0;
+ unsigned max_bucket = 0;
+
+ for (size_t i = 0; i < _cache.bucket_count(); i++) {
+ size_t size = _cache.bucket_size(i);
+ unsigned bucket;
+ if (size == 0) {
+ bucket = 0;
+ } else {
+ bucket = bits - count_leading_zeros(size);
+ }
+ max_bucket = std::max(max_bucket, bucket);
+ max_size = std::max(max_size, size);
+ histo[bucket]++;
+ }
+
+ std::stringstream ss;
+
+ ss << "size: " << _cache.size() << "\n";
+ ss << "buckets: " << _cache.bucket_count() << "\n";
+ ss << "load: " << format("{:.2f}", (double)_cache.size() / _cache.bucket_count()) << "\n";
+ ss << "max bucket occupancy: " << max_size << "\n";
+ ss << "bucket occupancy histogram:\n";
+
+ for (unsigned i = 0; i < (max_bucket + 2); i++) {
+ ss << " ";
+ if (i == 0) {
+ ss << "0: ";
+ } else if (i == 1) {
+ ss << "1: ";
+ } else {
+ ss << (1 << (i - 1)) << "+: ";
+ }
+ ss << histo[i] << "\n";
+ }
+ return {engine().cpu_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
+ }
+
+ future<> stop() { return make_ready_future<>(); }
+ clock_type::duration get_wc_to_clock_type_delta() { return _wc_to_clock_type_delta; }
+};
+
+class sharded_cache {
+private:
+ distributed<cache>& _peers;
+
+ inline
+ unsigned get_cpu(const item_key& key) {
+ return std::hash<item_key>()(key) % smp::count;
+ }
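+
+    // Every key is owned by exactly one shard, picked by hashing the key
+    // modulo the shard count; all operations on a key are routed to its
+    // owner, so items never need cross-shard locking.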
+public:
+ sharded_cache(distributed<cache>& peers) : _peers(peers) {}
+
+ future<> flush_all() {
+ return _peers.invoke_on_all(&cache::flush_all);
+ }
+
+ future<> flush_at(uint32_t time) {
+ return _peers.invoke_on_all(&cache::flush_at, time);
+ }
+
+ auto get_wc_to_clock_type_delta() { return _peers.local().get_wc_to_clock_type_delta(); }
+
+ // The caller must keep @insertion live until the resulting future resolves.
+ future<bool> set(item_insertion_data& insertion) {
+ auto cpu = get_cpu(insertion.key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<bool>(_peers.local().set(insertion));
+ }
+ return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion));
+ }
+
+ // The caller must keep @insertion live until the resulting future resolves.
+ future<bool> add(item_insertion_data& insertion) {
+ auto cpu = get_cpu(insertion.key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<bool>(_peers.local().add(insertion));
+ }
+ return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion));
+ }
+
+ // The caller must keep @insertion live until the resulting future resolves.
+ future<bool> replace(item_insertion_data& insertion) {
+ auto cpu = get_cpu(insertion.key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<bool>(_peers.local().replace(insertion));
+ }
+ return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion));
+ }
+
+ // The caller must keep @key live until the resulting future resolves.
+ future<bool> remove(const item_key& key) {
+ auto cpu = get_cpu(key);
+ return _peers.invoke_on(cpu, &cache::remove, std::ref(key));
+ }
+
+ // The caller must keep @key live until the resulting future resolves.
+ future<item_ptr> get(const item_key& key) {
+ auto cpu = get_cpu(key);
+ return _peers.invoke_on(cpu, &cache::get, std::ref(key));
+ }
+
+ // The caller must keep @insertion live until the resulting future resolves.
+ future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
+ auto cpu = get_cpu(insertion.key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
+ }
+ return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version));
+ }
+
+ future<cache_stats> stats() {
+ return _peers.map_reduce(adder<cache_stats>(), &cache::stats);
+ }
+
+ // The caller must keep @key live until the resulting future resolves.
+ future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) {
+ auto cpu = get_cpu(key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<std::pair<item_ptr, bool>>(
+ _peers.local().incr<local_origin_tag>(key, delta));
+ }
+ return _peers.invoke_on(cpu, &cache::incr<remote_origin_tag>, std::ref(key), std::move(delta));
+ }
+
+ // The caller must keep @key live until the resulting future resolves.
+ future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) {
+ auto cpu = get_cpu(key);
+ if (engine().cpu_id() == cpu) {
+ return make_ready_future<std::pair<item_ptr, bool>>(
+ _peers.local().decr(key, delta));
+ }
+ return _peers.invoke_on(cpu, &cache::decr<remote_origin_tag>, std::ref(key), std::move(delta));
+ }
+
+ future<> print_hash_stats(output_stream<char>& out) {
+ return _peers.map_reduce([&out] (std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> data) mutable {
+ return out.write("=== CPU " + std::to_string(data.first) + " ===\r\n")
+ .then([&out, str = std::move(data.second)] {
+ return out.write(*str);
+ });
+ }, &cache::print_hash_stats);
+ }
+};
+
+struct system_stats {
+ uint32_t _curr_connections {};
+ uint32_t _total_connections {};
+ uint64_t _cmd_get {};
+ uint64_t _cmd_set {};
+ uint64_t _cmd_flush {};
+ clock_type::time_point _start_time;
+public:
+ system_stats() {
+ _start_time = clock_type::time_point::max();
+ }
+ system_stats(clock_type::time_point start_time)
+ : _start_time(start_time) {
+ }
+ system_stats self() {
+ return *this;
+ }
+ void operator+=(const system_stats& other) {
+ _curr_connections += other._curr_connections;
+ _total_connections += other._total_connections;
+ _cmd_get += other._cmd_get;
+ _cmd_set += other._cmd_set;
+ _cmd_flush += other._cmd_flush;
+ _start_time = std::min(_start_time, other._start_time);
+ }
+ future<> stop() { return make_ready_future<>(); }
+};
+
+class ascii_protocol {
+private:
+ using this_type = ascii_protocol;
+ sharded_cache& _cache;
+ distributed<system_stats>& _system_stats;
+ memcache_ascii_parser _parser;
+ item_key _item_key;
+ item_insertion_data _insertion;
+ std::vector<item_ptr> _items;
+private:
+ static constexpr const char *msg_crlf = "\r\n";
+ static constexpr const char *msg_error = "ERROR\r\n";
+ static constexpr const char *msg_stored = "STORED\r\n";
+ static constexpr const char *msg_not_stored = "NOT_STORED\r\n";
+ static constexpr const char *msg_end = "END\r\n";
+ static constexpr const char *msg_value = "VALUE ";
+ static constexpr const char *msg_deleted = "DELETED\r\n";
+ static constexpr const char *msg_not_found = "NOT_FOUND\r\n";
+ static constexpr const char *msg_ok = "OK\r\n";
+ static constexpr const char *msg_version = "VERSION " VERSION_STRING "\r\n";
+ static constexpr const char *msg_exists = "EXISTS\r\n";
+ static constexpr const char *msg_stat = "STAT ";
+ static constexpr const char *msg_out_of_memory = "SERVER_ERROR Out of memory allocating new item\r\n";
+ static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n";
+private:
+ template <bool WithVersion>
+ static void append_item(scattered_message<char>& msg, item_ptr item) {
+ if (!item) {
+ return;
+ }
+
+ msg.append_static("VALUE ");
+ msg.append_static(item->key());
+ msg.append_static(item->ascii_prefix());
+
+ if (WithVersion) {
+ msg.append_static(" ");
+ msg.append(to_sstring(item->version()));
+ }
+
+ msg.append_static(msg_crlf);
+ msg.append_static(item->value());
+ msg.append_static(msg_crlf);
+ msg.on_delete([item = std::move(item)] {});
+ }
+
+ template <bool WithVersion>
+ future<> handle_get(output_stream<char>& out) {
+ _system_stats.local()._cmd_get++;
+ if (_parser._keys.size() == 1) {
+ return _cache.get(_parser._keys[0]).then([&out] (auto item) -> future<> {
+ scattered_message<char> msg;
+ this_type::append_item<WithVersion>(msg, std::move(item));
+ msg.append_static(msg_end);
+ return out.write(std::move(msg));
+ });
+ } else {
+ _items.clear();
+ return parallel_for_each(_parser._keys.begin(), _parser._keys.end(), [this] (const auto& key) {
+ return _cache.get(key).then([this] (auto item) {
+ _items.emplace_back(std::move(item));
+ });
+ }).then([this, &out] () {
+ scattered_message<char> msg;
+ for (auto& item : _items) {
+ append_item<WithVersion>(msg, std::move(item));
+ }
+ msg.append_static(msg_end);
+ return out.write(std::move(msg));
+ });
+ }
+ }
+
+ template <typename Value>
+ static future<> print_stat(output_stream<char>& out, const char* key, Value value) {
+ return out.write(msg_stat)
+ .then([&out, key] { return out.write(key); })
+ .then([&out] { return out.write(" "); })
+ .then([&out, value] { return out.write(to_sstring(value)); })
+ .then([&out] { return out.write(msg_crlf); });
+ }
+
+ future<> print_stats(output_stream<char>& out) {
+ return _cache.stats().then([this, &out] (auto stats) {
+ return _system_stats.map_reduce(adder<system_stats>(), &system_stats::self)
+ .then([&out, all_cache_stats = std::move(stats)] (auto all_system_stats) -> future<> {
+ auto now = clock_type::now();
+ auto total_items = all_cache_stats._set_replaces + all_cache_stats._set_adds
+ + all_cache_stats._cas_hits;
+ return print_stat(out, "pid", getpid())
+ .then([&out, uptime = now - all_system_stats._start_time] {
+ return print_stat(out, "uptime",
+ std::chrono::duration_cast<std::chrono::seconds>(uptime).count());
+ }).then([now, &out] {
+ return print_stat(out, "time",
+ std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count());
+ }).then([&out] {
+ return print_stat(out, "version", VERSION_STRING);
+ }).then([&out] {
+ return print_stat(out, "pointer_size", sizeof(void*)*8);
+ }).then([&out, v = all_system_stats._curr_connections] {
+ return print_stat(out, "curr_connections", v);
+ }).then([&out, v = all_system_stats._total_connections] {
+ return print_stat(out, "total_connections", v);
+ }).then([&out, v = all_system_stats._curr_connections] {
+ return print_stat(out, "connection_structures", v);
+ }).then([&out, v = all_system_stats._cmd_get] {
+ return print_stat(out, "cmd_get", v);
+ }).then([&out, v = all_system_stats._cmd_set] {
+ return print_stat(out, "cmd_set", v);
+ }).then([&out, v = all_system_stats._cmd_flush] {
+ return print_stat(out, "cmd_flush", v);
+ }).then([&out] {
+ return print_stat(out, "cmd_touch", 0);
+ }).then([&out, v = all_cache_stats._get_hits] {
+ return print_stat(out, "get_hits", v);
+ }).then([&out, v = all_cache_stats._get_misses] {
+ return print_stat(out, "get_misses", v);
+ }).then([&out, v = all_cache_stats._delete_misses] {
+ return print_stat(out, "delete_misses", v);
+ }).then([&out, v = all_cache_stats._delete_hits] {
+ return print_stat(out, "delete_hits", v);
+ }).then([&out, v = all_cache_stats._incr_misses] {
+ return print_stat(out, "incr_misses", v);
+ }).then([&out, v = all_cache_stats._incr_hits] {
+ return print_stat(out, "incr_hits", v);
+ }).then([&out, v = all_cache_stats._decr_misses] {
+ return print_stat(out, "decr_misses", v);
+ }).then([&out, v = all_cache_stats._decr_hits] {
+ return print_stat(out, "decr_hits", v);
+ }).then([&out, v = all_cache_stats._cas_misses] {
+ return print_stat(out, "cas_misses", v);
+ }).then([&out, v = all_cache_stats._cas_hits] {
+ return print_stat(out, "cas_hits", v);
+ }).then([&out, v = all_cache_stats._cas_badval] {
+ return print_stat(out, "cas_badval", v);
+ }).then([&out] {
+ return print_stat(out, "touch_hits", 0);
+ }).then([&out] {
+ return print_stat(out, "touch_misses", 0);
+ }).then([&out] {
+ return print_stat(out, "auth_cmds", 0);
+ }).then([&out] {
+ return print_stat(out, "auth_errors", 0);
+ }).then([&out] {
+ return print_stat(out, "threads", smp::count);
+ }).then([&out, v = all_cache_stats._size] {
+ return print_stat(out, "curr_items", v);
+ }).then([&out, v = total_items] {
+ return print_stat(out, "total_items", v);
+ }).then([&out, v = all_cache_stats._expired] {
+ return print_stat(out, "seastar.expired", v);
+ }).then([&out, v = all_cache_stats._resize_failure] {
+ return print_stat(out, "seastar.resize_failure", v);
+ }).then([&out, v = all_cache_stats._evicted] {
+ return print_stat(out, "evictions", v);
+ }).then([&out, v = all_cache_stats._bytes] {
+ return print_stat(out, "bytes", v);
+ }).then([&out] {
+ return out.write(msg_end);
+ });
+ });
+ });
+ }
+public:
+ ascii_protocol(sharded_cache& cache, distributed<system_stats>& system_stats)
+ : _cache(cache)
+ , _system_stats(system_stats)
+ {}
+
+ void prepare_insertion() {
+ _insertion = item_insertion_data{
+ .key = std::move(_parser._key),
+ .ascii_prefix = make_sstring(" ", _parser._flags_str, " ", _parser._size_str),
+ .data = std::move(_parser._blob),
+ .expiry = expiration(_cache.get_wc_to_clock_type_delta(), _parser._expiration)
+ };
+ }
+
+ future<> handle(input_stream<char>& in, output_stream<char>& out) {
+ _parser.init();
+ return in.consume(_parser).then([this, &out] () -> future<> {
+ switch (_parser._state) {
+ case memcache_ascii_parser::state::eof:
+ return make_ready_future<>();
+
+ case memcache_ascii_parser::state::error:
+ return out.write(msg_error);
+
+ case memcache_ascii_parser::state::cmd_set:
+ {
+ _system_stats.local()._cmd_set++;
+ prepare_insertion();
+ auto f = _cache.set(_insertion);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+            return std::move(f).then([&out] (bool) {
+ return out.write(msg_stored);
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_cas:
+ {
+ _system_stats.local()._cmd_set++;
+ prepare_insertion();
+ auto f = _cache.cas(_insertion, _parser._version);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (auto result) {
+ switch (result) {
+ case cas_result::stored:
+ return out.write(msg_stored);
+ case cas_result::not_found:
+ return out.write(msg_not_found);
+ case cas_result::bad_version:
+ return out.write(msg_exists);
+ default:
+ std::abort();
+ }
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_add:
+ {
+ _system_stats.local()._cmd_set++;
+ prepare_insertion();
+ auto f = _cache.add(_insertion);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (bool added) {
+ return out.write(added ? msg_stored : msg_not_stored);
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_replace:
+ {
+ _system_stats.local()._cmd_set++;
+ prepare_insertion();
+ auto f = _cache.replace(_insertion);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (auto replaced) {
+ return out.write(replaced ? msg_stored : msg_not_stored);
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_get:
+ return handle_get<false>(out);
+
+ case memcache_ascii_parser::state::cmd_gets:
+ return handle_get<true>(out);
+
+ case memcache_ascii_parser::state::cmd_delete:
+ {
+ auto f = _cache.remove(_parser._key);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (bool removed) {
+ return out.write(removed ? msg_deleted : msg_not_found);
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_flush_all:
+ {
+ _system_stats.local()._cmd_flush++;
+ if (_parser._expiration) {
+ auto f = _cache.flush_at(_parser._expiration);
+ if (_parser._noreply) {
+ return f;
+ }
+ return std::move(f).then([&out] {
+ return out.write(msg_ok);
+ });
+ } else {
+ auto f = _cache.flush_all();
+ if (_parser._noreply) {
+ return f;
+ }
+ return std::move(f).then([&out] {
+ return out.write(msg_ok);
+ });
+ }
+ }
+
+ case memcache_ascii_parser::state::cmd_version:
+ return out.write(msg_version);
+
+ case memcache_ascii_parser::state::cmd_stats:
+ return print_stats(out);
+
+ case memcache_ascii_parser::state::cmd_stats_hash:
+ return _cache.print_hash_stats(out);
+
+ case memcache_ascii_parser::state::cmd_incr:
+ {
+ auto f = _cache.incr(_parser._key, _parser._u64);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (auto result) {
+ auto item = std::move(result.first);
+ if (!item) {
+ return out.write(msg_not_found);
+ }
+ auto incremented = result.second;
+ if (!incremented) {
+ return out.write(msg_error_non_numeric_value);
+ }
+ return out.write(item->value().data(), item->value_size()).then([&out] {
+ return out.write(msg_crlf);
+ });
+ });
+ }
+
+ case memcache_ascii_parser::state::cmd_decr:
+ {
+ auto f = _cache.decr(_parser._key, _parser._u64);
+ if (_parser._noreply) {
+ return std::move(f).discard_result();
+ }
+ return std::move(f).then([&out] (auto result) {
+ auto item = std::move(result.first);
+ if (!item) {
+ return out.write(msg_not_found);
+ }
+ auto decremented = result.second;
+ if (!decremented) {
+ return out.write(msg_error_non_numeric_value);
+ }
+ return out.write(item->value().data(), item->value_size()).then([&out] {
+ return out.write(msg_crlf);
+ });
+ });
+ }
+ };
+ std::abort();
+ }).then_wrapped([this, &out] (auto&& f) -> future<> {
+            // FIXME: scheduling then_wrapped() even when no exception was triggered costs
+            // about 2.6% in performance, but avoiding it would hurt maintainability.
+ try {
+ f.get();
+ } catch (std::bad_alloc& e) {
+ if (_parser._noreply) {
+ return make_ready_future<>();
+ }
+ return out.write(msg_out_of_memory);
+ }
+ return make_ready_future<>();
+ });
+ };
+};
+
+class udp_server {
+public:
+ static const size_t default_max_datagram_size = 1400;
+private:
+ sharded_cache& _cache;
+ distributed<system_stats>& _system_stats;
+ udp_channel _chan;
+ uint16_t _port;
+ size_t _max_datagram_size = default_max_datagram_size;
+
+ struct header {
+ packed<uint16_t> _request_id;
+ packed<uint16_t> _sequence_number;
+ packed<uint16_t> _n;
+ packed<uint16_t> _reserved;
+
+ template<typename Adjuster>
+ auto adjust_endianness(Adjuster a) {
+ return a(_request_id, _sequence_number, _n);
+ }
+ } __attribute__((packed));
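+
+    // This is the standard memcached UDP frame header: four big-endian
+    // 16-bit fields carrying the request id, this datagram's sequence
+    // number, the total number of datagrams in the reply, and a reserved
+    // word; hton()/ntoh() convert it via adjust_endianness() above.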
+
+ struct connection {
+ ipv4_addr _src;
+ uint16_t _request_id;
+ input_stream<char> _in;
+ output_stream<char> _out;
+ std::vector<packet> _out_bufs;
+ ascii_protocol _proto;
+
+ connection(ipv4_addr src, uint16_t request_id, input_stream<char>&& in, size_t out_size,
+ sharded_cache& c, distributed<system_stats>& system_stats)
+ : _src(src)
+ , _request_id(request_id)
+ , _in(std::move(in))
+ , _out(output_stream<char>(data_sink(std::make_unique<vector_data_sink>(_out_bufs)), out_size, true))
+ , _proto(c, system_stats)
+ {}
+
+ future<> respond(udp_channel& chan) {
+ int i = 0;
+ return do_for_each(_out_bufs.begin(), _out_bufs.end(), [this, i, &chan] (packet& p) mutable {
+ header* out_hdr = p.prepend_header<header>(0);
+ out_hdr->_request_id = _request_id;
+ out_hdr->_sequence_number = i++;
+ out_hdr->_n = _out_bufs.size();
+ *out_hdr = hton(*out_hdr);
+ return chan.send(_src, std::move(p));
+ });
+ }
+ };
+
+public:
+ udp_server(sharded_cache& c, distributed<system_stats>& system_stats, uint16_t port = 11211)
+ : _cache(c)
+ , _system_stats(system_stats)
+ , _port(port)
+ {}
+
+ void set_max_datagram_size(size_t max_datagram_size) {
+ _max_datagram_size = max_datagram_size;
+ }
+
+ void start() {
+ _chan = engine().net().make_udp_channel({_port});
+ keep_doing([this] {
+ return _chan.receive().then([this](udp_datagram dgram) {
+ packet& p = dgram.get_data();
+ if (p.len() < sizeof(header)) {
+ // dropping invalid packet
+ return make_ready_future<>();
+ }
+
+ header hdr = ntoh(*p.get_header<header>());
+ p.trim_front(sizeof(hdr));
+
+ auto request_id = hdr._request_id;
+ auto in = as_input_stream(std::move(p));
+ auto conn = make_lw_shared<connection>(dgram.get_src(), request_id, std::move(in),
+ _max_datagram_size - sizeof(header), _cache, _system_stats);
+
+ if (hdr._n != 1 || hdr._sequence_number != 0) {
+ return conn->_out.write("CLIENT_ERROR only single-datagram requests supported\r\n").then([this, conn] {
+ return conn->_out.flush().then([this, conn] {
+ return conn->respond(_chan).then([conn] {});
+ });
+ });
+ }
+
+ return conn->_proto.handle(conn->_in, conn->_out).then([this, conn]() mutable {
+ return conn->_out.flush().then([this, conn] {
+ return conn->respond(_chan).then([conn] {});
+ });
+ });
+ });
+ }).or_terminate();
+ };
+
+ future<> stop() { return make_ready_future<>(); }
+};
+
+class tcp_server {
+private:
+ lw_shared_ptr<server_socket> _listener;
+ sharded_cache& _cache;
+ distributed<system_stats>& _system_stats;
+ uint16_t _port;
+ struct connection {
+ connected_socket _socket;
+ socket_address _addr;
+ input_stream<char> _in;
+ output_stream<char> _out;
+ ascii_protocol _proto;
+ distributed<system_stats>& _system_stats;
+ connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed<system_stats>& system_stats)
+ : _socket(std::move(socket))
+ , _addr(addr)
+ , _in(_socket.input())
+ , _out(_socket.output())
+ , _proto(c, system_stats)
+ , _system_stats(system_stats)
+ {
+ _system_stats.local()._curr_connections++;
+ _system_stats.local()._total_connections++;
+ }
+ ~connection() {
+ _system_stats.local()._curr_connections--;
+ }
+ };
+public:
+ tcp_server(sharded_cache& cache, distributed<system_stats>& system_stats, uint16_t port = 11211)
+ : _cache(cache)
+ , _system_stats(system_stats)
+ , _port(port)
+ {}
+
+ void start() {
+ listen_options lo;
+ lo.reuse_address = true;
+ _listener = engine().listen(make_ipv4_address({_port}), lo);
+ keep_doing([this] {
+ return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable {
+ auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats);
+ do_until([conn] { return conn->_in.eof(); }, [conn] {
+ return conn->_proto.handle(conn->_in, conn->_out).then([conn] {
+ return conn->_out.flush();
+ });
+ }).finally([conn] {
+ return conn->_out.close().finally([conn]{});
+ });
+ });
+ }).or_terminate();
+ }
+
+ future<> stop() { return make_ready_future<>(); }
+};
+
+class stats_printer {
+private:
+ timer<> _timer;
+ sharded_cache& _cache;
+public:
+ stats_printer(sharded_cache& cache)
+ : _cache(cache) {}
+
+ void start() {
+ _timer.set_callback([this] {
+ _cache.stats().then([] (auto stats) {
+ auto gets_total = stats._get_hits + stats._get_misses;
+ auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0;
+ auto sets_total = stats._set_adds + stats._set_replaces;
+ auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0;
+ std::cout << "items: " << stats._size << " "
+ << std::setprecision(2) << std::fixed
+ << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) "
+ << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%)";
+ std::cout << std::endl;
+ });
+ });
+ _timer.arm_periodic(std::chrono::seconds(1));
+ }
+
+ future<> stop() { return make_ready_future<>(); }
+};
+
+} /* namespace memcache */
+
+int main(int ac, char** av) {
+ distributed<memcache::cache> cache_peers;
+ memcache::sharded_cache cache(cache_peers);
+ distributed<memcache::system_stats> system_stats;
+ distributed<memcache::udp_server> udp_server;
+ distributed<memcache::tcp_server> tcp_server;
+ memcache::stats_printer stats(cache);
+
+ namespace bpo = boost::program_options;
+ app_template app;
+ app.add_options()
+ ("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size),
+ "Maximum size of UDP datagram")
+ ("max-slab-size", bpo::value<uint64_t>()->default_value(memcache::default_per_cpu_slab_size/MB),
+ "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)")
+ ("slab-page-size", bpo::value<uint64_t>()->default_value(memcache::default_slab_page_size/MB),
+ "Size of slab page (value in megabytes)")
+ ("stats",
+ "Print basic statistics periodically (every second)")
+ ("port", bpo::value<uint16_t>()->default_value(11211),
+ "Specify UDP and TCP ports for memcached server to listen on")
+ ;
+
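+    // Start order: shard the cache across all cores, then bring up the TCP
+    // server on every core, then the UDP server (a single instance when the
+    // network stack lacks a per-core namespace), and finally the optional
+    // stats printer.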
+ return app.run_deprecated(ac, av, [&] {
+ engine().at_exit([&] { return tcp_server.stop(); });
+ engine().at_exit([&] { return udp_server.stop(); });
+ engine().at_exit([&] { return cache_peers.stop(); });
+ engine().at_exit([&] { return system_stats.stop(); });
+
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ uint64_t per_cpu_slab_size = config["max-slab-size"].as<uint64_t>() * MB;
+ uint64_t slab_page_size = config["slab-page-size"].as<uint64_t>() * MB;
+ return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] {
+ return system_stats.start(memcache::clock_type::now());
+ }).then([&] {
+ std::cout << PLATFORM << " memcached " << VERSION << "\n";
+ return make_ready_future<>();
+ }).then([&, port] {
+ return tcp_server.start(std::ref(cache), std::ref(system_stats), port);
+ }).then([&tcp_server] {
+ return tcp_server.invoke_on_all(&memcache::tcp_server::start);
+ }).then([&, port] {
+ if (engine().net().has_per_core_namespace()) {
+ return udp_server.start(std::ref(cache), std::ref(system_stats), port);
+ } else {
+ return udp_server.start_single(std::ref(cache), std::ref(system_stats), port);
+ }
+ }).then([&] {
+ return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size,
+ (size_t)config["max-datagram-size"].as<int>());
+ }).then([&] {
+ return udp_server.invoke_on_all(&memcache::udp_server::start);
+ }).then([&stats, start_stats = config.count("stats")] {
+ if (start_stats) {
+ stats.start();
+ }
+ });
+ });
+}
diff --git a/src/seastar/apps/memcached/memcached.hh b/src/seastar/apps/memcached/memcached.hh
new file mode 100644
index 00000000..9a587578
--- /dev/null
+++ b/src/seastar/apps/memcached/memcached.hh
@@ -0,0 +1,74 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <seastar/core/sstring.hh>
+
+namespace memcache {
+
+using namespace seastar;
+
+class item;
+class cache;
+
+class item_key {
+private:
+ sstring _key;
+ size_t _hash;
+public:
+ item_key() = default;
+ item_key(item_key&) = default;
+ item_key(sstring key)
+ : _key(key)
+ , _hash(std::hash<sstring>()(key))
+ {}
+ item_key(item_key&& other)
+ : _key(std::move(other._key))
+ , _hash(other._hash)
+ {
+ other._hash = 0;
+ }
+ size_t hash() const {
+ return _hash;
+ }
+ const sstring& key() const {
+ return _key;
+ }
+ bool operator==(const item_key& other) const {
+ return other._hash == _hash && other._key == _key;
+ }
+ void operator=(item_key&& other) {
+ _key = std::move(other._key);
+ _hash = other._hash;
+ other._hash = 0;
+ }
+};
+
+}
+
+namespace std {
+
+template <>
+struct hash<memcache::item_key> {
+    size_t operator()(const memcache::item_key& key) const {
+ return key.hash();
+ }
+};
+
+} /* namespace std */
diff --git a/src/seastar/apps/memcached/tests/CMakeLists.txt b/src/seastar/apps/memcached/tests/CMakeLists.txt
new file mode 100644
index 00000000..9301cea7
--- /dev/null
+++ b/src/seastar/apps/memcached/tests/CMakeLists.txt
@@ -0,0 +1,75 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+if (Seastar_EXECUTE_ONLY_FAST_TESTS)
+ set (memcached_test_args --fast)
+else ()
+ set (memcached_test_args "")
+endif ()
+
+add_custom_target (app_memcached_test_memcached_run
+ DEPENDS
+    app_memcached
+ ${CMAKE_CURRENT_SOURCE_DIR}/test.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/test_memcached.py
+ COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/test.py --memcached $<TARGET_FILE:app_memcached> ${memcached_test_args}
+ USES_TERMINAL)
+
+add_test (
+ NAME Seastar.app.memcached.memcached
+ COMMAND ${CMAKE_COMMAND} --build ${Seastar_BINARY_DIR} --target app_memcached_test_memcached_run)
+
+set_tests_properties (Seastar.app.memcached.memcached
+ PROPERTIES
+ TIMEOUT ${Seastar_TEST_TIMEOUT})
+
+add_executable (app_memcached_test_ascii
+ test_ascii_parser.cc)
+
+add_dependencies (app_memcached_test_ascii app_memcached)
+
+target_include_directories (app_memcached_test_ascii
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${Seastar_APP_MEMCACHED_BINARY_DIR}
+ ${Seastar_APP_MEMCACHED_SOURCE_DIR})
+
+target_compile_definitions (app_memcached_test_ascii
+ PRIVATE SEASTAR_TESTING_MAIN)
+
+target_link_libraries (app_memcached_test_ascii
+ PRIVATE
+ seastar_with_flags
+ seastar_testing)
+
+add_custom_target (app_memcached_test_ascii_run
+ DEPENDS app_memcached_test_ascii
+ COMMAND app_memcached_test_ascii -- -c 2
+ USES_TERMINAL)
+
+add_test (
+ NAME Seastar.app.memcached.ascii
+ COMMAND ${CMAKE_COMMAND} --build ${Seastar_BINARY_DIR} --target app_memcached_test_ascii_run)
+
+set_tests_properties (Seastar.app.memcached.ascii
+ PROPERTIES
+ TIMEOUT ${Seastar_TEST_TIMEOUT})
diff --git a/src/seastar/apps/memcached/tests/test.py b/src/seastar/apps/memcached/tests/test.py
new file mode 100755
index 00000000..c2f2b80c
--- /dev/null
+++ b/src/seastar/apps/memcached/tests/test.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+import sys
+import os
+import argparse
+import subprocess
+
+DIR_PATH = os.path.dirname(os.path.realpath(__file__))
+
+def run(args, cmd):
+ mc = subprocess.Popen([args.memcached, '--smp=2'])
+ print('Memcached started.')
+ try:
+ cmdline = [DIR_PATH + '/test_memcached.py'] + cmd
+ if args.fast:
+ cmdline.append('--fast')
+ print('Running: ' + ' '.join(cmdline))
+ subprocess.check_call(cmdline)
+ finally:
+ print('Killing memcached...')
+        mc.terminate()
+ mc.wait()
+ print('Memcached killed.')
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description="Seastar test runner")
+ parser.add_argument('--fast', action="store_true", help="Run only fast tests")
+ parser.add_argument('--memcached', required=True, help='Path of the memcached executable')
+ args = parser.parse_args()
+
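+    # Run the functional suite twice: once over TCP (default) and once over UDP (-U).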
+ run(args, [])
+ run(args, ['-U'])
diff --git a/src/seastar/apps/memcached/tests/test_ascii_parser.cc b/src/seastar/apps/memcached/tests/test_ascii_parser.cc
new file mode 100644
index 00000000..596d193e
--- /dev/null
+++ b/src/seastar/apps/memcached/tests/test_ascii_parser.cc
@@ -0,0 +1,335 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <iostream>
+#include <limits>
+#include <seastar/testing/test_case.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/net/packet-data-source.hh>
+#include "ascii.hh"
+#include <seastar/core/future-util.hh>
+
+using namespace seastar;
+using namespace net;
+using namespace memcache;
+
+using parser_type = memcache_ascii_parser;
+
+static packet make_packet(std::vector<std::string> chunks, size_t buffer_size) {
+ packet p;
+ for (auto&& chunk : chunks) {
+ size_t size = chunk.size();
+ for (size_t pos = 0; pos < size; pos += buffer_size) {
+ auto now = std::min(pos + buffer_size, chunk.size()) - pos;
+ p.append(packet(chunk.data() + pos, now));
+ }
+ }
+ return p;
+}
+
+static auto make_input_stream(packet&& p) {
+ return input_stream<char>(data_source(
+ std::make_unique<packet_data_source>(std::move(p))));
+}
+
+static auto parse(packet&& p) {
+ auto is = make_lw_shared<input_stream<char>>(make_input_stream(std::move(p)));
+ auto parser = make_lw_shared<parser_type>();
+ parser->init();
+ return is->consume(*parser).then([is, parser] {
+ return make_ready_future<lw_shared_ptr<parser_type>>(parser);
+ });
+}
+
+auto for_each_fragment_size = [] (auto&& func) {
+ auto buffer_sizes = { 100000, 1000, 100, 10, 5, 2, 1 };
+ return do_for_each(buffer_sizes.begin(), buffer_sizes.end(), [func] (size_t buffer_size) {
+ return func([buffer_size] (std::vector<std::string> chunks) {
+ return make_packet(chunks, buffer_size);
+ });
+ });
+};
+
+SEASTAR_TEST_CASE(test_set_command_is_parsed) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return parse(make_packet({"set key 1 2 3\r\nabc\r\n"})).then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_flags_str == "1");
+ BOOST_REQUIRE(p->_expiration == 2);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_blob == "abc");
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_empty_data_is_parsed) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return parse(make_packet({"set key 1 2 0\r\n\r\n"})).then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_flags_str == "1");
+ BOOST_REQUIRE(p->_expiration == 2);
+ BOOST_REQUIRE(p->_size == 0);
+ BOOST_REQUIRE(p->_size_str == "0");
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_blob == "");
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_superflous_data_is_an_error) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return parse(make_packet({"set key 0 0 0\r\nasd\r\n"})).then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_not_enough_data_is_an_error) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return parse(make_packet({"set key 0 0 3\r\n"})).then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_u32_parsing) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return make_ready_future<>().then([make_packet] {
+ return parse(make_packet({"set key 0 0 0\r\n\r\n"})).then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_flags_str == "0");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 12345 0 0\r\n\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_flags_str == "12345");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key -1 0 0\r\n\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 1-1 0 0\r\n\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key " + std::to_string(std::numeric_limits<uint32_t>::max()) + " 0 0\r\n\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_flags_str == to_sstring(std::numeric_limits<uint32_t>::max()));
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_parsing_of_split_data) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return make_ready_future<>()
+ .then([make_packet] {
+ return parse(make_packet({"set key 11", "1 222 3\r\nasd\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 11", "1 22", "2 3", "\r\nasd\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set k", "ey 11", "1 2", "2", "2 3", "\r\nasd\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 111 222 3\r\n", "asd\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 111 222 3\r\na", "sd\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 111 222 3\r\nasd", "\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"set key 111 222 3\r\nasd\r", "\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key");
+ BOOST_REQUIRE(p->_flags_str == "111");
+ BOOST_REQUIRE(p->_expiration == 222);
+ BOOST_REQUIRE(p->_size == 3);
+ BOOST_REQUIRE(p->_size_str == "3");
+ BOOST_REQUIRE(p->_blob == "asd");
+ });
+ });
+ });
+}
+
+static std::vector<sstring> as_strings(std::vector<item_key>& keys) {
+ std::vector<sstring> v;
+ for (auto&& key : keys) {
+ v.push_back(key.key());
+ }
+ return v;
+}
+
+SEASTAR_TEST_CASE(test_get_parsing) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return make_ready_future<>()
+ .then([make_packet] {
+ return parse(make_packet({"get key1\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
+ BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1"}));
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"get key1 key2\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
+ BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1", "key2"}));
+ });
+ }).then([make_packet] {
+ return parse(make_packet({"get key1 key2 key3\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
+ BOOST_REQUIRE_EQUAL(as_strings(p->_keys), std::vector<sstring>({"key1", "key2", "key3"}));
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_catches_errors_in_get) {
+ return for_each_fragment_size([] (auto make_packet) {
+ return make_ready_future<>()
+ .then([make_packet] {
+ return parse(make_packet({"get\r\n"}))
+ .then([] (auto p) {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_parser_returns_eof_state_when_no_command_follows) {
+ return for_each_fragment_size([] (auto make_packet) {
+ auto p = make_shared<parser_type>();
+ auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"get key\r\n"})));
+ p->init();
+ return is->consume(*p).then([p] {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
+ }).then([is, p] {
+ p->init();
+ return is->consume(*p).then([p, is] {
+ BOOST_REQUIRE(p->_state == parser_type::state::eof);
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_incomplete_command_is_an_error) {
+ return for_each_fragment_size([] (auto make_packet) {
+ auto p = make_shared<parser_type>();
+ auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"get"})));
+ p->init();
+ return is->consume(*p).then([p] {
+ BOOST_REQUIRE(p->_state == parser_type::state::error);
+ }).then([is, p] {
+ p->init();
+ return is->consume(*p).then([p, is] {
+ BOOST_REQUIRE(p->_state == parser_type::state::eof);
+ });
+ });
+ });
+}
+
+SEASTAR_TEST_CASE(test_multiple_requests_in_one_stream) {
+ return for_each_fragment_size([] (auto make_packet) {
+ auto p = make_shared<parser_type>();
+ auto is = make_shared<input_stream<char>>(make_input_stream(make_packet({"set key1 1 1 5\r\ndata1\r\nset key2 2 2 6\r\ndata2+\r\n"})));
+ p->init();
+ return is->consume(*p).then([p] {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key1");
+ BOOST_REQUIRE(p->_flags_str == "1");
+ BOOST_REQUIRE(p->_expiration == 1);
+ BOOST_REQUIRE(p->_size == 5);
+ BOOST_REQUIRE(p->_size_str == "5");
+ BOOST_REQUIRE(p->_blob == "data1");
+ }).then([is, p] {
+ p->init();
+ return is->consume(*p).then([p, is] {
+ BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
+ BOOST_REQUIRE(p->_key.key() == "key2");
+ BOOST_REQUIRE(p->_flags_str == "2");
+ BOOST_REQUIRE(p->_expiration == 2);
+ BOOST_REQUIRE(p->_size == 6);
+ BOOST_REQUIRE(p->_size_str == "6");
+ BOOST_REQUIRE(p->_blob == "data2+");
+ });
+ });
+ });
+}
diff --git a/src/seastar/apps/memcached/tests/test_memcached.py b/src/seastar/apps/memcached/tests/test_memcached.py
new file mode 100755
index 00000000..4aca858e
--- /dev/null
+++ b/src/seastar/apps/memcached/tests/test_memcached.py
@@ -0,0 +1,600 @@
+#!/usr/bin/env python3
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from contextlib import contextmanager
+import socket
+import struct
+import sys
+import random
+import argparse
+import time
+import re
+import unittest
+
+server_addr = None
+call = None
+args = None
+
+class TimeoutError(Exception):
+ pass
+
+@contextmanager
+def tcp_connection(timeout=1):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(timeout)
+ s.connect(server_addr)
+ def call(msg):
+ s.send(msg.encode())
+ return s.recv(16*1024)
+ yield call
+ s.close()
+
+def slow(f):
+ def wrapper(self):
+ if args.fast:
+ raise unittest.SkipTest('Slow')
+ return f(self)
+ return wrapper
+
+def recv_all(s):
+ m = b''
+ while True:
+ data = s.recv(1024)
+ if not data:
+ break
+ m += data
+ return m
+
+def tcp_call(msg, timeout=1):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(timeout)
+ s.connect(server_addr)
+ s.send(msg.encode())
+ s.shutdown(socket.SHUT_WR)
+ data = recv_all(s)
+ s.close()
+ return data
+
+def udp_call_for_fragments(msg, timeout=1):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.settimeout(timeout)
+ this_req_id = random.randint(-32768, 32767)
+
+ datagram = struct.pack(">hhhh", this_req_id, 0, 1, 0) + msg.encode()
+ sock.sendto(datagram, server_addr)
+
+ messages = {}
+ n_determined = None
+ while True:
+ data, addr = sock.recvfrom(1500)
+ req_id, seq, n, res = struct.unpack_from(">hhhh", data)
+ content = data[8:]
+
+ if n_determined and n_determined != n:
+ raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined, n))
+ n_determined = n
+
+ if req_id != this_req_id:
+ raise Exception('Invalid request id: ' + req_id + ', expected ' + this_req_id)
+
+ if seq in messages:
+ raise Exception('Duplicate message for seq=' + seq)
+
+ messages[seq] = content
+ if len(messages) == n:
+ break
+
+ for k, v in sorted(messages.items(), key=lambda e: e[0]):
+ yield v
+
+ sock.close()
+
+def udp_call(msg, **kwargs):
+ return b''.join(udp_call_for_fragments(msg, **kwargs))
+
+class MemcacheTest(unittest.TestCase):
+ def set(self, key, value, flags=0, expiry=0):
+ self.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n')
+
+ def delete(self, key):
+ self.assertEqual(call('delete %s\r\n' % key), b'DELETED\r\n')
+
+ def assertHasKey(self, key):
+ resp = call('get %s\r\n' % key)
+ if not resp.startswith(('VALUE %s' % key).encode()):
+ self.fail('Key \'%s\' should be present, but got: %s' % (key, resp.decode()))
+
+ def assertNoKey(self, key):
+ resp = call('get %s\r\n' % key)
+ if resp != b'END\r\n':
+ self.fail('Key \'%s\' should not be present, but got: %s' % (key, resp.decode()))
+
+ def setKey(self, key):
+ self.set(key, 'some value')
+
+ def getItemVersion(self, key):
+ m = re.match(r'VALUE %s \d+ \d+ (?P<version>\d+)' % key, call('gets %s\r\n' % key).decode())
+ return int(m.group('version'))
+
+ def getStat(self, name, call_fn=None):
+ if not call_fn: call_fn = call
+ resp = call_fn('stats\r\n').decode()
+ m = re.search(r'STAT %s (?P<value>.+)' % re.escape(name), resp, re.MULTILINE)
+ return m.group('value')
+
+ def flush(self):
+ self.assertEqual(call('flush_all\r\n'), b'OK\r\n')
+
+ def tearDown(self):
+ self.flush()
+
+class TcpSpecificTests(MemcacheTest):
+ def test_recovers_from_errors_in_the_stream(self):
+ with tcp_connection() as conn:
+ self.assertEqual(conn('get\r\n'), b'ERROR\r\n')
+ self.assertEqual(conn('get key\r\n'), b'END\r\n')
+
+ def test_incomplete_command_results_in_error(self):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(server_addr)
+ s.send(b'get')
+ s.shutdown(socket.SHUT_WR)
+ self.assertEqual(recv_all(s), b'ERROR\r\n')
+ s.close()
+
+ def test_stream_closed_results_in_error(self):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(server_addr)
+ s.shutdown(socket.SHUT_WR)
+ self.assertEqual(recv_all(s), b'')
+ s.close()
+
+ def test_unsuccesful_parsing_does_not_leave_data_behind(self):
+ with tcp_connection() as conn:
+ self.assertEqual(conn('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n')
+ self.assertRegex(conn('delete a b c\r\n'), b'^(CLIENT_)?ERROR.*\r\n$')
+ self.assertEqual(conn('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+ self.assertEqual(conn('delete key\r\n'), b'DELETED\r\n')
+
+ def test_flush_all_no_reply(self):
+ self.assertEqual(call('flush_all noreply\r\n'), b'')
+
+ def test_set_no_reply(self):
+ self.assertEqual(call('set key 0 0 5 noreply\r\nhello\r\nget key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+ self.delete('key')
+
+ def test_delete_no_reply(self):
+ self.setKey('key')
+ self.assertEqual(call('delete key noreply\r\nget key\r\n'), b'END\r\n')
+
+ def test_add_no_reply(self):
+ self.assertEqual(call('add key 0 0 1 noreply\r\na\r\nget key\r\n'), b'VALUE key 0 1\r\na\r\nEND\r\n')
+ self.delete('key')
+
+ def test_replace_no_reply(self):
+ self.assertEqual(call('set key 0 0 1\r\na\r\n'), b'STORED\r\n')
+ self.assertEqual(call('replace key 0 0 1 noreply\r\nb\r\nget key\r\n'), b'VALUE key 0 1\r\nb\r\nEND\r\n')
+ self.delete('key')
+
+ def test_cas_noreply(self):
+ self.assertNoKey('key')
+ self.assertEqual(call('cas key 0 0 1 1 noreply\r\na\r\n'), b'')
+ self.assertNoKey('key')
+
+ self.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b'STORED\r\n')
+ version = self.getItemVersion('key')
+
+ self.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version + 1)), b'')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+
+ self.assertEqual(call('cas key 1 0 5 %d noreply\r\naloha\r\n' % (version)), b'')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 1 5\r\naloha\r\nEND\r\n')
+
+ self.delete('key')
+
+ @slow
+ def test_connection_statistics(self):
+ with tcp_connection() as conn:
+ curr_connections = int(self.getStat('curr_connections', call_fn=conn))
+ total_connections = int(self.getStat('total_connections', call_fn=conn))
+ with tcp_connection() as conn2:
+ self.assertEqual(curr_connections + 1, int(self.getStat('curr_connections', call_fn=conn)))
+ self.assertEqual(total_connections + 1, int(self.getStat('total_connections', call_fn=conn)))
+ self.assertEqual(total_connections + 1, int(self.getStat('total_connections', call_fn=conn)))
+ time.sleep(0.1)
+ self.assertEqual(curr_connections, int(self.getStat('curr_connections', call_fn=conn)))
+
+class UdpSpecificTests(MemcacheTest):
+ def test_large_response_is_split_into_mtu_chunks(self):
+ max_datagram_size = 1400
+ data = '1' * (max_datagram_size*3)
+ self.set('key', data)
+
+ chunks = list(udp_call_for_fragments('get key\r\n'))
+
+ for chunk in chunks:
+ self.assertLessEqual(len(chunk), max_datagram_size)
+
+ self.assertEqual(b''.join(chunks).decode(),
+ 'VALUE key 0 %d\r\n%s\r\n' \
+ 'END\r\n' % (len(data), data))
+
+ self.delete('key')
+
+class TestCommands(MemcacheTest):
+ def test_basic_commands(self):
+ self.assertEqual(call('get key\r\n'), b'END\r\n')
+ self.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+ self.assertEqual(call('delete key\r\n'), b'DELETED\r\n')
+ self.assertEqual(call('delete key\r\n'), b'NOT_FOUND\r\n')
+ self.assertEqual(call('get key\r\n'), b'END\r\n')
+
+ def test_error_handling(self):
+ self.assertEqual(call('get\r\n'), b'ERROR\r\n')
+
+ @slow
+ def test_expiry(self):
+ self.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b'STORED\r\n')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+ time.sleep(2)
+ self.assertEqual(call('get key\r\n'), b'END\r\n')
+
+ @slow
+ def test_expiry_at_epoch_time(self):
+ expiry = int(time.time()) + 1
+ self.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry), b'STORED\r\n')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
+ time.sleep(2)
+ self.assertEqual(call('get key\r\n'), b'END\r\n')
+
+ def test_multiple_keys_in_get(self):
+ self.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b'STORED\r\n')
+ self.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b'STORED\r\n')
+ resp = call('get key1 key\r\n')
+ self.assertRegex(resp, b'^(VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n)|(VALUE key 0 2\r\nv2\r\nVALUE key1 0 2\r\nv1\r\nEND\r\n)$')
+ self.delete("key")
+ self.delete("key1")
+
+ def test_flush_all(self):
+ self.set('key', 'value')
+ self.assertEqual(call('flush_all\r\n'), b'OK\r\n')
+ self.assertNoKey('key')
+
+ def test_keys_set_after_flush_remain(self):
+ self.assertEqual(call('flush_all\r\n'), b'OK\r\n')
+ self.setKey('key')
+ self.assertHasKey('key')
+ self.delete('key')
+
+ @slow
+ def test_flush_all_with_timeout_flushes_all_keys_even_those_set_after_flush(self):
+ self.setKey('key')
+ self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n')
+ self.assertHasKey('key')
+ self.setKey('key2')
+ time.sleep(3)
+ self.assertNoKey('key')
+ self.assertNoKey('key2')
+
+ @slow
+ def test_subsequent_flush_is_merged(self):
+ self.setKey('key')
+ self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n') # Can flush in anything between 1-2
+ self.assertEqual(call('flush_all 4\r\n'), b'OK\r\n') # Can flush in anything between 3-4
+ time.sleep(3)
+ self.assertHasKey('key')
+ self.setKey('key2')
+ time.sleep(4)
+ self.assertNoKey('key')
+ self.assertNoKey('key2')
+
+ @slow
+ def test_immediate_flush_cancels_delayed_flush(self):
+ self.assertEqual(call('flush_all 2\r\n'), b'OK\r\n')
+ self.assertEqual(call('flush_all\r\n'), b'OK\r\n')
+ self.setKey('key')
+ time.sleep(1)
+ self.assertHasKey('key')
+ self.delete('key')
+
+ @slow
+ def test_flushing_in_the_past(self):
+ self.setKey('key1')
+ time.sleep(1)
+ self.setKey('key2')
+ key2_time = int(time.time())
+ self.assertEqual(call('flush_all %d\r\n' % (key2_time - 1)), b'OK\r\n')
+ time.sleep(1)
+ self.assertNoKey("key1")
+ self.assertNoKey("key2")
+
+ @slow
+ def test_memcache_does_not_crash_when_flushing_with_already_expred_items(self):
+ self.assertEqual(call('set key1 0 2 5\r\nhello\r\n'), b'STORED\r\n')
+ time.sleep(1)
+ self.assertEqual(call('flush_all\r\n'), b'OK\r\n')
+
+ def test_response_spanning_many_datagrams(self):
+ key1_data = '1' * 1000
+ key2_data = '2' * 1000
+ key3_data = '3' * 1000
+ self.set('key1', key1_data)
+ self.set('key2', key2_data)
+ self.set('key3', key3_data)
+
+ resp = call('get key1 key2 key3\r\n').decode()
+
+ pattern = '^VALUE (?P<v1>.*?\r\n.*?)\r\nVALUE (?P<v2>.*?\r\n.*?)\r\nVALUE (?P<v3>.*?\r\n.*?)\r\nEND\r\n$'
+ self.assertRegex(resp, pattern)
+
+ m = re.match(pattern, resp)
+ self.assertEqual(set([m.group('v1'), m.group('v2'), m.group('v3')]),
+ set(['key1 0 %d\r\n%s' % (len(key1_data), key1_data),
+ 'key2 0 %d\r\n%s' % (len(key2_data), key2_data),
+ 'key3 0 %d\r\n%s' % (len(key3_data), key3_data)]))
+
+ self.delete('key1')
+ self.delete('key2')
+ self.delete('key3')
+
+ def test_version(self):
+ self.assertRegex(call('version\r\n'), b'^VERSION .*\r\n$')
+
+ def test_add(self):
+ self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'STORED\r\n')
+ self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'NOT_STORED\r\n')
+ self.delete('key')
+
+ def test_replace(self):
+ self.assertEqual(call('add key 0 0 1\r\na\r\n'), b'STORED\r\n')
+ self.assertEqual(call('replace key 0 0 1\r\na\r\n'), b'STORED\r\n')
+ self.delete('key')
+ self.assertEqual(call('replace key 0 0 1\r\na\r\n'), b'NOT_STORED\r\n')
+
+ def test_cas_and_gets(self):
+ self.assertEqual(call('cas key 0 0 1 1\r\na\r\n'), b'NOT_FOUND\r\n')
+ self.assertEqual(call('add key 0 0 5\r\nhello\r\n'), b'STORED\r\n')
+ version = self.getItemVersion('key')
+
+ self.assertEqual(call('set key 1 0 5\r\nhello\r\n'), b'STORED\r\n')
+ self.assertEqual(call('gets key\r\n').decode(), 'VALUE key 1 5 %d\r\nhello\r\nEND\r\n' % (version + 1))
+
+ self.assertEqual(call('cas key 0 0 5 %d\r\nhello\r\n' % (version)), b'EXISTS\r\n')
+ self.assertEqual(call('cas key 0 0 5 %d\r\naloha\r\n' % (version + 1)), b'STORED\r\n')
+ self.assertEqual(call('gets key\r\n').decode(), 'VALUE key 0 5 %d\r\naloha\r\nEND\r\n' % (version + 2))
+
+ self.delete('key')
+
+ def test_curr_items_stat(self):
+ self.assertEqual(0, int(self.getStat('curr_items')))
+ self.setKey('key')
+ self.assertEqual(1, int(self.getStat('curr_items')))
+ self.delete('key')
+ self.assertEqual(0, int(self.getStat('curr_items')))
+
+ def test_how_stats_change_with_different_commands(self):
+ get_count = int(self.getStat('cmd_get'))
+ set_count = int(self.getStat('cmd_set'))
+ flush_count = int(self.getStat('cmd_flush'))
+ total_items = int(self.getStat('total_items'))
+ get_misses = int(self.getStat('get_misses'))
+ get_hits = int(self.getStat('get_hits'))
+ cas_hits = int(self.getStat('cas_hits'))
+ cas_badval = int(self.getStat('cas_badval'))
+ cas_misses = int(self.getStat('cas_misses'))
+ delete_misses = int(self.getStat('delete_misses'))
+ delete_hits = int(self.getStat('delete_hits'))
+ curr_connections = int(self.getStat('curr_connections'))
+ incr_hits = int(self.getStat('incr_hits'))
+ incr_misses = int(self.getStat('incr_misses'))
+ decr_hits = int(self.getStat('decr_hits'))
+ decr_misses = int(self.getStat('decr_misses'))
+
+ call('get key\r\n')
+ get_count += 1
+ get_misses += 1
+
+ call('gets key\r\n')
+ get_count += 1
+ get_misses += 1
+
+ call('set key1 0 0 1\r\na\r\n')
+ set_count += 1
+ total_items += 1
+
+ call('get key1\r\n')
+ get_count += 1
+ get_hits += 1
+
+ call('add key1 0 0 1\r\na\r\n')
+ set_count += 1
+
+ call('add key2 0 0 1\r\na\r\n')
+ set_count += 1
+ total_items += 1
+
+ call('replace key1 0 0 1\r\na\r\n')
+ set_count += 1
+ total_items += 1
+
+ call('replace key3 0 0 1\r\na\r\n')
+ set_count += 1
+
+ call('cas key4 0 0 1 1\r\na\r\n')
+ set_count += 1
+ cas_misses += 1
+
+ call('cas key1 0 0 1 %d\r\na\r\n' % self.getItemVersion('key1'))
+ set_count += 1
+ get_count += 1
+ get_hits += 1
+ cas_hits += 1
+ total_items += 1
+
+ call('cas key1 0 0 1 %d\r\na\r\n' % (self.getItemVersion('key1') + 1))
+ set_count += 1
+ get_count += 1
+ get_hits += 1
+ cas_badval += 1
+
+ call('delete key1\r\n')
+ delete_hits += 1
+
+ call('delete key1\r\n')
+ delete_misses += 1
+
+ call('incr num 1\r\n')
+ incr_misses += 1
+ call('decr num 1\r\n')
+ decr_misses += 1
+
+ call('set num 0 0 1\r\n0\r\n')
+ set_count += 1
+ total_items += 1
+
+ call('incr num 1\r\n')
+ incr_hits += 1
+ call('decr num 1\r\n')
+ decr_hits += 1
+
+ self.flush()
+ flush_count += 1
+
+ self.assertEqual(get_count, int(self.getStat('cmd_get')))
+ self.assertEqual(set_count, int(self.getStat('cmd_set')))
+ self.assertEqual(flush_count, int(self.getStat('cmd_flush')))
+ self.assertEqual(total_items, int(self.getStat('total_items')))
+ self.assertEqual(get_hits, int(self.getStat('get_hits')))
+ self.assertEqual(get_misses, int(self.getStat('get_misses')))
+ self.assertEqual(cas_misses, int(self.getStat('cas_misses')))
+ self.assertEqual(cas_hits, int(self.getStat('cas_hits')))
+ self.assertEqual(cas_badval, int(self.getStat('cas_badval')))
+ self.assertEqual(delete_misses, int(self.getStat('delete_misses')))
+ self.assertEqual(delete_hits, int(self.getStat('delete_hits')))
+ self.assertEqual(0, int(self.getStat('curr_items')))
+ self.assertEqual(curr_connections, int(self.getStat('curr_connections')))
+ self.assertEqual(incr_misses, int(self.getStat('incr_misses')))
+ self.assertEqual(incr_hits, int(self.getStat('incr_hits')))
+ self.assertEqual(decr_misses, int(self.getStat('decr_misses')))
+ self.assertEqual(decr_hits, int(self.getStat('decr_hits')))
+
+ def test_incr(self):
+ self.assertEqual(call('incr key 0\r\n'), b'NOT_FOUND\r\n')
+
+ self.assertEqual(call('set key 0 0 1\r\n0\r\n'), b'STORED\r\n')
+ self.assertEqual(call('incr key 0\r\n'), b'0\r\n')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 1\r\n0\r\nEND\r\n')
+
+ self.assertEqual(call('incr key 1\r\n'), b'1\r\n')
+ self.assertEqual(call('incr key 2\r\n'), b'3\r\n')
+ self.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 1)), b'2\r\n')
+ self.assertEqual(call('incr key %d\r\n' % (pow(2, 64) - 3)), b'18446744073709551615\r\n')
+ self.assertRegex(call('incr key 1\r\n').decode(), r'0(\w+)?\r\n')
+
+ self.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b'STORED\r\n')
+ self.assertEqual(call('incr key 1\r\n'), b'2\r\n')
+
+ self.assertEqual(call('set key 0 0 2\r\n09\r\n'), b'STORED\r\n')
+ self.assertEqual(call('incr key 1\r\n'), b'10\r\n')
+
+ def test_decr(self):
+ self.assertEqual(call('decr key 0\r\n'), b'NOT_FOUND\r\n')
+
+ self.assertEqual(call('set key 0 0 1\r\n7\r\n'), b'STORED\r\n')
+ self.assertEqual(call('decr key 1\r\n'), b'6\r\n')
+ self.assertEqual(call('get key\r\n'), b'VALUE key 0 1\r\n6\r\nEND\r\n')
+
+ self.assertEqual(call('decr key 6\r\n'), b'0\r\n')
+ self.assertEqual(call('decr key 2\r\n'), b'0\r\n')
+
+ self.assertEqual(call('set key 0 0 2\r\n20\r\n'), b'STORED\r\n')
+ self.assertRegex(call('decr key 11\r\n').decode(), r'^9( )?\r\n$')
+
+ self.assertEqual(call('set key 0 0 3\r\n100\r\n'), b'STORED\r\n')
+ self.assertRegex(call('decr key 91\r\n').decode(), r'^9( )?\r\n$')
+
+ self.assertEqual(call('set key 0 0 2\r\n1 \r\n'), b'STORED\r\n')
+ self.assertEqual(call('decr key 1\r\n'), b'0\r\n')
+
+ self.assertEqual(call('set key 0 0 2\r\n09\r\n'), b'STORED\r\n')
+ self.assertEqual(call('decr key 1\r\n'), b'8\r\n')
+
+ def test_incr_and_decr_on_invalid_input(self):
+ error_msg = b'CLIENT_ERROR cannot increment or decrement non-numeric value\r\n'
+ for cmd in ['incr', 'decr']:
+ for value in ['', '-1', 'a', '0x1', '18446744073709551616']:
+ self.assertEqual(call('set key 0 0 %d\r\n%s\r\n' % (len(value), value)), b'STORED\r\n')
+ prev = call('get key\r\n')
+ self.assertEqual(call(cmd + ' key 1\r\n'), error_msg, "cmd=%s, value=%s" % (cmd, value))
+ self.assertEqual(call('get key\r\n'), prev)
+ self.delete('key')
+
+def wait_for_memcache_tcp(timeout=4):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ timeout_at = time.time() + timeout
+ while True:
+ if time.time() >= timeout_at:
+ raise TimeoutError()
+ try:
+ s.connect(server_addr)
+ s.close()
+ break
+ except ConnectionRefusedError:
+ time.sleep(0.1)
+
+
+def wait_for_memcache_udp(timeout=4):
+ timeout_at = time.time() + timeout
+ while True:
+ if time.time() >= timeout_at:
+ raise TimeoutError()
+ try:
+ udp_call('version\r\n', timeout=0.2)
+ break
+ except socket.timeout:
+ pass
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description="memcache protocol tests")
+ parser.add_argument('--server', '-s', action="store", help="server adddress in <host>:<port> format", default="localhost:11211")
+ parser.add_argument('--udp', '-U', action="store_true", help="Use UDP protocol")
+ parser.add_argument('--fast', action="store_true", help="Run only fast tests")
+ args = parser.parse_args()
+
+ host, port = args.server.split(':')
+ server_addr = (host, int(port))
+
+ if args.udp:
+ call = udp_call
+ wait_for_memcache_udp()
+ else:
+ call = tcp_call
+ wait_for_memcache_tcp()
+
+ runner = unittest.TextTestRunner()
+ loader = unittest.TestLoader()
+ suite = unittest.TestSuite()
+ suite.addTest(loader.loadTestsFromTestCase(TestCommands))
+ if args.udp:
+ suite.addTest(loader.loadTestsFromTestCase(UdpSpecificTests))
+ else:
+ suite.addTest(loader.loadTestsFromTestCase(TcpSpecificTests))
+ result = runner.run(suite)
+ if not result.wasSuccessful():
+ sys.exit(1)
diff --git a/src/seastar/apps/seawreck/CMakeLists.txt b/src/seastar/apps/seawreck/CMakeLists.txt
new file mode 100644
index 00000000..f9ffd357
--- /dev/null
+++ b/src/seastar/apps/seawreck/CMakeLists.txt
@@ -0,0 +1,24 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_add_app (seawreck
+ SOURCES seawreck.cc)
diff --git a/src/seastar/apps/seawreck/seawreck.cc b/src/seastar/apps/seawreck/seawreck.cc
new file mode 100644
index 00000000..61066956
--- /dev/null
+++ b/src/seastar/apps/seawreck/seawreck.cc
@@ -0,0 +1,225 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/http/response_parser.hh>
+#include <seastar/core/print.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/future-util.hh>
+#include <chrono>
+
+using namespace seastar;
+
+template <typename... Args>
+void http_debug(const char* fmt, Args&&... args) {
+#if HTTP_DEBUG
+ print(fmt, std::forward<Args>(args)...);
+#endif
+}
+
+class http_client {
+private:
+ unsigned _duration;
+ unsigned _conn_per_core;
+ unsigned _reqs_per_conn;
+ std::vector<connected_socket> _sockets;
+ semaphore _conn_connected{0};
+ semaphore _conn_finished{0};
+ timer<> _run_timer;
+ bool _timer_based;
+ bool _timer_done{false};
+ uint64_t _total_reqs{0};
+public:
+ http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn)
+ : _duration(duration)
+ , _conn_per_core(total_conn / smp::count)
+ , _reqs_per_conn(reqs_per_conn)
+ , _run_timer([this] { _timer_done = true; })
+ , _timer_based(reqs_per_conn == 0) {
+ }
+
+ class connection {
+ private:
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ http_response_parser _parser;
+ http_client* _http_client;
+ uint64_t _nr_done{0};
+ public:
+ connection(connected_socket&& fd, http_client* client)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output())
+ , _http_client(client){
+ }
+
+ uint64_t nr_done() {
+ return _nr_done;
+ }
+
+ future<> do_req() {
+ return _write_buf.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ _parser.init();
+ return _read_buf.consume(_parser).then([this] {
+ // Read HTTP response header first
+ if (_parser.eof()) {
+ return make_ready_future<>();
+ }
+ auto _rsp = _parser.get_parsed_response();
+ auto it = _rsp->_headers.find("Content-Length");
+ if (it == _rsp->_headers.end()) {
+ fmt::print("Error: HTTP response does not contain: Content-Length\n");
+ return make_ready_future<>();
+ }
+ auto content_len = std::stoi(it->second);
+ http_debug("Content-Length = %d\n", content_len);
+ // Read HTTP response body
+ return _read_buf.read_exactly(content_len).then([this] (temporary_buffer<char> buf) {
+ _nr_done++;
+ http_debug("%s\n", buf.get());
+ if (_http_client->done(_nr_done)) {
+ return make_ready_future();
+ } else {
+ return do_req();
+ }
+ });
+ });
+ });
+ }
+ };
+
+ future<uint64_t> total_reqs() {
+ fmt::print("Requests on cpu {:2d}: {:d}\n", engine().cpu_id(), _total_reqs);
+ return make_ready_future<uint64_t>(_total_reqs);
+ }
+
+ bool done(uint64_t nr_done) {
+ if (_timer_based) {
+ return _timer_done;
+ } else {
+ return nr_done >= _reqs_per_conn;
+ }
+ }
+
+ future<> connect(ipv4_addr server_addr) {
+ // Establish all the TCP connections first
+ for (unsigned i = 0; i < _conn_per_core; i++) {
+ engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) {
+ _sockets.push_back(std::move(fd));
+ http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id());
+ _conn_connected.signal();
+ }).or_terminate();
+ }
+ return _conn_connected.wait(_conn_per_core);
+ }
+
+ future<> run() {
+ // All connected, start HTTP request
+ http_debug("Established all %6d tcp connections on cpu %3d\n", _conn_per_core, engine().cpu_id());
+ if (_timer_based) {
+ _run_timer.arm(std::chrono::seconds(_duration));
+ }
+ for (auto&& fd : _sockets) {
+ auto conn = new connection(std::move(fd), this);
+ conn->do_req().then_wrapped([this, conn] (auto&& f) {
+ http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
+ _total_reqs += conn->nr_done();
+ _conn_finished.signal();
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ fmt::print("http request error: {}\n", ex.what());
+ }
+ });
+ }
+
+ // All finished
+ return _conn_finished.wait(_conn_per_core);
+ }
+ future<> stop() {
+ return make_ready_future();
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("server,s", bpo::value<std::string>()->default_value("192.168.66.100:10000"), "Server address")
+ ("conn,c", bpo::value<unsigned>()->default_value(100), "total connections")
+ ("reqs,r", bpo::value<unsigned>()->default_value(0), "reqs per connection")
+ ("duration,d", bpo::value<unsigned>()->default_value(10), "duration of the test in seconds)");
+
+ return app.run(ac, av, [&app] () -> future<int> {
+ auto& config = app.configuration();
+ auto server = config["server"].as<std::string>();
+ auto reqs_per_conn = config["reqs"].as<unsigned>();
+ auto total_conn= config["conn"].as<unsigned>();
+ auto duration = config["duration"].as<unsigned>();
+
+ if (total_conn % smp::count != 0) {
+ fmt::print("Error: conn needs to be n * cpu_nr\n");
+ return make_ready_future<int>(-1);
+ }
+
+ auto http_clients = new distributed<http_client>;
+
+ // Start http requests on all the cores
+ auto started = steady_clock_type::now();
+ fmt::print("========== http_client ============\n");
+ fmt::print("Server: {}\n", server);
+ fmt::print("Connections: {:d}\n", total_conn);
+ fmt::print("Requests/connection: {}\n", reqs_per_conn == 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn));
+ return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn)).then([http_clients, server] {
+ return http_clients->invoke_on_all(&http_client::connect, ipv4_addr{server});
+ }).then([http_clients] {
+ return http_clients->invoke_on_all(&http_client::run);
+ }).then([http_clients] {
+ return http_clients->map_reduce(adder<uint64_t>(), &http_client::total_reqs);
+ }).then([http_clients, started] (auto total_reqs) {
+ // All the http requests are finished
+ auto finished = steady_clock_type::now();
+ auto elapsed = finished - started;
+ auto secs = static_cast<double>(elapsed.count() / 1000000000.0);
+ fmt::print("Total cpus: {:d}\n", smp::count);
+ fmt::print("Total requests: {:d}\n", total_reqs);
+ fmt::print("Total time: {:f}\n", secs);
+ fmt::print("Requests/sec: {:f}\n", static_cast<double>(total_reqs) / secs);
+ fmt::print("========== done ============\n");
+ return http_clients->stop().then([http_clients] {
+ // FIXME: If we call engine().exit(0) here to exit when
+ // requests are done. The tcp connection will not be closed
+ // properly, becasue we exit too earily and the FIN packets are
+ // not exchanged.
+ delete http_clients;
+ return make_ready_future<int>(0);
+ });
+ });
+ });
+}