From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/seastar/demos/tcp_sctp_server_demo.cc | 205 ++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 src/seastar/demos/tcp_sctp_server_demo.cc (limited to 'src/seastar/demos/tcp_sctp_server_demo.cc') diff --git a/src/seastar/demos/tcp_sctp_server_demo.cc b/src/seastar/demos/tcp_sctp_server_demo.cc new file mode 100644 index 000000000..ce0b18654 --- /dev/null +++ b/src/seastar/demos/tcp_sctp_server_demo.cc @@ -0,0 +1,205 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2014 Cloudius Systems + */ + +#include +#include +#include +#include +#include +#include +#include + +using namespace seastar; + +static std::string str_ping{"ping"}; +static std::string str_txtx{"txtx"}; +static std::string str_rxrx{"rxrx"}; +static std::string str_pong{"pong"}; +static std::string str_unknow{"unknow cmd"}; +static int tx_msg_total_size = 100 * 1024 * 1024; +static int tx_msg_size = 4 * 1024; +static int tx_msg_nr = tx_msg_total_size / tx_msg_size; +static int rx_msg_size = 4 * 1024; +static std::string str_txbuf(tx_msg_size, 'X'); +static bool enable_tcp = false; +static bool enable_sctp = false; + +class tcp_server { + std::vector _tcp_listeners; + std::vector _sctp_listeners; +public: + future<> listen(ipv4_addr addr) { + if (enable_tcp) { + listen_options lo; + lo.proto = transport::TCP; + lo.reuse_address = true; + _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); + do_accepts(_tcp_listeners); + } + + if (enable_sctp) { + listen_options lo; + lo.proto = transport::SCTP; + lo.reuse_address = true; + _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); + do_accepts(_sctp_listeners); + } + return make_ready_future<>(); + } + + // FIXME: We should properly tear down the service here. + future<> stop() { + return make_ready_future<>(); + } + + void do_accepts(std::vector& listeners) { + int which = listeners.size() - 1; + // Accept in the background. + (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable { + connected_socket fd = std::move(ar.connection); + socket_address addr = std::move(ar.remote_address); + auto conn = new connection(*this, std::move(fd), addr); + (void)conn->process().then_wrapped([conn] (auto&& f) { + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "request error " << ex.what() << "\n"; + } + }); + do_accepts(listeners); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "accept failed: " << ex.what() << "\n"; + } + }); + } + class connection { + connected_socket _fd; + input_stream _read_buf; + output_stream _write_buf; + public: + connection(tcp_server& server, connected_socket&& fd, socket_address addr) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) {} + future<> process() { + return read(); + } + future<> read() { + if (_read_buf.eof()) { + return make_ready_future(); + } + // Expect 4 bytes cmd from client + size_t n = 4; + return _read_buf.read_exactly(n).then([this] (temporary_buffer buf) { + if (buf.size() == 0) { + return make_ready_future(); + } + auto cmd = std::string(buf.get(), buf.size()); + // pingpong test + if (cmd == str_ping) { + return _write_buf.write(str_pong).then([this] { + return _write_buf.flush(); + }).then([this] { + return this->read(); + }); + // server tx test + } else if (cmd == str_txtx) { + return tx_test(); + // server tx test + } else if (cmd == str_rxrx) { + return rx_test(); + // unknow test + } else { + return _write_buf.write(str_unknow).then([this] { + return _write_buf.flush(); + }).then([] { + return make_ready_future(); + }); + } + }); + } + future<> do_write(int end) { + if (end == 0) { + return make_ready_future<>(); + } + return _write_buf.write(str_txbuf).then([this] { + return _write_buf.flush(); + }).then([this, end] { + return do_write(end - 1); + }); + } + future<> tx_test() { + return do_write(tx_msg_nr).then([this] { + return _write_buf.close(); + }).then([] { + return make_ready_future<>(); + }); + } + future<> do_read() { + return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer buf) { + if (buf.size() == 0) { + return make_ready_future(); + } else { + return do_read(); + } + }); + } + future<> rx_test() { + return do_read().then([] { + return make_ready_future<>(); + }); + } + }; +}; + +namespace bpo = boost::program_options; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value()->default_value(10000), "TCP server port") + ("tcp", bpo::value()->default_value("yes"), "tcp listen") + ("sctp", bpo::value()->default_value("no"), "sctp listen") ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as(); + enable_tcp = config["tcp"].as() == "yes"; + enable_sctp = config["sctp"].as() == "yes"; + if (!enable_tcp && !enable_sctp) { + fprint(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n"); + return engine().exit(1); + } + auto server = new distributed; + (void)server->start().then([server = std::move(server), port] () mutable { + engine().at_exit([server] { + return server->stop(); + }); + // Start listening in the background. + (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port}); + }).then([port] { + std::cout << "Seastar TCP server listening on port " << port << " ...\n"; + }); + }); +} -- cgit v1.2.3