/* * This file is open source software, licensed to you under the terms * of the Apache License, Version 2.0 (the "License"). See the NOTICE file * distributed with this work for additional information regarding copyright * ownership. You may not use this file except in compliance with the License. * * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Copyright 2014 Cloudius Systems */ #include #include #include #include #include #include #include using namespace seastar; static std::string str_ping{"ping"}; static std::string str_txtx{"txtx"}; static std::string str_rxrx{"rxrx"}; static std::string str_pong{"pong"}; static std::string str_unknow{"unknow cmd"}; static int tx_msg_total_size = 100 * 1024 * 1024; static int tx_msg_size = 4 * 1024; static int tx_msg_nr = tx_msg_total_size / tx_msg_size; static int rx_msg_size = 4 * 1024; static std::string str_txbuf(tx_msg_size, 'X'); static bool enable_tcp = false; static bool enable_sctp = false; class tcp_server { std::vector _tcp_listeners; std::vector _sctp_listeners; public: future<> listen(ipv4_addr addr) { if (enable_tcp) { listen_options lo; lo.proto = transport::TCP; lo.reuse_address = true; _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); do_accepts(_tcp_listeners); } if (enable_sctp) { listen_options lo; lo.proto = transport::SCTP; lo.reuse_address = true; _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); do_accepts(_sctp_listeners); } return make_ready_future<>(); } // FIXME: We should properly tear down the service here. future<> stop() { return make_ready_future<>(); } void do_accepts(std::vector& listeners) { int which = listeners.size() - 1; // Accept in the background. (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable { connected_socket fd = std::move(ar.connection); socket_address addr = std::move(ar.remote_address); auto conn = new connection(*this, std::move(fd), addr); (void)conn->process().then_wrapped([conn] (auto&& f) { delete conn; try { f.get(); } catch (std::exception& ex) { std::cout << "request error " << ex.what() << "\n"; } }); do_accepts(listeners); }).then_wrapped([] (auto&& f) { try { f.get(); } catch (std::exception& ex) { std::cout << "accept failed: " << ex.what() << "\n"; } }); } class connection { connected_socket _fd; input_stream _read_buf; output_stream _write_buf; public: connection(tcp_server& server, connected_socket&& fd, socket_address addr) : _fd(std::move(fd)) , _read_buf(_fd.input()) , _write_buf(_fd.output()) {} future<> process() { return read(); } future<> read() { if (_read_buf.eof()) { return make_ready_future(); } // Expect 4 bytes cmd from client size_t n = 4; return _read_buf.read_exactly(n).then([this] (temporary_buffer buf) { if (buf.size() == 0) { return make_ready_future(); } auto cmd = std::string(buf.get(), buf.size()); // pingpong test if (cmd == str_ping) { return _write_buf.write(str_pong).then([this] { return _write_buf.flush(); }).then([this] { return this->read(); }); // server tx test } else if (cmd == str_txtx) { return tx_test(); // server tx test } else if (cmd == str_rxrx) { return rx_test(); // unknow test } else { return _write_buf.write(str_unknow).then([this] { return _write_buf.flush(); }).then([] { return make_ready_future(); }); } }); } future<> do_write(int end) { if (end == 0) { return make_ready_future<>(); } return _write_buf.write(str_txbuf).then([this] { return _write_buf.flush(); }).then([this, end] { return do_write(end - 1); }); } future<> tx_test() { return do_write(tx_msg_nr).then([this] { return _write_buf.close(); }).then([] { return make_ready_future<>(); }); } future<> do_read() { return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer buf) { if (buf.size() == 0) { return make_ready_future(); } else { return do_read(); } }); } future<> rx_test() { return do_read().then([] { return make_ready_future<>(); }); } }; }; namespace bpo = boost::program_options; int main(int ac, char** av) { app_template app; app.add_options() ("port", bpo::value()->default_value(10000), "TCP server port") ("tcp", bpo::value()->default_value("yes"), "tcp listen") ("sctp", bpo::value()->default_value("no"), "sctp listen") ; return app.run_deprecated(ac, av, [&] { auto&& config = app.configuration(); uint16_t port = config["port"].as(); enable_tcp = config["tcp"].as() == "yes"; enable_sctp = config["sctp"].as() == "yes"; if (!enable_tcp && !enable_sctp) { fprint(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n"); return engine().exit(1); } auto server = new distributed; (void)server->start().then([server = std::move(server), port] () mutable { engine().at_exit([server] { return server->stop(); }); // Start listening in the background. (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port}); }).then([port] { std::cout << "Seastar TCP server listening on port " << port << " ...\n"; }); }); }