summaryrefslogtreecommitdiffstats
path: root/src/seastar/apps/seawreck
diff options
context:
space:
mode:
Diffstat (limited to 'src/seastar/apps/seawreck')
-rw-r--r--src/seastar/apps/seawreck/CMakeLists.txt24
-rw-r--r--src/seastar/apps/seawreck/seawreck.cc225
2 files changed, 249 insertions, 0 deletions
diff --git a/src/seastar/apps/seawreck/CMakeLists.txt b/src/seastar/apps/seawreck/CMakeLists.txt
new file mode 100644
index 00000000..f9ffd357
--- /dev/null
+++ b/src/seastar/apps/seawreck/CMakeLists.txt
@@ -0,0 +1,24 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+seastar_add_app (seawreck
+ SOURCES seawreck.cc)
diff --git a/src/seastar/apps/seawreck/seawreck.cc b/src/seastar/apps/seawreck/seawreck.cc
new file mode 100644
index 00000000..61066956
--- /dev/null
+++ b/src/seastar/apps/seawreck/seawreck.cc
@@ -0,0 +1,225 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/http/response_parser.hh>
+#include <seastar/core/print.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/semaphore.hh>
+#include <seastar/core/future-util.hh>
+#include <chrono>
+
+using namespace seastar;
+
+template <typename... Args>
+void http_debug(const char* fmt, Args&&... args) {
+#if HTTP_DEBUG
+ print(fmt, std::forward<Args>(args)...);
+#endif
+}
+
+class http_client {
+private:
+ unsigned _duration;
+ unsigned _conn_per_core;
+ unsigned _reqs_per_conn;
+ std::vector<connected_socket> _sockets;
+ semaphore _conn_connected{0};
+ semaphore _conn_finished{0};
+ timer<> _run_timer;
+ bool _timer_based;
+ bool _timer_done{false};
+ uint64_t _total_reqs{0};
+public:
+ http_client(unsigned duration, unsigned total_conn, unsigned reqs_per_conn)
+ : _duration(duration)
+ , _conn_per_core(total_conn / smp::count)
+ , _reqs_per_conn(reqs_per_conn)
+ , _run_timer([this] { _timer_done = true; })
+ , _timer_based(reqs_per_conn == 0) {
+ }
+
+ class connection {
+ private:
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ http_response_parser _parser;
+ http_client* _http_client;
+ uint64_t _nr_done{0};
+ public:
+ connection(connected_socket&& fd, http_client* client)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output())
+ , _http_client(client){
+ }
+
+ uint64_t nr_done() {
+ return _nr_done;
+ }
+
+ future<> do_req() {
+ return _write_buf.write("GET / HTTP/1.1\r\nHost: 127.0.0.1:10000\r\n\r\n").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ _parser.init();
+ return _read_buf.consume(_parser).then([this] {
+ // Read HTTP response header first
+ if (_parser.eof()) {
+ return make_ready_future<>();
+ }
+ auto _rsp = _parser.get_parsed_response();
+ auto it = _rsp->_headers.find("Content-Length");
+ if (it == _rsp->_headers.end()) {
+ fmt::print("Error: HTTP response does not contain: Content-Length\n");
+ return make_ready_future<>();
+ }
+ auto content_len = std::stoi(it->second);
+ http_debug("Content-Length = %d\n", content_len);
+ // Read HTTP response body
+ return _read_buf.read_exactly(content_len).then([this] (temporary_buffer<char> buf) {
+ _nr_done++;
+ http_debug("%s\n", buf.get());
+ if (_http_client->done(_nr_done)) {
+ return make_ready_future();
+ } else {
+ return do_req();
+ }
+ });
+ });
+ });
+ }
+ };
+
+ future<uint64_t> total_reqs() {
+ fmt::print("Requests on cpu {:2d}: {:d}\n", engine().cpu_id(), _total_reqs);
+ return make_ready_future<uint64_t>(_total_reqs);
+ }
+
+ bool done(uint64_t nr_done) {
+ if (_timer_based) {
+ return _timer_done;
+ } else {
+ return nr_done >= _reqs_per_conn;
+ }
+ }
+
+ future<> connect(ipv4_addr server_addr) {
+ // Establish all the TCP connections first
+ for (unsigned i = 0; i < _conn_per_core; i++) {
+ engine().net().connect(make_ipv4_address(server_addr)).then([this] (connected_socket fd) {
+ _sockets.push_back(std::move(fd));
+ http_debug("Established connection %6d on cpu %3d\n", _conn_connected.current(), engine().cpu_id());
+ _conn_connected.signal();
+ }).or_terminate();
+ }
+ return _conn_connected.wait(_conn_per_core);
+ }
+
+ future<> run() {
+ // All connected, start HTTP request
+ http_debug("Established all %6d tcp connections on cpu %3d\n", _conn_per_core, engine().cpu_id());
+ if (_timer_based) {
+ _run_timer.arm(std::chrono::seconds(_duration));
+ }
+ for (auto&& fd : _sockets) {
+ auto conn = new connection(std::move(fd), this);
+ conn->do_req().then_wrapped([this, conn] (auto&& f) {
+ http_debug("Finished connection %6d on cpu %3d\n", _conn_finished.current(), engine().cpu_id());
+ _total_reqs += conn->nr_done();
+ _conn_finished.signal();
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ fmt::print("http request error: {}\n", ex.what());
+ }
+ });
+ }
+
+ // All finished
+ return _conn_finished.wait(_conn_per_core);
+ }
+ future<> stop() {
+ return make_ready_future();
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("server,s", bpo::value<std::string>()->default_value("192.168.66.100:10000"), "Server address")
+ ("conn,c", bpo::value<unsigned>()->default_value(100), "total connections")
+ ("reqs,r", bpo::value<unsigned>()->default_value(0), "reqs per connection")
+ ("duration,d", bpo::value<unsigned>()->default_value(10), "duration of the test in seconds)");
+
+ return app.run(ac, av, [&app] () -> future<int> {
+ auto& config = app.configuration();
+ auto server = config["server"].as<std::string>();
+ auto reqs_per_conn = config["reqs"].as<unsigned>();
+ auto total_conn= config["conn"].as<unsigned>();
+ auto duration = config["duration"].as<unsigned>();
+
+ if (total_conn % smp::count != 0) {
+ fmt::print("Error: conn needs to be n * cpu_nr\n");
+ return make_ready_future<int>(-1);
+ }
+
+ auto http_clients = new distributed<http_client>;
+
+ // Start http requests on all the cores
+ auto started = steady_clock_type::now();
+ fmt::print("========== http_client ============\n");
+ fmt::print("Server: {}\n", server);
+ fmt::print("Connections: {:d}\n", total_conn);
+ fmt::print("Requests/connection: {}\n", reqs_per_conn == 0 ? "dynamic (timer based)" : std::to_string(reqs_per_conn));
+ return http_clients->start(std::move(duration), std::move(total_conn), std::move(reqs_per_conn)).then([http_clients, server] {
+ return http_clients->invoke_on_all(&http_client::connect, ipv4_addr{server});
+ }).then([http_clients] {
+ return http_clients->invoke_on_all(&http_client::run);
+ }).then([http_clients] {
+ return http_clients->map_reduce(adder<uint64_t>(), &http_client::total_reqs);
+ }).then([http_clients, started] (auto total_reqs) {
+ // All the http requests are finished
+ auto finished = steady_clock_type::now();
+ auto elapsed = finished - started;
+ auto secs = static_cast<double>(elapsed.count() / 1000000000.0);
+ fmt::print("Total cpus: {:d}\n", smp::count);
+ fmt::print("Total requests: {:d}\n", total_reqs);
+ fmt::print("Total time: {:f}\n", secs);
+ fmt::print("Requests/sec: {:f}\n", static_cast<double>(total_reqs) / secs);
+ fmt::print("========== done ============\n");
+ return http_clients->stop().then([http_clients] {
+ // FIXME: If we call engine().exit(0) here to exit when
+ // requests are done. The tcp connection will not be closed
+ // properly, becasue we exit too earily and the FIN packets are
+ // not exchanged.
+ delete http_clients;
+ return make_ready_future<int>(0);
+ });
+ });
+ });
+}