summaryrefslogtreecommitdiffstats
path: root/src/seastar/demos
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/seastar/demos/CMakeLists.txt101
-rw-r--r--src/seastar/demos/block_discard_demo.cc71
-rw-r--r--src/seastar/demos/echo_demo.cc126
-rw-r--r--src/seastar/demos/ip_demo.cc46
-rw-r--r--src/seastar/demos/l3_demo.cc48
-rw-r--r--src/seastar/demos/line_count_demo.cc84
-rw-r--r--src/seastar/demos/rpc_demo.cc299
-rw-r--r--src/seastar/demos/scheduling_group_demo.cc179
-rw-r--r--src/seastar/demos/tcp_demo.cc71
-rw-r--r--src/seastar/demos/tcp_sctp_client_demo.cc277
-rw-r--r--src/seastar/demos/tcp_sctp_server_demo.cc200
-rw-r--r--src/seastar/demos/tls_echo_server.hh118
-rw-r--r--src/seastar/demos/tls_echo_server_demo.cc67
-rw-r--r--src/seastar/demos/tls_simple_client_demo.cc132
-rw-r--r--src/seastar/demos/udp_client_demo.cc87
-rw-r--r--src/seastar/demos/udp_server_demo.cc80
-rw-r--r--src/seastar/demos/udp_zero_copy_demo.cc149
17 files changed, 2135 insertions, 0 deletions
diff --git a/src/seastar/demos/CMakeLists.txt b/src/seastar/demos/CMakeLists.txt
new file mode 100644
index 00000000..202e2805
--- /dev/null
+++ b/src/seastar/demos/CMakeLists.txt
@@ -0,0 +1,101 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+# Logical target for all demos.
+add_custom_target (demos)
+
+macro (seastar_add_demo name)
+ set (args ${ARGN})
+
+ cmake_parse_arguments (
+ parsed_args
+ ""
+ ""
+ "SOURCES"
+ ${args})
+
+ set (target demo_${name})
+ add_executable (${target} ${parsed_args_SOURCES})
+
+ target_include_directories (${target}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
+
+ target_link_libraries (${target}
+ PRIVATE
+ Boost::program_options
+ seastar_with_flags)
+
+ set_target_properties (${target}
+ PROPERTIES
+ OUTPUT_NAME ${name})
+
+ add_dependencies (demos ${target})
+endmacro ()
+
+seastar_add_demo (block_discard
+ SOURCES block_discard_demo.cc)
+
+seastar_add_demo (echo
+ SOURCES echo_demo.cc)
+
+seastar_add_demo (ip
+ SOURCES ip_demo.cc)
+
+seastar_add_demo (line_count
+ SOURCES line_count_demo.cc)
+
+seastar_add_demo (l3
+ SOURCES l3_demo.cc)
+
+seastar_add_demo (rpc
+ SOURCES rpc_demo.cc)
+
+seastar_add_demo (scheduling_group
+ SOURCES scheduling_group_demo.cc)
+
+seastar_add_demo (tcp
+ SOURCES tcp_demo.cc)
+
+seastar_add_demo (tcp_sctp_client
+ SOURCES tcp_sctp_client_demo.cc)
+
+seastar_add_demo (tcp_sctp_server
+ SOURCES tcp_sctp_server_demo.cc)
+
+seastar_add_demo (tls_echo_server
+ SOURCES
+ tls_echo_server.hh
+ tls_echo_server_demo.cc)
+
+seastar_add_demo (tls_simple_client
+ SOURCES
+ tls_echo_server.hh
+ tls_simple_client_demo.cc)
+
+seastar_add_demo (udp_client
+ SOURCES udp_client_demo.cc)
+
+seastar_add_demo (udp_server
+ SOURCES udp_server_demo.cc)
+
+seastar_add_demo (udp_zero_copy
+ SOURCES udp_zero_copy_demo.cc)
diff --git a/src/seastar/demos/block_discard_demo.cc b/src/seastar/demos/block_discard_demo.cc
new file mode 100644
index 00000000..a1fbcfd9
--- /dev/null
+++ b/src/seastar/demos/block_discard_demo.cc
@@ -0,0 +1,71 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <algorithm>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/reactor.hh>
+#include <iostream>
+
+using namespace seastar;
+
+namespace bpo = boost::program_options;
+
+struct file_test {
+ file_test(file&& f) : f(std::move(f)) {}
+ file f;
+ semaphore sem = { 0 };
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("dev", bpo::value<std::string>(), "e.g. --dev /dev/sdb")
+ ;
+
+ return app.run_deprecated(ac, av, [&app] {
+ static constexpr auto max = 10000;
+ auto&& config = app.configuration();
+ auto filepath = config["dev"].as<std::string>();
+
+ open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) {
+ auto ft = new file_test{std::move(f)};
+
+ ft->f.stat().then([ft] (struct stat st) mutable {
+ assert(S_ISBLK(st.st_mode));
+ auto offset = 0;
+ auto length = max * 4096;
+ ft->f.discard(offset, length).then([ft] () mutable {
+ ft->sem.signal();
+ });
+ });
+
+ ft->sem.wait().then([ft] () mutable {
+ return ft->f.flush();
+ }).then([ft] () mutable {
+ std::cout << "done\n";
+ delete ft;
+ engine().exit(0);
+ });
+ });
+ });
+}
diff --git a/src/seastar/demos/echo_demo.cc b/src/seastar/demos/echo_demo.cc
new file mode 100644
index 00000000..220e463d
--- /dev/null
+++ b/src/seastar/demos/echo_demo.cc
@@ -0,0 +1,126 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ */
+
+#include <seastar/net/virtio.hh>
+#include <seastar/net/dpdk.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/ip.hh>
+#include <iostream>
+#include <utility>
+#include <algorithm>
+
+using namespace seastar;
+using namespace net;
+
+void dump_packet(const packet& p) {
+ std::cout << "rx:";
+ auto f = p.frag(0);
+ for (unsigned i = 0; i < std::min(f.size, size_t(30)); ++i) {
+ char x[4];
+ std::sprintf(x, " %02x", uint8_t(f.base[i]));
+ std::cout << x;
+ }
+ std::cout << "\n";
+}
+
+future<> echo_packet(net::qp& netif, packet p) {
+ auto f = p.frag(0);
+ if (f.size < sizeof(eth_hdr)) {
+ return make_ready_future<>();
+ }
+ auto pos = 0;
+ auto eh = reinterpret_cast<eth_hdr*>(f.base + pos);
+ pos += sizeof(*eh);
+ *eh = ntoh(*eh);
+ if (eh->eth_proto != 0x0800) {
+ return make_ready_future<>();
+ }
+ auto iph = reinterpret_cast<ip_hdr*>(f.base + pos);
+ *iph = ntoh(*iph);
+ pos += iph->ihl * 4;
+ if (iph->ver != 4 || iph->ihl < 5 || iph->ip_proto != 1) {
+ return make_ready_future<>();
+ }
+ auto ip_len = iph->len;
+ auto icmph = reinterpret_cast<icmp_hdr*>(f.base + pos);
+ if (icmph->type != icmp_hdr::msg_type::echo_request) {
+ return make_ready_future<>();
+ }
+ auto icmp_len = ip_len - iph->ihl * 4;
+ std::swap(eh->src_mac, eh->dst_mac);
+ std::swap(iph->src_ip, iph->dst_ip);
+ icmph->type = icmp_hdr::msg_type::echo_reply;
+ icmph->csum = 0;
+ *iph = hton(*iph);
+ *eh = hton(*eh);
+ icmph->csum = ip_checksum(icmph, icmp_len);
+ iph->csum = 0;
+ iph->csum = ip_checksum(iph, iph->ihl * 4);
+ return netif.send(std::move(p));
+}
+
+#ifdef SEASTAR_HAVE_DPDK
+void usage()
+{
+ std::cout<<"Usage: echotest [-virtio|-dpdk]"<<std::endl;
+ std::cout<<" -virtio - use virtio backend (default)"<<std::endl;
+ std::cout<<" -dpdk - use dpdk-pmd backend"<<std::endl;
+}
+#endif
+
+int main(int ac, char** av) {
+ std::unique_ptr<net::device> dnet;
+ net::qp* vnet;
+
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+#ifdef SEASTAR_HAVE_DPDK
+ if (ac > 2) {
+ usage();
+ return -1;
+ }
+
+ if ((ac == 1) || !std::strcmp(av[1], "-virtio")) {
+ dnet = create_virtio_net_device(opts);
+ } else if (!std::strcmp(av[1], "-dpdk")) {
+ dnet = create_dpdk_net_device();
+ } else {
+ usage();
+ return -1;
+ }
+#else
+ dnet = create_virtio_net_device(opts);
+#endif // SEASTAR_HAVE_DPDK
+
+ auto qp = dnet->init_local_queue(opts, 0);
+ vnet = qp.get();
+ dnet->set_local_queue(std::move(qp));
+ subscription<packet> rx =
+ dnet->receive([vnet] (packet p) {
+ return echo_packet(*vnet, std::move(p));
+ });
+ engine().run();
+ return 0;
+}
+
+
diff --git a/src/seastar/demos/ip_demo.cc b/src/seastar/demos/ip_demo.cc
new file mode 100644
index 00000000..02c88fdc
--- /dev/null
+++ b/src/seastar/demos/ip_demo.cc
@@ -0,0 +1,46 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/arp.hh>
+#include <seastar/net/ip.hh>
+#include <seastar/net/net.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/virtio.hh>
+
+using namespace seastar;
+using namespace net;
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ vnet->set_local_queue(vnet->init_local_queue(opts, 0));
+
+ interface netif(std::move(vnet));
+ ipv4 inet(&netif);
+ inet.set_host_address(ipv4_address("192.168.122.2"));
+ engine().run();
+ return 0;
+}
+
+
+
diff --git a/src/seastar/demos/l3_demo.cc b/src/seastar/demos/l3_demo.cc
new file mode 100644
index 00000000..98415817
--- /dev/null
+++ b/src/seastar/demos/l3_demo.cc
@@ -0,0 +1,48 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/net.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/virtio.hh>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+
+void dump_arp_packets(l3_protocol& proto) {
+ proto.receive([] (packet p, ethernet_address from) {
+ std::cout << "seen arp packet\n";
+ return make_ready_future<>();
+ }, [] (forward_hash& out_hash_data, packet& p, size_t off) {return false;});
+}
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ interface netif(std::move(vnet));
+ l3_protocol arp(&netif, eth_protocol_num::arp, []{ return compat::optional<l3_protocol::l3packet>(); });
+ dump_arp_packets(arp);
+ engine().run();
+ return 0;
+}
+
diff --git a/src/seastar/demos/line_count_demo.cc b/src/seastar/demos/line_count_demo.cc
new file mode 100644
index 00000000..8d07553e
--- /dev/null
+++ b/src/seastar/demos/line_count_demo.cc
@@ -0,0 +1,84 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+// Demonstration of file_input_stream. Don't expect stellar performance
+// since no read-ahead or caching is done yet.
+
+#include <seastar/core/fstream.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/reactor.hh>
+#include <algorithm>
+#include <iostream>
+
+using namespace seastar;
+
+struct reader {
+public:
+ reader(file f)
+ : is(make_file_input_stream(std::move(f), file_input_stream_options{1 << 16, 1})) {
+ }
+
+ input_stream<char> is;
+ size_t count = 0;
+
+ // for input_stream::consume():
+ using unconsumed_remainder = compat::optional<temporary_buffer<char>>;
+ future<unconsumed_remainder> operator()(temporary_buffer<char> data) {
+ if (data.empty()) {
+ return make_ready_future<unconsumed_remainder>(std::move(data));
+ } else {
+ count += std::count(data.begin(), data.end(), '\n');
+ // FIXME: last line without \n?
+ return make_ready_future<unconsumed_remainder>();
+ }
+ }
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ namespace bpo = boost::program_options;
+ app.add_positional_options({
+ { "file", bpo::value<std::string>(), "File to process", 1 },
+ });
+ return app.run(ac, av, [&app] {
+ auto fname = app.configuration()["file"].as<std::string>();
+ return open_file_dma(fname, open_flags::ro).then([] (file f) {
+ auto r = make_shared<reader>(std::move(f));
+ return r->is.consume(*r).then([r] {
+ fmt::print("{:d} lines\n", r->count);
+ return r->is.close().then([r] {});
+ });
+ }).then_wrapped([] (future<> f) -> future<int> {
+ try {
+ f.get();
+ return make_ready_future<int>(0);
+ } catch (std::exception& ex) {
+ std::cout << ex.what() << "\n";
+ return make_ready_future<int>(1);
+ } catch (...) {
+ std::cout << "unknown exception\n";
+ return make_ready_future<int>(1);
+ }
+ });
+ });
+}
+
diff --git a/src/seastar/demos/rpc_demo.cc b/src/seastar/demos/rpc_demo.cc
new file mode 100644
index 00000000..899af1c1
--- /dev/null
+++ b/src/seastar/demos/rpc_demo.cc
@@ -0,0 +1,299 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/rpc/rpc.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/rpc/lz4_compressor.hh>
+
+using namespace seastar;
+
+struct serializer {
+};
+
+template <typename T, typename Output>
+inline
+void write_arithmetic_type(Output& out, T v) {
+ static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
+ return out.write(reinterpret_cast<const char*>(&v), sizeof(T));
+}
+
+template <typename T, typename Input>
+inline
+T read_arithmetic_type(Input& in) {
+ static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
+ T v;
+ in.read(reinterpret_cast<char*>(&v), sizeof(T));
+ return v;
+}
+
+template <typename Output>
+inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
+template <typename Input>
+inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
+template <typename Input>
+inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
+template <typename Input>
+inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
+template <typename Input>
+inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
+template <typename Input>
+inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
+
+template <typename Output>
+inline void write(serializer, Output& out, const sstring& v) {
+ write_arithmetic_type(out, uint32_t(v.size()));
+ out.write(v.c_str(), v.size());
+}
+
+template <typename Input>
+inline sstring read(serializer, Input& in, rpc::type<sstring>) {
+ auto size = read_arithmetic_type<uint32_t>(in);
+ sstring ret(sstring::initialized_later(), size);
+ in.read(ret.begin(), size);
+ return ret;
+}
+
+namespace bpo = boost::program_options;
+using namespace std::chrono_literals;
+
+class mycomp : public rpc::compressor::factory {
+ const sstring _name = "LZ4";
+public:
+ virtual const sstring& supported() const override {
+ fmt::print("supported called\n");
+ return _name;
+ }
+ virtual std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
+ fmt::print("negotiate called with {}\n", feature);
+ return feature == _name ? std::make_unique<rpc::lz4_compressor>() : nullptr;
+ }
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "RPC server port")
+ ("server", bpo::value<std::string>(), "Server address")
+ ("compress", bpo::value<bool>()->default_value(false), "Compress RPC traffic");
+ std::cout << "start ";
+ rpc::protocol<serializer> myrpc(serializer{});
+ static std::unique_ptr<rpc::protocol<serializer>::server> server;
+ static std::unique_ptr<rpc::protocol<serializer>::client> client;
+ static double x = 30.0;
+
+ myrpc.set_logger([] (const sstring& log) {
+ fmt::print("{}", log);
+ std::cout << std::endl;
+ });
+
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ bool compress = config["compress"].as<bool>();
+ static mycomp mc;
+ auto test1 = myrpc.register_handler(1, [x = 0](int i) mutable { fmt::print("test1 count {:d} got {:d}\n", ++x, i); });
+ auto test2 = myrpc.register_handler(2, [](int a, int b){ fmt::print("test2 got {:d} {:d}\n", a, b); return make_ready_future<int>(a+b); });
+ auto test3 = myrpc.register_handler(3, [](double x){ fmt::print("test3 got {:f}\n", x); return std::make_unique<double>(sin(x)); });
+ auto test4 = myrpc.register_handler(4, [](){ fmt::print("test4 throw!\n"); throw std::runtime_error("exception!"); });
+ auto test5 = myrpc.register_handler(5, [](){ fmt::print("test5 no wait\n"); return rpc::no_wait; });
+ auto test6 = myrpc.register_handler(6, [](const rpc::client_info& info, int x){ fmt::print("test6 client {}, {:d}\n", inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr), x); });
+ auto test8 = myrpc.register_handler(8, [](){ fmt::print("test8 sleep for 2 sec\n"); return sleep(2s); });
+ auto test13 = myrpc.register_handler(13, [](){ fmt::print("test13 sleep for 1 msec\n"); return sleep(1ms); });
+ auto test_message_to_big = myrpc.register_handler(14, [](sstring payload){ fmt::print("test message to bit, should not get here"); });
+
+ if (config.count("server")) {
+ std::cout << "client" << std::endl;
+ auto test7 = myrpc.make_client<long (long a, long b)>(7);
+ auto test9 = myrpc.make_client<long (long a, long b)>(9); // do not send optional
+ auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional
+ auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects
+ auto test10 = myrpc.make_client<long ()>(10); // receive less then replied
+ auto test10_1 = myrpc.make_client<future<long, int> ()>(10); // receive all
+ auto test11 = myrpc.make_client<future<long, rpc::optional<int>> ()>(11); // receive more then replied
+ auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits
+ auto test_nohandler = myrpc.make_client<void ()>(100000000); // non existing verb
+ auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non existing verb, no_wait call
+ rpc::client_options co;
+ if (compress) {
+ co.compressor_factory = &mc;
+ }
+
+ client = std::make_unique<rpc::protocol<serializer>::client>(myrpc, co, ipv4_addr{config["server"].as<std::string>()});
+
+ auto f = test8(*client, 1500ms).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ printf("test8 should not get here!\n");
+ } catch (rpc::timeout_error&) {
+ printf("test8 timeout!\n");
+ }
+ });
+ for (auto i = 0; i < 100; i++) {
+ fmt::print("iteration={:d}\n", i);
+ test1(*client, 5).then([] (){ fmt::print("test1 ended\n");});
+ test2(*client, 1, 2).then([] (int r) { fmt::print("test2 got {:d}\n", r); });
+ test3(*client, x).then([](double x) { fmt::print("sin={:f}\n", x); });
+ test4(*client).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test4 your should not see this!\n");
+ } catch (std::runtime_error& x){
+ fmt::print("test4 {}\n", x.what());
+ }
+ });
+ test5(*client).then([] { fmt::print("test5 no wait ended\n"); });
+ test6(*client, 1).then([] { fmt::print("test6 ended\n"); });
+ test7(*client, 5, 6).then([] (long r) { fmt::print("test7 got {:d}\n", r); });
+ test9(*client, 1, 2).then([] (long r) { fmt::print("test9 got {:d}\n", r); });
+ test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); });
+ test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); });
+ test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); });
+ test10_1(*client).then([] (long r, int rr) { fmt::print("test10_1 got {:d} and {:d}\n", r, rr); });
+ test11(*client).then([] (long r, rpc::optional<int> rr) { fmt::print("test11 got {:d} and {:d}\n", r, bool(rr)); });
+ test_nohandler(*client).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test_nohandler your should not see this!\n");
+ } catch (rpc::unknown_verb_error& x){
+ fmt::print("test_nohandle no such verb\n");
+ } catch (...) {
+ fmt::print("incorrect exception!\n");
+ }
+ });
+ test_nohandler_nowait(*client);
+ auto c = make_lw_shared<rpc::cancellable>();
+ test13(*client, *c).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test13 shold not get here\n");
+ } catch(rpc::canceled_error&) {
+ fmt::print("test13 canceled\n");
+ } catch(...) {
+ fmt::print("test13 wrong exception\n");
+ }
+ });
+ c->cancel();
+ test13(*client, *c).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test13 shold not get here\n");
+ } catch(rpc::canceled_error&) {
+ fmt::print("test13 canceled\n");
+ } catch(...) {
+ fmt::print("test13 wrong exception\n");
+ }
+ });
+ sleep(500us).then([c] { c->cancel(); });
+ test_message_to_big(*client, sstring(sstring::initialized_later(), 10'000'001)).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test message to big shold not get here\n");
+ } catch(std::runtime_error& err) {
+ fmt::print("test message to big get error {}\n", err.what());
+ } catch(...) {
+ fmt::print("test message to big wrong exception\n");
+ }
+ });
+ }
+ // delay a little for a time-sensitive test
+ sleep(400ms).then([test12] () mutable {
+ // server is configured for 10MB max, throw 25MB worth of requests at it.
+ auto now = rpc::rpc_clock_type::now();
+ return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable {
+ return test12(*client, 100, sstring(sstring::initialized_later(), 1'000'000)).then([idx, now] {
+ auto later = rpc::rpc_clock_type::now();
+ auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
+ fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count());
+ });
+ }).then([now] {
+ auto later = rpc::rpc_clock_type::now();
+ auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
+ fmt::print("test12 completed after {:d} ms (should be ~300)\n", delta.count());
+ });
+ });
+ f.finally([] {
+ sleep(1s).then([] {
+ client->stop().then([] {
+ engine().exit(0);
+ });
+ });
+ });
+ } else {
+ std::cout << "server on port " << port << std::endl;
+ myrpc.register_handler(7, [](long a, long b) mutable {
+ auto p = make_lw_shared<promise<>>();
+ auto t = make_lw_shared<timer<>>();
+ fmt::print("test7 got {:d} {:d}\n", a, b);
+ auto f = p->get_future().then([a, b, t] {
+ fmt::print("test7 calc res\n");
+ return a - b;
+ });
+ t->set_callback([p = std::move(p)] () mutable { p->set_value(); });
+ t->arm(1s);
+ return f;
+ });
+ myrpc.register_handler(9, [] (long a, long b, rpc::optional<int> c) {
+ long r = 2;
+ fmt::print("test9 got {:d} {:d} ", a, b);
+ if (c) {
+ fmt::print("{:d}", c.value());
+ r++;
+ }
+ fmt::print("\n");
+ return r;
+ });
+ myrpc.register_handler(10, [] {
+ fmt::print("test 10\n");
+ return make_ready_future<long, int>(1, 2);
+ });
+ myrpc.register_handler(11, [] {
+ fmt::print("test 11\n");
+ return 1ul;
+ });
+ myrpc.register_handler(12, [] (int sleep_ms, sstring payload) {
+ return sleep(std::chrono::milliseconds(sleep_ms)).then([] {
+ return make_ready_future<>();
+ });
+ });
+
+ rpc::resource_limits limits;
+ limits.bloat_factor = 1;
+ limits.basic_request_size = 0;
+ limits.max_memory = 10'000'000;
+ rpc::server_options so;
+ if (compress) {
+ so.compressor_factory = &mc;
+ }
+ server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, so, ipv4_addr{port}, limits);
+ }
+ });
+
+}
diff --git a/src/seastar/demos/scheduling_group_demo.cc b/src/seastar/demos/scheduling_group_demo.cc
new file mode 100644
index 00000000..edbfb0a4
--- /dev/null
+++ b/src/seastar/demos/scheduling_group_demo.cc
@@ -0,0 +1,179 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2016 Scylla DB Ltd
+ */
+
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/util/defer.hh>
+#include <chrono>
+#include <cmath>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+
+template <typename Func, typename Duration>
+future<>
+compute_intensive_task(Duration duration, unsigned& counter, Func func) {
+ auto end = std::chrono::steady_clock::now() + duration;
+ while (std::chrono::steady_clock::now() < end) {
+ func();
+ }
+ ++counter;
+ return make_ready_future<>();
+}
+
+future<>
+heavy_task(unsigned& counter) {
+ return compute_intensive_task(1ms, counter, [] {
+ static thread_local double x = 1;
+ x = std::exp(x) / 3;
+ });
+}
+
+future<>
+light_task(unsigned& counter) {
+ return compute_intensive_task(100us, counter, [] {
+ static thread_local double x = 0.1;
+ x = std::log(x + 1);
+ });
+}
+
+future<>
+medium_task(unsigned& counter) {
+ return compute_intensive_task(400us, counter, [] {
+ static thread_local double x = 0.1;
+ x = std::cos(x);
+ });
+}
+
+using done_func = std::function<bool ()>;
+
+future<>
+run_compute_intensive_tasks(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) {
+ return seastar::async([task, sg, concurrency, done, &counter] {
+ while (!done()) {
+ parallel_for_each(boost::irange(0u, concurrency), [task, sg, &counter] (unsigned i) {
+ return with_scheduling_group(sg, [task, &counter] {
+ return task(counter);
+ });
+ }).get();
+ }
+ });
+}
+
+future<>
+run_compute_intensive_tasks_in_threads(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) {
+ auto attr = seastar::thread_attributes();
+ attr.sched_group = sg;
+ return parallel_for_each(boost::irange(0u, concurrency), [attr, done, &counter, task] (unsigned i) {
+ return seastar::async(attr, [done, &counter, task] {
+ while (!done()) {
+ task(counter).get();
+ }
+ });
+ });
+}
+
+future<>
+run_with_duty_cycle(float utilization, std::chrono::steady_clock::duration period, done_func done, std::function<future<> (done_func done)> task) {
+ return seastar::async([=] {
+ bool duty_toggle = true;
+ auto t0 = std::chrono::steady_clock::now();
+ condition_variable cv;
+ timer<> tmr_on([&] { duty_toggle = true; cv.signal(); });
+ timer<> tmr_off([&] { duty_toggle = false; });
+ tmr_on.arm(t0, period);
+ tmr_off.arm(t0 + std::chrono::duration_cast<decltype(t0)::duration>(period * utilization), period);
+ auto combined_done = [&] {
+ return done() || !duty_toggle;
+ };
+ while (!done()) {
+ while (!combined_done()) {
+ task(std::cref(combined_done)).get();
+ }
+ cv.wait([&] {
+ return done() || duty_toggle;
+ }).get();
+ }
+ tmr_on.cancel();
+ tmr_off.cancel();
+ });
+}
+
+#include <fenv.h>
+
+template <typename T>
+auto var_fn(T& var) {
+ return [&var] { return var; };
+}
+
+int main(int ac, char** av) {
+ app_template app;
+ return app.run(ac, av, [] {
+ return seastar::async([] {
+ auto sg100 = seastar::create_scheduling_group("sg100", 100).get0();
+ auto ksg100 = seastar::defer([&] { seastar::destroy_scheduling_group(sg100).get(); });
+ auto sg20 = seastar::create_scheduling_group("sg20", 20).get0();
+ auto ksg20 = seastar::defer([&] { seastar::destroy_scheduling_group(sg20).get(); });
+ auto sg50 = seastar::create_scheduling_group("sg50", 50).get0();
+ auto ksg50 = seastar::defer([&] { seastar::destroy_scheduling_group(sg50).get(); });
+
+ bool done = false;
+ auto end = timer<>([&done] {
+ done = true;
+ });
+
+ end.arm(10s);
+ unsigned ctr100 = 0, ctr20 = 0, ctr50 = 0;
+ fmt::print("running three scheduling groups with 100% duty cycle each:\n");
+ when_all(
+ run_compute_intensive_tasks(sg100, var_fn(done), 5, ctr100, heavy_task),
+ run_compute_intensive_tasks(sg20, var_fn(done), 3, ctr20, light_task),
+ run_compute_intensive_tasks_in_threads(sg50, var_fn(done), 2, ctr50, medium_task)
+ ).get();
+ fmt::print("{:10} {:15} {:10} {:12} {:8}\n", "shares", "task_time (us)", "executed", "runtime (ms)", "vruntime");
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 1000, ctr100, ctr100 * 1000 / 1000, ctr100 * 1000 / 1000 / 100.);
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 20, 100, ctr20, ctr20 * 100 / 1000, ctr20 * 100 / 1000 / 20.);
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 400, ctr50, ctr50 * 400 / 1000, ctr50 * 400 / 1000 / 50.);
+ fmt::print("\n");
+
+ fmt::print("running two scheduling groups with 100%/50% duty cycles (period=1s:\n");
+ unsigned ctr100_2 = 0, ctr50_2 = 0;
+ done = false;
+ end.arm(10s);
+ when_all(
+ run_compute_intensive_tasks(sg50, var_fn(done), 5, ctr50_2, heavy_task),
+ run_with_duty_cycle(0.5, 1s, var_fn(done), [=, &ctr100_2] (done_func done) {
+ return run_compute_intensive_tasks(sg100, done, 4, ctr100_2, heavy_task);
+ })
+ ).get();
+ fmt::print("{:10} {:10} {:15} {:10} {:12} {:8}\n", "shares", "duty", "task_time (us)", "executed", "runtime (ms)", "vruntime");
+ fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 50, 1000, ctr100_2, ctr100_2 * 1000 / 1000, ctr100_2 * 1000 / 1000 / 100.);
+ fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 100, 400, ctr50_2, ctr50_2 * 1000 / 1000, ctr50_2 * 1000 / 1000 / 50.);
+
+ return 0;
+ });
+ });
+}
diff --git a/src/seastar/demos/tcp_demo.cc b/src/seastar/demos/tcp_demo.cc
new file mode 100644
index 00000000..fd14a33b
--- /dev/null
+++ b/src/seastar/demos/tcp_demo.cc
@@ -0,0 +1,71 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/ip.hh>
+#include <seastar/net/virtio.hh>
+#include <seastar/net/tcp.hh>
+
+using namespace seastar;
+using namespace net;
+
+struct tcp_test {
+ ipv4& inet;
+ using tcp = net::tcp<ipv4_traits>;
+ tcp::listener _listener;
+ struct connection {
+ tcp::connection tcp_conn;
+ explicit connection(tcp::connection tc) : tcp_conn(std::move(tc)) {}
+ void run() {
+ tcp_conn.wait_for_data().then([this] {
+ auto p = tcp_conn.read();
+ if (!p.len()) {
+ tcp_conn.close_write();
+ return;
+ }
+ fmt::print("read {:d} bytes\n", p.len());
+ tcp_conn.send(std::move(p));
+ run();
+ });
+ }
+ };
+ tcp_test(ipv4& inet) : inet(inet), _listener(inet.get_tcp().listen(10000)) {}
+ void run() {
+ _listener.accept().then([this] (tcp::connection conn) {
+ (new connection(std::move(conn)))->run();
+ run();
+ });
+ }
+};
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ interface netif(std::move(vnet));
+ ipv4 inet(&netif);
+ inet.set_host_address(ipv4_address("192.168.122.2"));
+ tcp_test tt(inet);
+ engine().when_started().then([&tt] { tt.run(); });
+ engine().run();
+}
+
+
diff --git a/src/seastar/demos/tcp_sctp_client_demo.cc b/src/seastar/demos/tcp_sctp_client_demo.cc
new file mode 100644
index 00000000..a07e55f5
--- /dev/null
+++ b/src/seastar/demos/tcp_sctp_client_demo.cc
@@ -0,0 +1,277 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <iostream>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/distributed.hh>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+static int rx_msg_size = 4 * 1024;
+static int tx_msg_total_size = 100 * 1024 * 1024;
+static int tx_msg_size = 4 * 1024;
+static int tx_msg_nr = tx_msg_total_size / tx_msg_size;
+static std::string str_txbuf(tx_msg_size, 'X');
+
+class client;
+distributed<client> clients;
+
+transport protocol = transport::TCP;
+
+class client {
+private:
+ static constexpr unsigned _pings_per_connection = 10000;
+ unsigned _total_pings;
+ unsigned _concurrent_connections;
+ ipv4_addr _server_addr;
+ std::string _test;
+ lowres_clock::time_point _earliest_started;
+ lowres_clock::time_point _latest_finished;
+ size_t _processed_bytes;
+ unsigned _num_reported;
+public:
+ class connection {
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ size_t _bytes_read = 0;
+ size_t _bytes_write = 0;
+ public:
+ connection(connected_socket&& fd)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output()) {}
+
+ future<> do_read() {
+ return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
+ _bytes_read += buf.size();
+ if (buf.size() == 0) {
+ return make_ready_future();
+ } else {
+ return do_read();
+ }
+ });
+ }
+
+ future<> do_write(int end) {
+ if (end == 0) {
+ return make_ready_future();
+ }
+ return _write_buf.write(str_txbuf).then([this] {
+ _bytes_write += tx_msg_size;
+ return _write_buf.flush();
+ }).then([this, end] {
+ return do_write(end - 1);
+ });
+ }
+
+ future<> ping(int times) {
+ return _write_buf.write("ping").then([this] {
+ return _write_buf.flush();
+ }).then([this, times] {
+ return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) {
+ if (buf.size() != 4) {
+ fprint(std::cerr, "illegal packet received: %d\n", buf.size());
+ return make_ready_future();
+ }
+ auto str = std::string(buf.get(), buf.size());
+ if (str != "pong") {
+ fprint(std::cerr, "illegal packet received: %d\n", buf.size());
+ return make_ready_future();
+ }
+ if (times > 0) {
+ return ping(times - 1);
+ } else {
+ return make_ready_future();
+ }
+ });
+ });
+ }
+
+ future<size_t> rxrx() {
+ return _write_buf.write("rxrx").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return do_write(tx_msg_nr).then([this] {
+ return _write_buf.close();
+ }).then([this] {
+ return make_ready_future<size_t>(_bytes_write);
+ });
+ });
+ }
+
+ future<size_t> txtx() {
+ return _write_buf.write("txtx").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return do_read().then([this] {
+ return make_ready_future<size_t>(_bytes_read);
+ });
+ });
+ }
+ };
+
+ future<> ping_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->ping(_pings_per_connection).then([started] {
+ auto finished = lowres_clock::now();
+ clients.invoke_on(0, &client::ping_report, started, finished);
+ });
+ }
+
+ future<> rxrx_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->rxrx().then([started] (size_t bytes) {
+ auto finished = lowres_clock::now();
+ clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
+ });
+ }
+
+ future<> txtx_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->txtx().then([started] (size_t bytes) {
+ auto finished = lowres_clock::now();
+ clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
+ });
+ }
+
+ void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) {
+ if (_earliest_started > started)
+ _earliest_started = started;
+ if (_latest_finished < finished)
+ _latest_finished = finished;
+ if (++_num_reported == _concurrent_connections) {
+ auto elapsed = _latest_finished - _earliest_started;
+ auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
+ auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
+ fprint(std::cout, "========== ping ============\n");
+ fprint(std::cout, "Server: %s\n", _server_addr);
+ fprint(std::cout,"Connections: %u\n", _concurrent_connections);
+ fprint(std::cout, "Total PingPong: %u\n", _total_pings);
+ fprint(std::cout, "Total Time(Secs): %f\n", secs);
+ fprint(std::cout, "Requests/Sec: %f\n",
+ static_cast<double>(_total_pings) / secs);
+ clients.stop().then([] {
+ engine().exit(0);
+ });
+ }
+ }
+
+ void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) {
+ if (_earliest_started > started)
+ _earliest_started = started;
+ if (_latest_finished < finished)
+ _latest_finished = finished;
+ _processed_bytes += bytes;
+ if (++_num_reported == _concurrent_connections) {
+ auto elapsed = _latest_finished - _earliest_started;
+ auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
+ auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
+ fprint(std::cout, "========== %s ============\n", _test);
+ fprint(std::cout, "Server: %s\n", _server_addr);
+ fprint(std::cout, "Connections: %u\n", _concurrent_connections);
+ fprint(std::cout, "Bytes Received(MiB): %u\n", _processed_bytes/1024/1024);
+ fprint(std::cout, "Total Time(Secs): %f\n", secs);
+ fprint(std::cout, "Bandwidth(Gbits/Sec): %f\n",
+ static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs);
+ clients.stop().then([] {
+ engine().exit(0);
+ });
+ }
+ }
+
+ future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) {
+ _server_addr = server_addr;
+ _concurrent_connections = ncon * smp::count;
+ _total_pings = _pings_per_connection * _concurrent_connections;
+ _test = test;
+
+ for (unsigned i = 0; i < ncon; i++) {
+ socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}});
+ engine().net().connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) {
+ auto conn = new connection(std::move(fd));
+ (this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) {
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ fprint(std::cerr, "request error: %s\n", ex.what());
+ }
+ });
+ });
+ }
+ return make_ready_future();
+ }
+ future<> stop() {
+ return make_ready_future();
+ }
+
+ typedef future<> (client::*test_fn)(connection *conn);
+ static const std::map<std::string, test_fn> tests;
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ app_template app;
+ app.add_options()
+ ("server", bpo::value<std::string>()->required(), "Server address")
+ ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)")
+ ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu")
+ ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp")
+ ;
+
+ return app.run_deprecated(ac, av, [&app] {
+ auto&& config = app.configuration();
+ auto server = config["server"].as<std::string>();
+ auto test = config["test"].as<std::string>();
+ auto ncon = config["conn"].as<unsigned>();
+ auto proto = config["proto"].as<std::string>();
+
+ if (proto == "tcp") {
+ protocol = transport::TCP;
+ } else if (proto == "sctp") {
+ protocol = transport::SCTP;
+ } else {
+ fprint(std::cerr, "Error: --proto=tcp|sctp\n");
+ return engine().exit(1);
+ }
+
+ if (!client::tests.count(test)) {
+ fprint(std::cerr, "Error: -test=ping | rxrx | txtx\n");
+ return engine().exit(1);
+ }
+
+ clients.start().then([server, test, ncon] () {
+ clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon);
+ });
+ });
+}
+
+const std::map<std::string, client::test_fn> client::tests = {
+ {"ping", &client::ping_test},
+ {"rxrx", &client::rxrx_test},
+ {"txtx", &client::txtx_test},
+};
+
diff --git a/src/seastar/demos/tcp_sctp_server_demo.cc b/src/seastar/demos/tcp_sctp_server_demo.cc
new file mode 100644
index 00000000..f8231e17
--- /dev/null
+++ b/src/seastar/demos/tcp_sctp_server_demo.cc
@@ -0,0 +1,200 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2014 Cloudius Systems
+ */
+
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/temporary_buffer.hh>
+#include <seastar/core/distributed.hh>
+#include <vector>
+#include <iostream>
+
+using namespace seastar;
+
+static std::string str_ping{"ping"};
+static std::string str_txtx{"txtx"};
+static std::string str_rxrx{"rxrx"};
+static std::string str_pong{"pong"};
+static std::string str_unknow{"unknow cmd"};
+static int tx_msg_total_size = 100 * 1024 * 1024;
+static int tx_msg_size = 4 * 1024;
+static int tx_msg_nr = tx_msg_total_size / tx_msg_size;
+static int rx_msg_size = 4 * 1024;
+static std::string str_txbuf(tx_msg_size, 'X');
+static bool enable_tcp = false;
+static bool enable_sctp = false;
+
+class tcp_server {
+ std::vector<server_socket> _tcp_listeners;
+ std::vector<server_socket> _sctp_listeners;
+public:
+ future<> listen(ipv4_addr addr) {
+ if (enable_tcp) {
+ listen_options lo;
+ lo.proto = transport::TCP;
+ lo.reuse_address = true;
+ _tcp_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
+ do_accepts(_tcp_listeners);
+ }
+
+ if (enable_sctp) {
+ listen_options lo;
+ lo.proto = transport::SCTP;
+ lo.reuse_address = true;
+ _sctp_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
+ do_accepts(_sctp_listeners);
+ }
+ return make_ready_future<>();
+ }
+
+ // FIXME: We should properly tear down the service here.
+ future<> stop() {
+ return make_ready_future<>();
+ }
+
+ void do_accepts(std::vector<server_socket>& listeners) {
+ int which = listeners.size() - 1;
+ listeners[which].accept().then([this, &listeners] (connected_socket fd, socket_address addr) mutable {
+ auto conn = new connection(*this, std::move(fd), addr);
+ conn->process().then_wrapped([conn] (auto&& f) {
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ std::cout << "request error " << ex.what() << "\n";
+ }
+ });
+ do_accepts(listeners);
+ }).then_wrapped([] (auto&& f) {
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ std::cout << "accept failed: " << ex.what() << "\n";
+ }
+ });
+ }
+ class connection {
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ public:
+ connection(tcp_server& server, connected_socket&& fd, socket_address addr)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output()) {}
+ future<> process() {
+ return read();
+ }
+ future<> read() {
+ if (_read_buf.eof()) {
+ return make_ready_future();
+ }
+ // Expect 4 bytes cmd from client
+ size_t n = 4;
+ return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) {
+ if (buf.size() == 0) {
+ return make_ready_future();
+ }
+ auto cmd = std::string(buf.get(), buf.size());
+ // pingpong test
+ if (cmd == str_ping) {
+ return _write_buf.write(str_pong).then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return this->read();
+ });
+ // server tx test
+ } else if (cmd == str_txtx) {
+ return tx_test();
+ // server tx test
+ } else if (cmd == str_rxrx) {
+ return rx_test();
+ // unknow test
+ } else {
+ return _write_buf.write(str_unknow).then([this] {
+ return _write_buf.flush();
+ }).then([] {
+ return make_ready_future();
+ });
+ }
+ });
+ }
+ future<> do_write(int end) {
+ if (end == 0) {
+ return make_ready_future<>();
+ }
+ return _write_buf.write(str_txbuf).then([this] {
+ return _write_buf.flush();
+ }).then([this, end] {
+ return do_write(end - 1);
+ });
+ }
+ future<> tx_test() {
+ return do_write(tx_msg_nr).then([this] {
+ return _write_buf.close();
+ }).then([] {
+ return make_ready_future<>();
+ });
+ }
+ future<> do_read() {
+ return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
+ if (buf.size() == 0) {
+ return make_ready_future();
+ } else {
+ return do_read();
+ }
+ });
+ }
+ future<> rx_test() {
+ return do_read().then([] {
+ return make_ready_future<>();
+ });
+ }
+ };
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port")
+ ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen")
+ ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ enable_tcp = config["tcp"].as<std::string>() == "yes";
+ enable_sctp = config["sctp"].as<std::string>() == "yes";
+ if (!enable_tcp && !enable_sctp) {
+ fprint(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n");
+ return engine().exit(1);
+ }
+ auto server = new distributed<tcp_server>;
+ server->start().then([server = std::move(server), port] () mutable {
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ server->invoke_on_all(&tcp_server::listen, ipv4_addr{port});
+ }).then([port] {
+ std::cout << "Seastar TCP server listening on port " << port << " ...\n";
+ });
+ });
+}
diff --git a/src/seastar/demos/tls_echo_server.hh b/src/seastar/demos/tls_echo_server.hh
new file mode 100644
index 00000000..e9e09ac6
--- /dev/null
+++ b/src/seastar/demos/tls_echo_server.hh
@@ -0,0 +1,118 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2017 ScyllaDB
+ */
+#pragma once
+
+#include <seastar/core/do_with.hh>
+#include <seastar/core/sstring.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/net/tls.hh>
+
+using namespace seastar;
+
+struct streams {
+ connected_socket s;
+ input_stream<char> in;
+ output_stream<char> out;
+
+ streams(connected_socket cs) : s(std::move(cs)), in(s.input()), out(s.output())
+ {}
+};
+
+class echoserver {
+ server_socket _socket;
+ shared_ptr<tls::server_credentials> _certs;
+ seastar::gate _gate;
+ bool _stopped = false;
+ bool _verbose = false;
+public:
+ echoserver(bool verbose = false)
+ : _certs(make_shared<tls::server_credentials>(make_shared<tls::dh_params>()))
+ , _verbose(verbose)
+ {}
+
+ future<> listen(socket_address addr, sstring crtfile, sstring keyfile, tls::client_auth ca = tls::client_auth::NONE) {
+ _certs->set_client_auth(ca);
+ return _certs->set_x509_key_file(crtfile, keyfile, tls::x509_crt_format::PEM).then([this, addr] {
+ ::listen_options opts;
+ opts.reuse_address = true;
+
+ _socket = tls::listen(_certs, addr, opts);
+
+ repeat([this] {
+ if (_stopped) {
+ return make_ready_future<stop_iteration>(stop_iteration::yes);
+ }
+ return with_gate(_gate, [this] {
+ return _socket.accept().then([this](::connected_socket s, socket_address a) {
+ if (_verbose) {
+ std::cout << "Got connection from "<< a << std::endl;
+ }
+ auto strms = make_lw_shared<streams>(std::move(s));
+ return repeat([strms, this]() {
+ return strms->in.read().then([this, strms](temporary_buffer<char> buf) {
+ if (buf.empty()) {
+ if (_verbose) {
+ std::cout << "EOM" << std::endl;
+ }
+ return make_ready_future<stop_iteration>(stop_iteration::yes);
+ }
+ sstring tmp(buf.begin(), buf.end());
+ if (_verbose) {
+ std::cout << "Read " << tmp.size() << "B" << std::endl;
+ }
+ return strms->out.write(tmp).then([strms]() {
+ return strms->out.flush();
+ }).then([] {
+ return make_ready_future<stop_iteration>(stop_iteration::no);
+ });
+ });
+ }).then([strms]{
+ return strms->out.close();
+ }).handle_exception([this](auto ep) {
+ }).finally([this, strms]{
+ if (_verbose) {
+ std::cout << "Ending session" << std::endl;
+ }
+ return strms->in.close();
+ });
+ }).handle_exception([this](auto ep) {
+ if (!_stopped) {
+ std::cerr << "Error: " << ep << std::endl;
+ }
+ }).then([this] {
+ return make_ready_future<stop_iteration>(_stopped ? stop_iteration::yes : stop_iteration::no);
+ });
+ });
+ });
+ return make_ready_future();
+ });
+ }
+
+ future<> stop() {
+ _stopped = true;
+ _socket.abort_accept();
+ return _gate.close();
+ }
+};
diff --git a/src/seastar/demos/tls_echo_server_demo.cc b/src/seastar/demos/tls_echo_server_demo.cc
new file mode 100644
index 00000000..1ca0efbb
--- /dev/null
+++ b/src/seastar/demos/tls_echo_server_demo.cc
@@ -0,0 +1,67 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/dns.hh>
+#include "tls_echo_server.hh"
+
+using namespace seastar;
+namespace bpo = boost::program_options;
+
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "Server port")
+ ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Server address")
+ ("cert,c", bpo::value<std::string>()->required(), "Server certificate file")
+ ("key,k", bpo::value<std::string>()->required(), "Certificate key")
+ ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose")
+ ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto crt = config["cert"].as<std::string>();
+ auto key = config["key"].as<std::string>();
+ auto addr = config["address"].as<std::string>();
+ auto verbose = config["verbose"].as<bool>();
+
+ std::cout << "Starting..." << std::endl;
+ return net::dns::resolve_name(addr).then([=](net::inet_address a) {
+ ipv4_addr ia(a, port);
+
+ auto server = ::make_shared<seastar::sharded<echoserver>>();
+ return server->start(verbose).then([=]() {
+ return server->invoke_on_all(&echoserver::listen, socket_address(ia), sstring(crt), sstring(key), tls::client_auth::NONE);
+ }).handle_exception([=](auto e) {
+ std::cerr << "Error: " << e << std::endl;
+ engine().exit(1);
+ }).then([=] {
+ std::cout << "TLS echo server running at " << addr << ":" << port << std::endl;
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ });
+ });
+ });
+}
diff --git a/src/seastar/demos/tls_simple_client_demo.cc b/src/seastar/demos/tls_simple_client_demo.cc
new file mode 100644
index 00000000..ea598e48
--- /dev/null
+++ b/src/seastar/demos/tls_simple_client_demo.cc
@@ -0,0 +1,132 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/dns.hh>
+#include "tls_echo_server.hh"
+
+using namespace seastar;
+namespace bpo = boost::program_options;
+
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "Remote port")
+ ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Remote address")
+ ("trust,t", bpo::value<std::string>(), "Trust store")
+ ("msg,m", bpo::value<std::string>(), "Message to send")
+ ("bytes,b", bpo::value<size_t>()->default_value(512), "Use random bytes of length as message")
+ ("iterations,i", bpo::value<size_t>()->default_value(1), "Repeat X times")
+ ("read-response,r", bpo::value<bool>()->default_value(true)->implicit_value(true), "Read echoed message")
+ ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose operation")
+ ("check-name,c", bpo::value<bool>()->default_value(false)->implicit_value(true), "Check server name")
+ ("server-name,s", bpo::value<std::string>(), "Expected server name")
+ ;
+
+
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto addr = config["address"].as<std::string>();
+ auto n = config["bytes"].as<size_t>();
+ auto i = config["iterations"].as<size_t>();
+ auto do_read = config["read-response"].as<bool>();
+ auto verbose = config["verbose"].as<bool>();
+ auto check = config["check-name"].as<bool>();
+
+ std::cout << "Starting..." << std::endl;
+
+ auto certs = ::make_shared<tls::certificate_credentials>();
+ auto f = make_ready_future();
+
+ if (config.count("trust")) {
+ f = certs->set_x509_trust_file(config["trust"].as<std::string>(), tls::x509_crt_format::PEM);
+ }
+
+ seastar::shared_ptr<sstring> msg;
+
+ if (config.count("msg")) {
+ msg = seastar::make_shared<sstring>(config["msg"].as<std::string>());
+ } else {
+ msg = seastar::make_shared<sstring>(sstring(sstring::initialized_later(), n));
+ for (size_t i = 0; i < n; ++i) {
+ (*msg)[i] = '0' + char(::rand() % 30);
+ }
+ }
+
+ sstring server_name;
+ if (config.count("server-name")) {
+ server_name = config["server-name"].as<std::string>();
+ }
+ if (verbose) {
+ std::cout << "Msg (" << msg->size() << "B):" << std::endl << *msg << std::endl;
+ }
+ return f.then([=]() {
+ return net::dns::get_host_by_name(addr).then([=](net::hostent e) {
+ ipv4_addr ia(e.addr_list.front(), port);
+
+ sstring name;
+ if (check) {
+ name = server_name.empty() ? e.names.front() : server_name;
+ }
+ return tls::connect(certs, ia, name).then([=](::connected_socket s) {
+ auto strms = ::make_lw_shared<streams>(std::move(s));
+ auto range = boost::irange(size_t(0), i);
+ return do_for_each(range, [=](auto) {
+ auto f = strms->out.write(*msg);
+ if (!do_read) {
+ return strms->out.close().then([f = std::move(f)]() mutable {
+ return std::move(f);
+ });
+ }
+ return f.then([=]() {
+ return strms->out.flush().then([=] {
+ return strms->in.read_exactly(msg->size()).then([=](temporary_buffer<char> buf) {
+ sstring tmp(buf.begin(), buf.end());
+ if (tmp != *msg) {
+ std::cerr << "Got garbled message!" << std::endl;
+ if (verbose) {
+ std::cout << "Got (" << tmp.size() << ") :" << std::endl << tmp << std::endl;
+ }
+ throw std::runtime_error("Got garbled message!");
+ }
+ });
+ });
+ });
+ }).then([strms, do_read]{
+ return do_read ? strms->out.close() : make_ready_future<>();
+ }).finally([strms]{
+ return strms->in.close();
+ });
+ });
+ }).handle_exception([](auto ep) {
+ std::cerr << "Error: " << ep << std::endl;
+ });
+ }).finally([] {
+ engine().exit(0);
+ });
+ });
+}
diff --git a/src/seastar/demos/udp_client_demo.cc b/src/seastar/demos/udp_client_demo.cc
new file mode 100644
index 00000000..cfd23fc0
--- /dev/null
+++ b/src/seastar/demos/udp_client_demo.cc
@@ -0,0 +1,87 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/api.hh>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+class client {
+private:
+ udp_channel _chan;
+ uint64_t n_sent {};
+ uint64_t n_received {};
+ uint64_t n_failed {};
+ timer<> _stats_timer;
+public:
+ void start(ipv4_addr server_addr) {
+ std::cout << "Sending to " << server_addr << std::endl;
+
+ _chan = engine().net().make_udp_channel();
+
+ _stats_timer.set_callback([this] {
+ std::cout << "Out: " << n_sent << " pps, \t";
+ std::cout << "Err: " << n_failed << " pps, \t";
+ std::cout << "In: " << n_received << " pps" << std::endl;
+ n_sent = 0;
+ n_received = 0;
+ n_failed = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ keep_doing([this, server_addr] {
+ return _chan.send(server_addr, "hello!\n")
+ .then_wrapped([this] (auto&& f) {
+ try {
+ f.get();
+ n_sent++;
+ } catch (...) {
+ n_failed++;
+ }
+ });
+ });
+
+ keep_doing([this] {
+ return _chan.receive().then([this] (auto) {
+ n_received++;
+ });
+ });
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ client _client;
+ app_template app;
+ app.add_options()
+ ("server", bpo::value<std::string>(), "Server address")
+ ;
+ return app.run_deprecated(ac, av, [&_client, &app] {
+ auto&& config = app.configuration();
+ _client.start(config["server"].as<std::string>());
+ });
+}
diff --git a/src/seastar/demos/udp_server_demo.cc b/src/seastar/demos/udp_server_demo.cc
new file mode 100644
index 00000000..86dabaaf
--- /dev/null
+++ b/src/seastar/demos/udp_server_demo.cc
@@ -0,0 +1,80 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <iostream>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+class udp_server {
+private:
+ udp_channel _chan;
+ timer<> _stats_timer;
+ uint64_t _n_sent {};
+public:
+ void start(uint16_t port) {
+ ipv4_addr listen_addr{port};
+ _chan = engine().net().make_udp_channel(listen_addr);
+
+ _stats_timer.set_callback([this] {
+ std::cout << "Out: " << _n_sent << " pps" << std::endl;
+ _n_sent = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ keep_doing([this] {
+ return _chan.receive().then([this] (udp_datagram dgram) {
+ return _chan.send(dgram.get_src(), std::move(dgram.get_data())).then([this] {
+ _n_sent++;
+ });
+ });
+ });
+ }
+ // FIXME: we should properly tear down the service here.
+ future<> stop() {
+ return make_ready_future<>();
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "UDP server port") ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto server = new distributed<udp_server>;
+ server->start().then([server = std::move(server), port] () mutable {
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ server->invoke_on_all(&udp_server::start, port);
+ }).then([port] {
+ std::cout << "Seastar UDP server listening on port " << port << " ...\n";
+ });
+ });
+}
diff --git a/src/seastar/demos/udp_zero_copy_demo.cc b/src/seastar/demos/udp_zero_copy_demo.cc
new file mode 100644
index 00000000..9bfa8fe7
--- /dev/null
+++ b/src/seastar/demos/udp_zero_copy_demo.cc
@@ -0,0 +1,149 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/scattered_message.hh>
+#include <seastar/core/vector-data-sink.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/units.hh>
+#include <random>
+#include <iomanip>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+namespace bpo = boost::program_options;
+
+template <typename Duration>
+typename Duration::rep to_seconds(Duration d) {
+ return std::chrono::duration_cast<std::chrono::seconds>(d).count();
+}
+
+class server {
+private:
+ udp_channel _chan;
+ timer<> _stats_timer;
+ uint64_t _n_sent {};
+ size_t _chunk_size;
+ bool _copy;
+ std::vector<packet> _packets;
+ std::unique_ptr<output_stream<char>> _out;
+ steady_clock_type::time_point _last;
+ sstring _key;
+ size_t _packet_size = 8*KB;
+ char* _mem;
+ size_t _mem_size;
+ std::mt19937 _rnd;
+ std::random_device _randem_dev;
+ std::uniform_int_distribution<size_t> _chunk_distribution;
+private:
+ char* next_chunk() {
+ return _mem + _chunk_distribution(_rnd);
+ }
+public:
+ server()
+ : _rnd(std::random_device()()) {
+ }
+ future<> send(ipv4_addr dst, packet p) {
+ return _chan.send(dst, std::move(p)).then([this] {
+ _n_sent++;
+ });
+ }
+ void start(int chunk_size, bool copy, size_t mem_size) {
+ ipv4_addr listen_addr{10000};
+ _chan = engine().net().make_udp_channel(listen_addr);
+
+ std::cout << "Listening on " << listen_addr << std::endl;
+
+ _last = steady_clock_type::now();
+ _stats_timer.set_callback([this] {
+ auto now = steady_clock_type::now();
+ std::cout << "Out: "
+ << std::setprecision(2) << std::fixed
+ << (double)_n_sent / to_seconds(now - _last)
+ << " pps" << std::endl;
+ _last = now;
+ _n_sent = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ _chunk_size = chunk_size;
+ _copy = copy;
+ _key = sstring(new char[64], 64);
+
+ _out = std::make_unique<output_stream<char>>(
+ data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size);
+
+ _mem = new char[mem_size];
+ _mem_size = mem_size;
+
+ _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3);
+
+ assert(3 * _chunk_size <= _packet_size);
+
+ keep_doing([this] {
+ return _chan.receive().then([this] (udp_datagram dgram) {
+ auto chunk = next_chunk();
+ lw_shared_ptr<sstring> item;
+ if (_copy) {
+ _packets.clear();
+ _out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ _out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ _out->write(chunk, _chunk_size);
+ _out->flush();
+ assert(_packets.size() == 1);
+ return send(dgram.get_src(), std::move(_packets[0]));
+ } else {
+ auto chunk = next_chunk();
+ scattered_message<char> msg;
+ msg.reserve(3);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ return send(dgram.get_src(), std::move(msg).release());
+ }
+ });
+ });
+ }
+};
+
+int main(int ac, char ** av) {
+ server s;
+ app_template app;
+ app.add_options()
+ ("chunk-size", bpo::value<int>()->default_value(1024),
+ "Chunk size")
+ ("mem-size", bpo::value<int>()->default_value(512),
+ "Memory pool size in MiB")
+ ("copy", "Copy data rather than send via zero-copy")
+ ;
+ return app.run_deprecated(ac, av, [&app, &s] {
+ auto&& config = app.configuration();
+ auto chunk_size = config["chunk-size"].as<int>();
+ auto mem_size = (size_t)config["mem-size"].as<int>() * MB;
+ auto copy = config.count("copy");
+ s.start(chunk_size, copy, mem_size);
+ });
+}