diff options
Diffstat (limited to 'src/seastar/demos')
-rw-r--r-- | src/seastar/demos/CMakeLists.txt | 101 | ||||
-rw-r--r-- | src/seastar/demos/block_discard_demo.cc | 71 | ||||
-rw-r--r-- | src/seastar/demos/echo_demo.cc | 126 | ||||
-rw-r--r-- | src/seastar/demos/ip_demo.cc | 46 | ||||
-rw-r--r-- | src/seastar/demos/l3_demo.cc | 48 | ||||
-rw-r--r-- | src/seastar/demos/line_count_demo.cc | 84 | ||||
-rw-r--r-- | src/seastar/demos/rpc_demo.cc | 299 | ||||
-rw-r--r-- | src/seastar/demos/scheduling_group_demo.cc | 179 | ||||
-rw-r--r-- | src/seastar/demos/tcp_demo.cc | 71 | ||||
-rw-r--r-- | src/seastar/demos/tcp_sctp_client_demo.cc | 277 | ||||
-rw-r--r-- | src/seastar/demos/tcp_sctp_server_demo.cc | 200 | ||||
-rw-r--r-- | src/seastar/demos/tls_echo_server.hh | 118 | ||||
-rw-r--r-- | src/seastar/demos/tls_echo_server_demo.cc | 67 | ||||
-rw-r--r-- | src/seastar/demos/tls_simple_client_demo.cc | 132 | ||||
-rw-r--r-- | src/seastar/demos/udp_client_demo.cc | 87 | ||||
-rw-r--r-- | src/seastar/demos/udp_server_demo.cc | 80 | ||||
-rw-r--r-- | src/seastar/demos/udp_zero_copy_demo.cc | 149 |
17 files changed, 2135 insertions, 0 deletions
diff --git a/src/seastar/demos/CMakeLists.txt b/src/seastar/demos/CMakeLists.txt new file mode 100644 index 00000000..202e2805 --- /dev/null +++ b/src/seastar/demos/CMakeLists.txt @@ -0,0 +1,101 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +# Logical target for all demos. +add_custom_target (demos) + +macro (seastar_add_demo name) + set (args ${ARGN}) + + cmake_parse_arguments ( + parsed_args + "" + "" + "SOURCES" + ${args}) + + set (target demo_${name}) + add_executable (${target} ${parsed_args_SOURCES}) + + target_include_directories (${target} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + + target_link_libraries (${target} + PRIVATE + Boost::program_options + seastar_with_flags) + + set_target_properties (${target} + PROPERTIES + OUTPUT_NAME ${name}) + + add_dependencies (demos ${target}) +endmacro () + +seastar_add_demo (block_discard + SOURCES block_discard_demo.cc) + +seastar_add_demo (echo + SOURCES echo_demo.cc) + +seastar_add_demo (ip + SOURCES ip_demo.cc) + +seastar_add_demo (line_count + SOURCES line_count_demo.cc) + +seastar_add_demo (l3 + SOURCES l3_demo.cc) + +seastar_add_demo (rpc + SOURCES rpc_demo.cc) + +seastar_add_demo (scheduling_group + SOURCES scheduling_group_demo.cc) + +seastar_add_demo (tcp + SOURCES tcp_demo.cc) + +seastar_add_demo (tcp_sctp_client + SOURCES tcp_sctp_client_demo.cc) + +seastar_add_demo (tcp_sctp_server + SOURCES tcp_sctp_server_demo.cc) + +seastar_add_demo (tls_echo_server + SOURCES + tls_echo_server.hh + tls_echo_server_demo.cc) + +seastar_add_demo (tls_simple_client + SOURCES + tls_echo_server.hh + tls_simple_client_demo.cc) + +seastar_add_demo (udp_client + SOURCES udp_client_demo.cc) + +seastar_add_demo (udp_server + SOURCES udp_server_demo.cc) + +seastar_add_demo (udp_zero_copy + SOURCES udp_zero_copy_demo.cc) diff --git a/src/seastar/demos/block_discard_demo.cc b/src/seastar/demos/block_discard_demo.cc new file mode 100644 index 00000000..a1fbcfd9 --- /dev/null +++ b/src/seastar/demos/block_discard_demo.cc @@ -0,0 +1,71 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <algorithm> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/file.hh> +#include <seastar/core/reactor.hh> +#include <iostream> + +using namespace seastar; + +namespace bpo = boost::program_options; + +struct file_test { + file_test(file&& f) : f(std::move(f)) {} + file f; + semaphore sem = { 0 }; +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("dev", bpo::value<std::string>(), "e.g. --dev /dev/sdb") + ; + + return app.run_deprecated(ac, av, [&app] { + static constexpr auto max = 10000; + auto&& config = app.configuration(); + auto filepath = config["dev"].as<std::string>(); + + open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) { + auto ft = new file_test{std::move(f)}; + + ft->f.stat().then([ft] (struct stat st) mutable { + assert(S_ISBLK(st.st_mode)); + auto offset = 0; + auto length = max * 4096; + ft->f.discard(offset, length).then([ft] () mutable { + ft->sem.signal(); + }); + }); + + ft->sem.wait().then([ft] () mutable { + return ft->f.flush(); + }).then([ft] () mutable { + std::cout << "done\n"; + delete ft; + engine().exit(0); + }); + }); + }); +} diff --git a/src/seastar/demos/echo_demo.cc b/src/seastar/demos/echo_demo.cc new file mode 100644 index 00000000..220e463d --- /dev/null +++ b/src/seastar/demos/echo_demo.cc @@ -0,0 +1,126 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + * + */ + +#include <seastar/net/virtio.hh> +#include <seastar/net/dpdk.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/ip.hh> +#include <iostream> +#include <utility> +#include <algorithm> + +using namespace seastar; +using namespace net; + +void dump_packet(const packet& p) { + std::cout << "rx:"; + auto f = p.frag(0); + for (unsigned i = 0; i < std::min(f.size, size_t(30)); ++i) { + char x[4]; + std::sprintf(x, " %02x", uint8_t(f.base[i])); + std::cout << x; + } + std::cout << "\n"; +} + +future<> echo_packet(net::qp& netif, packet p) { + auto f = p.frag(0); + if (f.size < sizeof(eth_hdr)) { + return make_ready_future<>(); + } + auto pos = 0; + auto eh = reinterpret_cast<eth_hdr*>(f.base + pos); + pos += sizeof(*eh); + *eh = ntoh(*eh); + if (eh->eth_proto != 0x0800) { + return make_ready_future<>(); + } + auto iph = reinterpret_cast<ip_hdr*>(f.base + pos); + *iph = ntoh(*iph); + pos += iph->ihl * 4; + if (iph->ver != 4 || iph->ihl < 5 || iph->ip_proto != 1) { + return make_ready_future<>(); + } + auto ip_len = iph->len; + auto icmph = reinterpret_cast<icmp_hdr*>(f.base + pos); + if (icmph->type != icmp_hdr::msg_type::echo_request) { + return make_ready_future<>(); + } + auto icmp_len = ip_len - iph->ihl * 4; + std::swap(eh->src_mac, eh->dst_mac); + std::swap(iph->src_ip, iph->dst_ip); + icmph->type = icmp_hdr::msg_type::echo_reply; + icmph->csum = 0; + *iph = hton(*iph); + *eh = hton(*eh); + icmph->csum = ip_checksum(icmph, icmp_len); + iph->csum = 0; + iph->csum = ip_checksum(iph, iph->ihl * 4); + return netif.send(std::move(p)); +} + +#ifdef SEASTAR_HAVE_DPDK +void usage() +{ + std::cout<<"Usage: echotest [-virtio|-dpdk]"<<std::endl; + std::cout<<" -virtio - use virtio backend (default)"<<std::endl; + std::cout<<" -dpdk - use dpdk-pmd backend"<<std::endl; +} +#endif + +int main(int ac, char** av) { + std::unique_ptr<net::device> dnet; + net::qp* vnet; + + boost::program_options::variables_map opts; + opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false))); + +#ifdef SEASTAR_HAVE_DPDK + if (ac > 2) { + usage(); + return -1; + } + + if ((ac == 1) || !std::strcmp(av[1], "-virtio")) { + dnet = create_virtio_net_device(opts); + } else if (!std::strcmp(av[1], "-dpdk")) { + dnet = create_dpdk_net_device(); + } else { + usage(); + return -1; + } +#else + dnet = create_virtio_net_device(opts); +#endif // SEASTAR_HAVE_DPDK + + auto qp = dnet->init_local_queue(opts, 0); + vnet = qp.get(); + dnet->set_local_queue(std::move(qp)); + subscription<packet> rx = + dnet->receive([vnet] (packet p) { + return echo_packet(*vnet, std::move(p)); + }); + engine().run(); + return 0; +} + + diff --git a/src/seastar/demos/ip_demo.cc b/src/seastar/demos/ip_demo.cc new file mode 100644 index 00000000..02c88fdc --- /dev/null +++ b/src/seastar/demos/ip_demo.cc @@ -0,0 +1,46 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/arp.hh> +#include <seastar/net/ip.hh> +#include <seastar/net/net.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/virtio.hh> + +using namespace seastar; +using namespace net; + +int main(int ac, char** av) { + boost::program_options::variables_map opts; + opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false))); + + auto vnet = create_virtio_net_device(opts); + vnet->set_local_queue(vnet->init_local_queue(opts, 0)); + + interface netif(std::move(vnet)); + ipv4 inet(&netif); + inet.set_host_address(ipv4_address("192.168.122.2")); + engine().run(); + return 0; +} + + + diff --git a/src/seastar/demos/l3_demo.cc b/src/seastar/demos/l3_demo.cc new file mode 100644 index 00000000..98415817 --- /dev/null +++ b/src/seastar/demos/l3_demo.cc @@ -0,0 +1,48 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/net.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/virtio.hh> +#include <iostream> + +using namespace seastar; +using namespace net; + +void dump_arp_packets(l3_protocol& proto) { + proto.receive([] (packet p, ethernet_address from) { + std::cout << "seen arp packet\n"; + return make_ready_future<>(); + }, [] (forward_hash& out_hash_data, packet& p, size_t off) {return false;}); +} + +int main(int ac, char** av) { + boost::program_options::variables_map opts; + opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false))); + + auto vnet = create_virtio_net_device(opts); + interface netif(std::move(vnet)); + l3_protocol arp(&netif, eth_protocol_num::arp, []{ return compat::optional<l3_protocol::l3packet>(); }); + dump_arp_packets(arp); + engine().run(); + return 0; +} + diff --git a/src/seastar/demos/line_count_demo.cc b/src/seastar/demos/line_count_demo.cc new file mode 100644 index 00000000..8d07553e --- /dev/null +++ b/src/seastar/demos/line_count_demo.cc @@ -0,0 +1,84 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +// Demonstration of file_input_stream. Don't expect stellar performance +// since no read-ahead or caching is done yet. + +#include <seastar/core/fstream.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/reactor.hh> +#include <algorithm> +#include <iostream> + +using namespace seastar; + +struct reader { +public: + reader(file f) + : is(make_file_input_stream(std::move(f), file_input_stream_options{1 << 16, 1})) { + } + + input_stream<char> is; + size_t count = 0; + + // for input_stream::consume(): + using unconsumed_remainder = compat::optional<temporary_buffer<char>>; + future<unconsumed_remainder> operator()(temporary_buffer<char> data) { + if (data.empty()) { + return make_ready_future<unconsumed_remainder>(std::move(data)); + } else { + count += std::count(data.begin(), data.end(), '\n'); + // FIXME: last line without \n? + return make_ready_future<unconsumed_remainder>(); + } + } +}; + +int main(int ac, char** av) { + app_template app; + namespace bpo = boost::program_options; + app.add_positional_options({ + { "file", bpo::value<std::string>(), "File to process", 1 }, + }); + return app.run(ac, av, [&app] { + auto fname = app.configuration()["file"].as<std::string>(); + return open_file_dma(fname, open_flags::ro).then([] (file f) { + auto r = make_shared<reader>(std::move(f)); + return r->is.consume(*r).then([r] { + fmt::print("{:d} lines\n", r->count); + return r->is.close().then([r] {}); + }); + }).then_wrapped([] (future<> f) -> future<int> { + try { + f.get(); + return make_ready_future<int>(0); + } catch (std::exception& ex) { + std::cout << ex.what() << "\n"; + return make_ready_future<int>(1); + } catch (...) { + std::cout << "unknown exception\n"; + return make_ready_future<int>(1); + } + }); + }); +} + diff --git a/src/seastar/demos/rpc_demo.cc b/src/seastar/demos/rpc_demo.cc new file mode 100644 index 00000000..899af1c1 --- /dev/null +++ b/src/seastar/demos/rpc_demo.cc @@ -0,0 +1,299 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/rpc/rpc.hh> +#include <seastar/core/sleep.hh> +#include <seastar/rpc/lz4_compressor.hh> + +using namespace seastar; + +struct serializer { +}; + +template <typename T, typename Output> +inline +void write_arithmetic_type(Output& out, T v) { + static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); + return out.write(reinterpret_cast<const char*>(&v), sizeof(T)); +} + +template <typename T, typename Input> +inline +T read_arithmetic_type(Input& in) { + static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); + T v; + in.read(reinterpret_cast<char*>(&v), sizeof(T)); + return v; +} + +template <typename Output> +inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } +template <typename Input> +inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } +template <typename Input> +inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } +template <typename Input> +inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } +template <typename Input> +inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } +template <typename Input> +inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } + +template <typename Output> +inline void write(serializer, Output& out, const sstring& v) { + write_arithmetic_type(out, uint32_t(v.size())); + out.write(v.c_str(), v.size()); +} + +template <typename Input> +inline sstring read(serializer, Input& in, rpc::type<sstring>) { + auto size = read_arithmetic_type<uint32_t>(in); + sstring ret(sstring::initialized_later(), size); + in.read(ret.begin(), size); + return ret; +} + +namespace bpo = boost::program_options; +using namespace std::chrono_literals; + +class mycomp : public rpc::compressor::factory { + const sstring _name = "LZ4"; +public: + virtual const sstring& supported() const override { + fmt::print("supported called\n"); + return _name; + } + virtual std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { + fmt::print("negotiate called with {}\n", feature); + return feature == _name ? std::make_unique<rpc::lz4_compressor>() : nullptr; + } +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "RPC server port") + ("server", bpo::value<std::string>(), "Server address") + ("compress", bpo::value<bool>()->default_value(false), "Compress RPC traffic"); + std::cout << "start "; + rpc::protocol<serializer> myrpc(serializer{}); + static std::unique_ptr<rpc::protocol<serializer>::server> server; + static std::unique_ptr<rpc::protocol<serializer>::client> client; + static double x = 30.0; + + myrpc.set_logger([] (const sstring& log) { + fmt::print("{}", log); + std::cout << std::endl; + }); + + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + bool compress = config["compress"].as<bool>(); + static mycomp mc; + auto test1 = myrpc.register_handler(1, [x = 0](int i) mutable { fmt::print("test1 count {:d} got {:d}\n", ++x, i); }); + auto test2 = myrpc.register_handler(2, [](int a, int b){ fmt::print("test2 got {:d} {:d}\n", a, b); return make_ready_future<int>(a+b); }); + auto test3 = myrpc.register_handler(3, [](double x){ fmt::print("test3 got {:f}\n", x); return std::make_unique<double>(sin(x)); }); + auto test4 = myrpc.register_handler(4, [](){ fmt::print("test4 throw!\n"); throw std::runtime_error("exception!"); }); + auto test5 = myrpc.register_handler(5, [](){ fmt::print("test5 no wait\n"); return rpc::no_wait; }); + auto test6 = myrpc.register_handler(6, [](const rpc::client_info& info, int x){ fmt::print("test6 client {}, {:d}\n", inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr), x); }); + auto test8 = myrpc.register_handler(8, [](){ fmt::print("test8 sleep for 2 sec\n"); return sleep(2s); }); + auto test13 = myrpc.register_handler(13, [](){ fmt::print("test13 sleep for 1 msec\n"); return sleep(1ms); }); + auto test_message_to_big = myrpc.register_handler(14, [](sstring payload){ fmt::print("test message to bit, should not get here"); }); + + if (config.count("server")) { + std::cout << "client" << std::endl; + auto test7 = myrpc.make_client<long (long a, long b)>(7); + auto test9 = myrpc.make_client<long (long a, long b)>(9); // do not send optional + auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional + auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects + auto test10 = myrpc.make_client<long ()>(10); // receive less then replied + auto test10_1 = myrpc.make_client<future<long, int> ()>(10); // receive all + auto test11 = myrpc.make_client<future<long, rpc::optional<int>> ()>(11); // receive more then replied + auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits + auto test_nohandler = myrpc.make_client<void ()>(100000000); // non existing verb + auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non existing verb, no_wait call + rpc::client_options co; + if (compress) { + co.compressor_factory = &mc; + } + + client = std::make_unique<rpc::protocol<serializer>::client>(myrpc, co, ipv4_addr{config["server"].as<std::string>()}); + + auto f = test8(*client, 1500ms).then_wrapped([](future<> f) { + try { + f.get(); + printf("test8 should not get here!\n"); + } catch (rpc::timeout_error&) { + printf("test8 timeout!\n"); + } + }); + for (auto i = 0; i < 100; i++) { + fmt::print("iteration={:d}\n", i); + test1(*client, 5).then([] (){ fmt::print("test1 ended\n");}); + test2(*client, 1, 2).then([] (int r) { fmt::print("test2 got {:d}\n", r); }); + test3(*client, x).then([](double x) { fmt::print("sin={:f}\n", x); }); + test4(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test4 your should not see this!\n"); + } catch (std::runtime_error& x){ + fmt::print("test4 {}\n", x.what()); + } + }); + test5(*client).then([] { fmt::print("test5 no wait ended\n"); }); + test6(*client, 1).then([] { fmt::print("test6 ended\n"); }); + test7(*client, 5, 6).then([] (long r) { fmt::print("test7 got {:d}\n", r); }); + test9(*client, 1, 2).then([] (long r) { fmt::print("test9 got {:d}\n", r); }); + test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); }); + test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); }); + test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); }); + test10_1(*client).then([] (long r, int rr) { fmt::print("test10_1 got {:d} and {:d}\n", r, rr); }); + test11(*client).then([] (long r, rpc::optional<int> rr) { fmt::print("test11 got {:d} and {:d}\n", r, bool(rr)); }); + test_nohandler(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test_nohandler your should not see this!\n"); + } catch (rpc::unknown_verb_error& x){ + fmt::print("test_nohandle no such verb\n"); + } catch (...) { + fmt::print("incorrect exception!\n"); + } + }); + test_nohandler_nowait(*client); + auto c = make_lw_shared<rpc::cancellable>(); + test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + c->cancel(); + test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + sleep(500us).then([c] { c->cancel(); }); + test_message_to_big(*client, sstring(sstring::initialized_later(), 10'000'001)).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test message to big shold not get here\n"); + } catch(std::runtime_error& err) { + fmt::print("test message to big get error {}\n", err.what()); + } catch(...) { + fmt::print("test message to big wrong exception\n"); + } + }); + } + // delay a little for a time-sensitive test + sleep(400ms).then([test12] () mutable { + // server is configured for 10MB max, throw 25MB worth of requests at it. + auto now = rpc::rpc_clock_type::now(); + return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable { + return test12(*client, 100, sstring(sstring::initialized_later(), 1'000'000)).then([idx, now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now); + fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count()); + }); + }).then([now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now); + fmt::print("test12 completed after {:d} ms (should be ~300)\n", delta.count()); + }); + }); + f.finally([] { + sleep(1s).then([] { + client->stop().then([] { + engine().exit(0); + }); + }); + }); + } else { + std::cout << "server on port " << port << std::endl; + myrpc.register_handler(7, [](long a, long b) mutable { + auto p = make_lw_shared<promise<>>(); + auto t = make_lw_shared<timer<>>(); + fmt::print("test7 got {:d} {:d}\n", a, b); + auto f = p->get_future().then([a, b, t] { + fmt::print("test7 calc res\n"); + return a - b; + }); + t->set_callback([p = std::move(p)] () mutable { p->set_value(); }); + t->arm(1s); + return f; + }); + myrpc.register_handler(9, [] (long a, long b, rpc::optional<int> c) { + long r = 2; + fmt::print("test9 got {:d} {:d} ", a, b); + if (c) { + fmt::print("{:d}", c.value()); + r++; + } + fmt::print("\n"); + return r; + }); + myrpc.register_handler(10, [] { + fmt::print("test 10\n"); + return make_ready_future<long, int>(1, 2); + }); + myrpc.register_handler(11, [] { + fmt::print("test 11\n"); + return 1ul; + }); + myrpc.register_handler(12, [] (int sleep_ms, sstring payload) { + return sleep(std::chrono::milliseconds(sleep_ms)).then([] { + return make_ready_future<>(); + }); + }); + + rpc::resource_limits limits; + limits.bloat_factor = 1; + limits.basic_request_size = 0; + limits.max_memory = 10'000'000; + rpc::server_options so; + if (compress) { + so.compressor_factory = &mc; + } + server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, so, ipv4_addr{port}, limits); + } + }); + +} diff --git a/src/seastar/demos/scheduling_group_demo.cc b/src/seastar/demos/scheduling_group_demo.cc new file mode 100644 index 00000000..edbfb0a4 --- /dev/null +++ b/src/seastar/demos/scheduling_group_demo.cc @@ -0,0 +1,179 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2016 Scylla DB Ltd + */ + + +#include <seastar/core/app-template.hh> +#include <seastar/core/future.hh> +#include <seastar/core/scheduling.hh> +#include <seastar/core/thread.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> +#include <seastar/util/defer.hh> +#include <chrono> +#include <cmath> + +using namespace seastar; +using namespace std::chrono_literals; + +template <typename Func, typename Duration> +future<> +compute_intensive_task(Duration duration, unsigned& counter, Func func) { + auto end = std::chrono::steady_clock::now() + duration; + while (std::chrono::steady_clock::now() < end) { + func(); + } + ++counter; + return make_ready_future<>(); +} + +future<> +heavy_task(unsigned& counter) { + return compute_intensive_task(1ms, counter, [] { + static thread_local double x = 1; + x = std::exp(x) / 3; + }); +} + +future<> +light_task(unsigned& counter) { + return compute_intensive_task(100us, counter, [] { + static thread_local double x = 0.1; + x = std::log(x + 1); + }); +} + +future<> +medium_task(unsigned& counter) { + return compute_intensive_task(400us, counter, [] { + static thread_local double x = 0.1; + x = std::cos(x); + }); +} + +using done_func = std::function<bool ()>; + +future<> +run_compute_intensive_tasks(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) { + return seastar::async([task, sg, concurrency, done, &counter] { + while (!done()) { + parallel_for_each(boost::irange(0u, concurrency), [task, sg, &counter] (unsigned i) { + return with_scheduling_group(sg, [task, &counter] { + return task(counter); + }); + }).get(); + } + }); +} + +future<> +run_compute_intensive_tasks_in_threads(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) { + auto attr = seastar::thread_attributes(); + attr.sched_group = sg; + return parallel_for_each(boost::irange(0u, concurrency), [attr, done, &counter, task] (unsigned i) { + return seastar::async(attr, [done, &counter, task] { + while (!done()) { + task(counter).get(); + } + }); + }); +} + +future<> +run_with_duty_cycle(float utilization, std::chrono::steady_clock::duration period, done_func done, std::function<future<> (done_func done)> task) { + return seastar::async([=] { + bool duty_toggle = true; + auto t0 = std::chrono::steady_clock::now(); + condition_variable cv; + timer<> tmr_on([&] { duty_toggle = true; cv.signal(); }); + timer<> tmr_off([&] { duty_toggle = false; }); + tmr_on.arm(t0, period); + tmr_off.arm(t0 + std::chrono::duration_cast<decltype(t0)::duration>(period * utilization), period); + auto combined_done = [&] { + return done() || !duty_toggle; + }; + while (!done()) { + while (!combined_done()) { + task(std::cref(combined_done)).get(); + } + cv.wait([&] { + return done() || duty_toggle; + }).get(); + } + tmr_on.cancel(); + tmr_off.cancel(); + }); +} + +#include <fenv.h> + +template <typename T> +auto var_fn(T& var) { + return [&var] { return var; }; +} + +int main(int ac, char** av) { + app_template app; + return app.run(ac, av, [] { + return seastar::async([] { + auto sg100 = seastar::create_scheduling_group("sg100", 100).get0(); + auto ksg100 = seastar::defer([&] { seastar::destroy_scheduling_group(sg100).get(); }); + auto sg20 = seastar::create_scheduling_group("sg20", 20).get0(); + auto ksg20 = seastar::defer([&] { seastar::destroy_scheduling_group(sg20).get(); }); + auto sg50 = seastar::create_scheduling_group("sg50", 50).get0(); + auto ksg50 = seastar::defer([&] { seastar::destroy_scheduling_group(sg50).get(); }); + + bool done = false; + auto end = timer<>([&done] { + done = true; + }); + + end.arm(10s); + unsigned ctr100 = 0, ctr20 = 0, ctr50 = 0; + fmt::print("running three scheduling groups with 100% duty cycle each:\n"); + when_all( + run_compute_intensive_tasks(sg100, var_fn(done), 5, ctr100, heavy_task), + run_compute_intensive_tasks(sg20, var_fn(done), 3, ctr20, light_task), + run_compute_intensive_tasks_in_threads(sg50, var_fn(done), 2, ctr50, medium_task) + ).get(); + fmt::print("{:10} {:15} {:10} {:12} {:8}\n", "shares", "task_time (us)", "executed", "runtime (ms)", "vruntime"); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 1000, ctr100, ctr100 * 1000 / 1000, ctr100 * 1000 / 1000 / 100.); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 20, 100, ctr20, ctr20 * 100 / 1000, ctr20 * 100 / 1000 / 20.); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 400, ctr50, ctr50 * 400 / 1000, ctr50 * 400 / 1000 / 50.); + fmt::print("\n"); + + fmt::print("running two scheduling groups with 100%/50% duty cycles (period=1s:\n"); + unsigned ctr100_2 = 0, ctr50_2 = 0; + done = false; + end.arm(10s); + when_all( + run_compute_intensive_tasks(sg50, var_fn(done), 5, ctr50_2, heavy_task), + run_with_duty_cycle(0.5, 1s, var_fn(done), [=, &ctr100_2] (done_func done) { + return run_compute_intensive_tasks(sg100, done, 4, ctr100_2, heavy_task); + }) + ).get(); + fmt::print("{:10} {:10} {:15} {:10} {:12} {:8}\n", "shares", "duty", "task_time (us)", "executed", "runtime (ms)", "vruntime"); + fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 50, 1000, ctr100_2, ctr100_2 * 1000 / 1000, ctr100_2 * 1000 / 1000 / 100.); + fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 100, 400, ctr50_2, ctr50_2 * 1000 / 1000, ctr50_2 * 1000 / 1000 / 50.); + + return 0; + }); + }); +} diff --git a/src/seastar/demos/tcp_demo.cc b/src/seastar/demos/tcp_demo.cc new file mode 100644 index 00000000..fd14a33b --- /dev/null +++ b/src/seastar/demos/tcp_demo.cc @@ -0,0 +1,71 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/ip.hh> +#include <seastar/net/virtio.hh> +#include <seastar/net/tcp.hh> + +using namespace seastar; +using namespace net; + +struct tcp_test { + ipv4& inet; + using tcp = net::tcp<ipv4_traits>; + tcp::listener _listener; + struct connection { + tcp::connection tcp_conn; + explicit connection(tcp::connection tc) : tcp_conn(std::move(tc)) {} + void run() { + tcp_conn.wait_for_data().then([this] { + auto p = tcp_conn.read(); + if (!p.len()) { + tcp_conn.close_write(); + return; + } + fmt::print("read {:d} bytes\n", p.len()); + tcp_conn.send(std::move(p)); + run(); + }); + } + }; + tcp_test(ipv4& inet) : inet(inet), _listener(inet.get_tcp().listen(10000)) {} + void run() { + _listener.accept().then([this] (tcp::connection conn) { + (new connection(std::move(conn)))->run(); + run(); + }); + } +}; + +int main(int ac, char** av) { + boost::program_options::variables_map opts; + opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false))); + + auto vnet = create_virtio_net_device(opts); + interface netif(std::move(vnet)); + ipv4 inet(&netif); + inet.set_host_address(ipv4_address("192.168.122.2")); + tcp_test tt(inet); + engine().when_started().then([&tt] { tt.run(); }); + engine().run(); +} + + diff --git a/src/seastar/demos/tcp_sctp_client_demo.cc b/src/seastar/demos/tcp_sctp_client_demo.cc new file mode 100644 index 00000000..a07e55f5 --- /dev/null +++ b/src/seastar/demos/tcp_sctp_client_demo.cc @@ -0,0 +1,277 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <iostream> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/distributed.hh> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +static int rx_msg_size = 4 * 1024; +static int tx_msg_total_size = 100 * 1024 * 1024; +static int tx_msg_size = 4 * 1024; +static int tx_msg_nr = tx_msg_total_size / tx_msg_size; +static std::string str_txbuf(tx_msg_size, 'X'); + +class client; +distributed<client> clients; + +transport protocol = transport::TCP; + +class client { +private: + static constexpr unsigned _pings_per_connection = 10000; + unsigned _total_pings; + unsigned _concurrent_connections; + ipv4_addr _server_addr; + std::string _test; + lowres_clock::time_point _earliest_started; + lowres_clock::time_point _latest_finished; + size_t _processed_bytes; + unsigned _num_reported; +public: + class connection { + connected_socket _fd; + input_stream<char> _read_buf; + output_stream<char> _write_buf; + size_t _bytes_read = 0; + size_t _bytes_write = 0; + public: + connection(connected_socket&& fd) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) {} + + future<> do_read() { + return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { + _bytes_read += buf.size(); + if (buf.size() == 0) { + return make_ready_future(); + } else { + return do_read(); + } + }); + } + + future<> do_write(int end) { + if (end == 0) { + return make_ready_future(); + } + return _write_buf.write(str_txbuf).then([this] { + _bytes_write += tx_msg_size; + return _write_buf.flush(); + }).then([this, end] { + return do_write(end - 1); + }); + } + + future<> ping(int times) { + return _write_buf.write("ping").then([this] { + return _write_buf.flush(); + }).then([this, times] { + return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) { + if (buf.size() != 4) { + fprint(std::cerr, "illegal packet received: %d\n", buf.size()); + return make_ready_future(); + } + auto str = std::string(buf.get(), buf.size()); + if (str != "pong") { + fprint(std::cerr, "illegal packet received: %d\n", buf.size()); + return make_ready_future(); + } + if (times > 0) { + return ping(times - 1); + } else { + return make_ready_future(); + } + }); + }); + } + + future<size_t> rxrx() { + return _write_buf.write("rxrx").then([this] { + return _write_buf.flush(); + }).then([this] { + return do_write(tx_msg_nr).then([this] { + return _write_buf.close(); + }).then([this] { + return make_ready_future<size_t>(_bytes_write); + }); + }); + } + + future<size_t> txtx() { + return _write_buf.write("txtx").then([this] { + return _write_buf.flush(); + }).then([this] { + return do_read().then([this] { + return make_ready_future<size_t>(_bytes_read); + }); + }); + } + }; + + future<> ping_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->ping(_pings_per_connection).then([started] { + auto finished = lowres_clock::now(); + clients.invoke_on(0, &client::ping_report, started, finished); + }); + } + + future<> rxrx_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->rxrx().then([started] (size_t bytes) { + auto finished = lowres_clock::now(); + clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); + }); + } + + future<> txtx_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->txtx().then([started] (size_t bytes) { + auto finished = lowres_clock::now(); + clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); + }); + } + + void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) { + if (_earliest_started > started) + _earliest_started = started; + if (_latest_finished < finished) + _latest_finished = finished; + if (++_num_reported == _concurrent_connections) { + auto elapsed = _latest_finished - _earliest_started; + auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); + auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); + fprint(std::cout, "========== ping ============\n"); + fprint(std::cout, "Server: %s\n", _server_addr); + fprint(std::cout,"Connections: %u\n", _concurrent_connections); + fprint(std::cout, "Total PingPong: %u\n", _total_pings); + fprint(std::cout, "Total Time(Secs): %f\n", secs); + fprint(std::cout, "Requests/Sec: %f\n", + static_cast<double>(_total_pings) / secs); + clients.stop().then([] { + engine().exit(0); + }); + } + } + + void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) { + if (_earliest_started > started) + _earliest_started = started; + if (_latest_finished < finished) + _latest_finished = finished; + _processed_bytes += bytes; + if (++_num_reported == _concurrent_connections) { + auto elapsed = _latest_finished - _earliest_started; + auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); + auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); + fprint(std::cout, "========== %s ============\n", _test); + fprint(std::cout, "Server: %s\n", _server_addr); + fprint(std::cout, "Connections: %u\n", _concurrent_connections); + fprint(std::cout, "Bytes Received(MiB): %u\n", _processed_bytes/1024/1024); + fprint(std::cout, "Total Time(Secs): %f\n", secs); + fprint(std::cout, "Bandwidth(Gbits/Sec): %f\n", + static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs); + clients.stop().then([] { + engine().exit(0); + }); + } + } + + future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) { + _server_addr = server_addr; + _concurrent_connections = ncon * smp::count; + _total_pings = _pings_per_connection * _concurrent_connections; + _test = test; + + for (unsigned i = 0; i < ncon; i++) { + socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}}); + engine().net().connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) { + auto conn = new connection(std::move(fd)); + (this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) { + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + fprint(std::cerr, "request error: %s\n", ex.what()); + } + }); + }); + } + return make_ready_future(); + } + future<> stop() { + return make_ready_future(); + } + + typedef future<> (client::*test_fn)(connection *conn); + static const std::map<std::string, test_fn> tests; +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + app_template app; + app.add_options() + ("server", bpo::value<std::string>()->required(), "Server address") + ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)") + ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu") + ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp") + ; + + return app.run_deprecated(ac, av, [&app] { + auto&& config = app.configuration(); + auto server = config["server"].as<std::string>(); + auto test = config["test"].as<std::string>(); + auto ncon = config["conn"].as<unsigned>(); + auto proto = config["proto"].as<std::string>(); + + if (proto == "tcp") { + protocol = transport::TCP; + } else if (proto == "sctp") { + protocol = transport::SCTP; + } else { + fprint(std::cerr, "Error: --proto=tcp|sctp\n"); + return engine().exit(1); + } + + if (!client::tests.count(test)) { + fprint(std::cerr, "Error: -test=ping | rxrx | txtx\n"); + return engine().exit(1); + } + + clients.start().then([server, test, ncon] () { + clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon); + }); + }); +} + +const std::map<std::string, client::test_fn> client::tests = { + {"ping", &client::ping_test}, + {"rxrx", &client::rxrx_test}, + {"txtx", &client::txtx_test}, +}; + diff --git a/src/seastar/demos/tcp_sctp_server_demo.cc b/src/seastar/demos/tcp_sctp_server_demo.cc new file mode 100644 index 00000000..f8231e17 --- /dev/null +++ b/src/seastar/demos/tcp_sctp_server_demo.cc @@ -0,0 +1,200 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2014 Cloudius Systems + */ + +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/temporary_buffer.hh> +#include <seastar/core/distributed.hh> +#include <vector> +#include <iostream> + +using namespace seastar; + +static std::string str_ping{"ping"}; +static std::string str_txtx{"txtx"}; +static std::string str_rxrx{"rxrx"}; +static std::string str_pong{"pong"}; +static std::string str_unknow{"unknow cmd"}; +static int tx_msg_total_size = 100 * 1024 * 1024; +static int tx_msg_size = 4 * 1024; +static int tx_msg_nr = tx_msg_total_size / tx_msg_size; +static int rx_msg_size = 4 * 1024; +static std::string str_txbuf(tx_msg_size, 'X'); +static bool enable_tcp = false; +static bool enable_sctp = false; + +class tcp_server { + std::vector<server_socket> _tcp_listeners; + std::vector<server_socket> _sctp_listeners; +public: + future<> listen(ipv4_addr addr) { + if (enable_tcp) { + listen_options lo; + lo.proto = transport::TCP; + lo.reuse_address = true; + _tcp_listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); + do_accepts(_tcp_listeners); + } + + if (enable_sctp) { + listen_options lo; + lo.proto = transport::SCTP; + lo.reuse_address = true; + _sctp_listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); + do_accepts(_sctp_listeners); + } + return make_ready_future<>(); + } + + // FIXME: We should properly tear down the service here. + future<> stop() { + return make_ready_future<>(); + } + + void do_accepts(std::vector<server_socket>& listeners) { + int which = listeners.size() - 1; + listeners[which].accept().then([this, &listeners] (connected_socket fd, socket_address addr) mutable { + auto conn = new connection(*this, std::move(fd), addr); + conn->process().then_wrapped([conn] (auto&& f) { + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "request error " << ex.what() << "\n"; + } + }); + do_accepts(listeners); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "accept failed: " << ex.what() << "\n"; + } + }); + } + class connection { + connected_socket _fd; + input_stream<char> _read_buf; + output_stream<char> _write_buf; + public: + connection(tcp_server& server, connected_socket&& fd, socket_address addr) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) {} + future<> process() { + return read(); + } + future<> read() { + if (_read_buf.eof()) { + return make_ready_future(); + } + // Expect 4 bytes cmd from client + size_t n = 4; + return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) { + if (buf.size() == 0) { + return make_ready_future(); + } + auto cmd = std::string(buf.get(), buf.size()); + // pingpong test + if (cmd == str_ping) { + return _write_buf.write(str_pong).then([this] { + return _write_buf.flush(); + }).then([this] { + return this->read(); + }); + // server tx test + } else if (cmd == str_txtx) { + return tx_test(); + // server tx test + } else if (cmd == str_rxrx) { + return rx_test(); + // unknow test + } else { + return _write_buf.write(str_unknow).then([this] { + return _write_buf.flush(); + }).then([] { + return make_ready_future(); + }); + } + }); + } + future<> do_write(int end) { + if (end == 0) { + return make_ready_future<>(); + } + return _write_buf.write(str_txbuf).then([this] { + return _write_buf.flush(); + }).then([this, end] { + return do_write(end - 1); + }); + } + future<> tx_test() { + return do_write(tx_msg_nr).then([this] { + return _write_buf.close(); + }).then([] { + return make_ready_future<>(); + }); + } + future<> do_read() { + return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { + if (buf.size() == 0) { + return make_ready_future(); + } else { + return do_read(); + } + }); + } + future<> rx_test() { + return do_read().then([] { + return make_ready_future<>(); + }); + } + }; +}; + +namespace bpo = boost::program_options; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port") + ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen") + ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + enable_tcp = config["tcp"].as<std::string>() == "yes"; + enable_sctp = config["sctp"].as<std::string>() == "yes"; + if (!enable_tcp && !enable_sctp) { + fprint(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n"); + return engine().exit(1); + } + auto server = new distributed<tcp_server>; + server->start().then([server = std::move(server), port] () mutable { + engine().at_exit([server] { + return server->stop(); + }); + server->invoke_on_all(&tcp_server::listen, ipv4_addr{port}); + }).then([port] { + std::cout << "Seastar TCP server listening on port " << port << " ...\n"; + }); + }); +} diff --git a/src/seastar/demos/tls_echo_server.hh b/src/seastar/demos/tls_echo_server.hh new file mode 100644 index 00000000..e9e09ac6 --- /dev/null +++ b/src/seastar/demos/tls_echo_server.hh @@ -0,0 +1,118 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2017 ScyllaDB + */ +#pragma once + +#include <seastar/core/do_with.hh> +#include <seastar/core/sstring.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/sharded.hh> +#include <seastar/core/gate.hh> +#include <seastar/net/tls.hh> + +using namespace seastar; + +struct streams { + connected_socket s; + input_stream<char> in; + output_stream<char> out; + + streams(connected_socket cs) : s(std::move(cs)), in(s.input()), out(s.output()) + {} +}; + +class echoserver { + server_socket _socket; + shared_ptr<tls::server_credentials> _certs; + seastar::gate _gate; + bool _stopped = false; + bool _verbose = false; +public: + echoserver(bool verbose = false) + : _certs(make_shared<tls::server_credentials>(make_shared<tls::dh_params>())) + , _verbose(verbose) + {} + + future<> listen(socket_address addr, sstring crtfile, sstring keyfile, tls::client_auth ca = tls::client_auth::NONE) { + _certs->set_client_auth(ca); + return _certs->set_x509_key_file(crtfile, keyfile, tls::x509_crt_format::PEM).then([this, addr] { + ::listen_options opts; + opts.reuse_address = true; + + _socket = tls::listen(_certs, addr, opts); + + repeat([this] { + if (_stopped) { + return make_ready_future<stop_iteration>(stop_iteration::yes); + } + return with_gate(_gate, [this] { + return _socket.accept().then([this](::connected_socket s, socket_address a) { + if (_verbose) { + std::cout << "Got connection from "<< a << std::endl; + } + auto strms = make_lw_shared<streams>(std::move(s)); + return repeat([strms, this]() { + return strms->in.read().then([this, strms](temporary_buffer<char> buf) { + if (buf.empty()) { + if (_verbose) { + std::cout << "EOM" << std::endl; + } + return make_ready_future<stop_iteration>(stop_iteration::yes); + } + sstring tmp(buf.begin(), buf.end()); + if (_verbose) { + std::cout << "Read " << tmp.size() << "B" << std::endl; + } + return strms->out.write(tmp).then([strms]() { + return strms->out.flush(); + }).then([] { + return make_ready_future<stop_iteration>(stop_iteration::no); + }); + }); + }).then([strms]{ + return strms->out.close(); + }).handle_exception([this](auto ep) { + }).finally([this, strms]{ + if (_verbose) { + std::cout << "Ending session" << std::endl; + } + return strms->in.close(); + }); + }).handle_exception([this](auto ep) { + if (!_stopped) { + std::cerr << "Error: " << ep << std::endl; + } + }).then([this] { + return make_ready_future<stop_iteration>(_stopped ? stop_iteration::yes : stop_iteration::no); + }); + }); + }); + return make_ready_future(); + }); + } + + future<> stop() { + _stopped = true; + _socket.abort_accept(); + return _gate.close(); + } +}; diff --git a/src/seastar/demos/tls_echo_server_demo.cc b/src/seastar/demos/tls_echo_server_demo.cc new file mode 100644 index 00000000..1ca0efbb --- /dev/null +++ b/src/seastar/demos/tls_echo_server_demo.cc @@ -0,0 +1,67 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/dns.hh> +#include "tls_echo_server.hh" + +using namespace seastar; +namespace bpo = boost::program_options; + + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "Server port") + ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Server address") + ("cert,c", bpo::value<std::string>()->required(), "Server certificate file") + ("key,k", bpo::value<std::string>()->required(), "Certificate key") + ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose") + ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto crt = config["cert"].as<std::string>(); + auto key = config["key"].as<std::string>(); + auto addr = config["address"].as<std::string>(); + auto verbose = config["verbose"].as<bool>(); + + std::cout << "Starting..." << std::endl; + return net::dns::resolve_name(addr).then([=](net::inet_address a) { + ipv4_addr ia(a, port); + + auto server = ::make_shared<seastar::sharded<echoserver>>(); + return server->start(verbose).then([=]() { + return server->invoke_on_all(&echoserver::listen, socket_address(ia), sstring(crt), sstring(key), tls::client_auth::NONE); + }).handle_exception([=](auto e) { + std::cerr << "Error: " << e << std::endl; + engine().exit(1); + }).then([=] { + std::cout << "TLS echo server running at " << addr << ":" << port << std::endl; + engine().at_exit([server] { + return server->stop(); + }); + }); + }); + }); +} diff --git a/src/seastar/demos/tls_simple_client_demo.cc b/src/seastar/demos/tls_simple_client_demo.cc new file mode 100644 index 00000000..ea598e48 --- /dev/null +++ b/src/seastar/demos/tls_simple_client_demo.cc @@ -0,0 +1,132 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> + +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/dns.hh> +#include "tls_echo_server.hh" + +using namespace seastar; +namespace bpo = boost::program_options; + + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "Remote port") + ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Remote address") + ("trust,t", bpo::value<std::string>(), "Trust store") + ("msg,m", bpo::value<std::string>(), "Message to send") + ("bytes,b", bpo::value<size_t>()->default_value(512), "Use random bytes of length as message") + ("iterations,i", bpo::value<size_t>()->default_value(1), "Repeat X times") + ("read-response,r", bpo::value<bool>()->default_value(true)->implicit_value(true), "Read echoed message") + ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose operation") + ("check-name,c", bpo::value<bool>()->default_value(false)->implicit_value(true), "Check server name") + ("server-name,s", bpo::value<std::string>(), "Expected server name") + ; + + + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto addr = config["address"].as<std::string>(); + auto n = config["bytes"].as<size_t>(); + auto i = config["iterations"].as<size_t>(); + auto do_read = config["read-response"].as<bool>(); + auto verbose = config["verbose"].as<bool>(); + auto check = config["check-name"].as<bool>(); + + std::cout << "Starting..." << std::endl; + + auto certs = ::make_shared<tls::certificate_credentials>(); + auto f = make_ready_future(); + + if (config.count("trust")) { + f = certs->set_x509_trust_file(config["trust"].as<std::string>(), tls::x509_crt_format::PEM); + } + + seastar::shared_ptr<sstring> msg; + + if (config.count("msg")) { + msg = seastar::make_shared<sstring>(config["msg"].as<std::string>()); + } else { + msg = seastar::make_shared<sstring>(sstring(sstring::initialized_later(), n)); + for (size_t i = 0; i < n; ++i) { + (*msg)[i] = '0' + char(::rand() % 30); + } + } + + sstring server_name; + if (config.count("server-name")) { + server_name = config["server-name"].as<std::string>(); + } + if (verbose) { + std::cout << "Msg (" << msg->size() << "B):" << std::endl << *msg << std::endl; + } + return f.then([=]() { + return net::dns::get_host_by_name(addr).then([=](net::hostent e) { + ipv4_addr ia(e.addr_list.front(), port); + + sstring name; + if (check) { + name = server_name.empty() ? e.names.front() : server_name; + } + return tls::connect(certs, ia, name).then([=](::connected_socket s) { + auto strms = ::make_lw_shared<streams>(std::move(s)); + auto range = boost::irange(size_t(0), i); + return do_for_each(range, [=](auto) { + auto f = strms->out.write(*msg); + if (!do_read) { + return strms->out.close().then([f = std::move(f)]() mutable { + return std::move(f); + }); + } + return f.then([=]() { + return strms->out.flush().then([=] { + return strms->in.read_exactly(msg->size()).then([=](temporary_buffer<char> buf) { + sstring tmp(buf.begin(), buf.end()); + if (tmp != *msg) { + std::cerr << "Got garbled message!" << std::endl; + if (verbose) { + std::cout << "Got (" << tmp.size() << ") :" << std::endl << tmp << std::endl; + } + throw std::runtime_error("Got garbled message!"); + } + }); + }); + }); + }).then([strms, do_read]{ + return do_read ? strms->out.close() : make_ready_future<>(); + }).finally([strms]{ + return strms->in.close(); + }); + }); + }).handle_exception([](auto ep) { + std::cerr << "Error: " << ep << std::endl; + }); + }).finally([] { + engine().exit(0); + }); + }); +} diff --git a/src/seastar/demos/udp_client_demo.cc b/src/seastar/demos/udp_client_demo.cc new file mode 100644 index 00000000..cfd23fc0 --- /dev/null +++ b/src/seastar/demos/udp_client_demo.cc @@ -0,0 +1,87 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/api.hh> +#include <iostream> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +class client { +private: + udp_channel _chan; + uint64_t n_sent {}; + uint64_t n_received {}; + uint64_t n_failed {}; + timer<> _stats_timer; +public: + void start(ipv4_addr server_addr) { + std::cout << "Sending to " << server_addr << std::endl; + + _chan = engine().net().make_udp_channel(); + + _stats_timer.set_callback([this] { + std::cout << "Out: " << n_sent << " pps, \t"; + std::cout << "Err: " << n_failed << " pps, \t"; + std::cout << "In: " << n_received << " pps" << std::endl; + n_sent = 0; + n_received = 0; + n_failed = 0; + }); + _stats_timer.arm_periodic(1s); + + keep_doing([this, server_addr] { + return _chan.send(server_addr, "hello!\n") + .then_wrapped([this] (auto&& f) { + try { + f.get(); + n_sent++; + } catch (...) { + n_failed++; + } + }); + }); + + keep_doing([this] { + return _chan.receive().then([this] (auto) { + n_received++; + }); + }); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + client _client; + app_template app; + app.add_options() + ("server", bpo::value<std::string>(), "Server address") + ; + return app.run_deprecated(ac, av, [&_client, &app] { + auto&& config = app.configuration(); + _client.start(config["server"].as<std::string>()); + }); +} diff --git a/src/seastar/demos/udp_server_demo.cc b/src/seastar/demos/udp_server_demo.cc new file mode 100644 index 00000000..86dabaaf --- /dev/null +++ b/src/seastar/demos/udp_server_demo.cc @@ -0,0 +1,80 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <iostream> +#include <seastar/core/distributed.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +class udp_server { +private: + udp_channel _chan; + timer<> _stats_timer; + uint64_t _n_sent {}; +public: + void start(uint16_t port) { + ipv4_addr listen_addr{port}; + _chan = engine().net().make_udp_channel(listen_addr); + + _stats_timer.set_callback([this] { + std::cout << "Out: " << _n_sent << " pps" << std::endl; + _n_sent = 0; + }); + _stats_timer.arm_periodic(1s); + + keep_doing([this] { + return _chan.receive().then([this] (udp_datagram dgram) { + return _chan.send(dgram.get_src(), std::move(dgram.get_data())).then([this] { + _n_sent++; + }); + }); + }); + } + // FIXME: we should properly tear down the service here. + future<> stop() { + return make_ready_future<>(); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "UDP server port") ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto server = new distributed<udp_server>; + server->start().then([server = std::move(server), port] () mutable { + engine().at_exit([server] { + return server->stop(); + }); + server->invoke_on_all(&udp_server::start, port); + }).then([port] { + std::cout << "Seastar UDP server listening on port " << port << " ...\n"; + }); + }); +} diff --git a/src/seastar/demos/udp_zero_copy_demo.cc b/src/seastar/demos/udp_zero_copy_demo.cc new file mode 100644 index 00000000..9bfa8fe7 --- /dev/null +++ b/src/seastar/demos/udp_zero_copy_demo.cc @@ -0,0 +1,149 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/core/app-template.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/scattered_message.hh> +#include <seastar/core/vector-data-sink.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/units.hh> +#include <random> +#include <iomanip> +#include <iostream> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; +namespace bpo = boost::program_options; + +template <typename Duration> +typename Duration::rep to_seconds(Duration d) { + return std::chrono::duration_cast<std::chrono::seconds>(d).count(); +} + +class server { +private: + udp_channel _chan; + timer<> _stats_timer; + uint64_t _n_sent {}; + size_t _chunk_size; + bool _copy; + std::vector<packet> _packets; + std::unique_ptr<output_stream<char>> _out; + steady_clock_type::time_point _last; + sstring _key; + size_t _packet_size = 8*KB; + char* _mem; + size_t _mem_size; + std::mt19937 _rnd; + std::random_device _randem_dev; + std::uniform_int_distribution<size_t> _chunk_distribution; +private: + char* next_chunk() { + return _mem + _chunk_distribution(_rnd); + } +public: + server() + : _rnd(std::random_device()()) { + } + future<> send(ipv4_addr dst, packet p) { + return _chan.send(dst, std::move(p)).then([this] { + _n_sent++; + }); + } + void start(int chunk_size, bool copy, size_t mem_size) { + ipv4_addr listen_addr{10000}; + _chan = engine().net().make_udp_channel(listen_addr); + + std::cout << "Listening on " << listen_addr << std::endl; + + _last = steady_clock_type::now(); + _stats_timer.set_callback([this] { + auto now = steady_clock_type::now(); + std::cout << "Out: " + << std::setprecision(2) << std::fixed + << (double)_n_sent / to_seconds(now - _last) + << " pps" << std::endl; + _last = now; + _n_sent = 0; + }); + _stats_timer.arm_periodic(1s); + + _chunk_size = chunk_size; + _copy = copy; + _key = sstring(new char[64], 64); + + _out = std::make_unique<output_stream<char>>( + data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size); + + _mem = new char[mem_size]; + _mem_size = mem_size; + + _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3); + + assert(3 * _chunk_size <= _packet_size); + + keep_doing([this] { + return _chan.receive().then([this] (udp_datagram dgram) { + auto chunk = next_chunk(); + lw_shared_ptr<sstring> item; + if (_copy) { + _packets.clear(); + _out->write(chunk, _chunk_size); + chunk += _chunk_size; + _out->write(chunk, _chunk_size); + chunk += _chunk_size; + _out->write(chunk, _chunk_size); + _out->flush(); + assert(_packets.size() == 1); + return send(dgram.get_src(), std::move(_packets[0])); + } else { + auto chunk = next_chunk(); + scattered_message<char> msg; + msg.reserve(3); + msg.append_static(chunk, _chunk_size); + msg.append_static(chunk, _chunk_size); + msg.append_static(chunk, _chunk_size); + return send(dgram.get_src(), std::move(msg).release()); + } + }); + }); + } +}; + +int main(int ac, char ** av) { + server s; + app_template app; + app.add_options() + ("chunk-size", bpo::value<int>()->default_value(1024), + "Chunk size") + ("mem-size", bpo::value<int>()->default_value(512), + "Memory pool size in MiB") + ("copy", "Copy data rather than send via zero-copy") + ; + return app.run_deprecated(ac, av, [&app, &s] { + auto&& config = app.configuration(); + auto chunk_size = config["chunk-size"].as<int>(); + auto mem_size = (size_t)config["mem-size"].as<int>() * MB; + auto copy = config.count("copy"); + s.start(chunk_size, copy, mem_size); + }); +} |