summaryrefslogtreecommitdiffstats
path: root/src/seastar/demos
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/seastar/demos
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/demos')
-rw-r--r--src/seastar/demos/CMakeLists.txt115
-rw-r--r--src/seastar/demos/block_discard_demo.cc74
-rw-r--r--src/seastar/demos/coroutines_demo.cc63
-rw-r--r--src/seastar/demos/echo_demo.cc126
-rw-r--r--src/seastar/demos/file_demo.cc175
-rw-r--r--src/seastar/demos/ip_demo.cc47
-rw-r--r--src/seastar/demos/l3_demo.cc49
-rw-r--r--src/seastar/demos/line_count_demo.cc85
-rw-r--r--src/seastar/demos/rpc_demo.cc299
-rw-r--r--src/seastar/demos/scheduling_group_demo.cc186
-rw-r--r--src/seastar/demos/sharded_parameter_demo.cc78
-rw-r--r--src/seastar/demos/tcp_demo.cc75
-rw-r--r--src/seastar/demos/tcp_sctp_client_demo.cc278
-rw-r--r--src/seastar/demos/tcp_sctp_server_demo.cc205
-rw-r--r--src/seastar/demos/tls_echo_server.hh121
-rw-r--r--src/seastar/demos/tls_echo_server_demo.cc67
-rw-r--r--src/seastar/demos/tls_simple_client_demo.cc133
-rw-r--r--src/seastar/demos/tutorial_examples.cc117
-rw-r--r--src/seastar/demos/udp_client_demo.cc89
-rw-r--r--src/seastar/demos/udp_server_demo.cc82
-rw-r--r--src/seastar/demos/udp_zero_copy_demo.cc153
21 files changed, 2617 insertions, 0 deletions
diff --git a/src/seastar/demos/CMakeLists.txt b/src/seastar/demos/CMakeLists.txt
new file mode 100644
index 000000000..084e82987
--- /dev/null
+++ b/src/seastar/demos/CMakeLists.txt
@@ -0,0 +1,115 @@
+#
+# This file is open source software, licensed to you under the terms
+# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+# distributed with this work for additional information regarding copyright
+# ownership. You may not use this file except in compliance with the License.
+#
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Copyright (C) 2018 Scylladb, Ltd.
+#
+
+# Logical target for all demos.
+add_custom_target (demos)
+
+macro (seastar_add_demo name)
+ set (args ${ARGN})
+
+ cmake_parse_arguments (
+ parsed_args
+ ""
+ ""
+ "SOURCES"
+ ${args})
+
+ set (target demo_${name})
+ add_executable (${target} ${parsed_args_SOURCES})
+
+ target_include_directories (${target}
+ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
+
+ target_link_libraries (${target}
+ PRIVATE
+ Boost::program_options
+ seastar_private)
+
+ set_target_properties (${target}
+ PROPERTIES
+ OUTPUT_NAME ${name}_demo)
+
+ add_dependencies (demos ${target})
+endmacro ()
+
+seastar_add_demo (block_discard
+ SOURCES block_discard_demo.cc)
+
+if (${Seastar_API_LEVEL} GREATER_EQUAL 3)
+ seastar_add_demo (coroutines
+ SOURCES coroutines_demo.cc)
+endif ()
+
+seastar_add_demo (echo
+ SOURCES echo_demo.cc)
+
+seastar_add_demo (ip
+ SOURCES ip_demo.cc)
+
+seastar_add_demo (line_count
+ SOURCES line_count_demo.cc)
+
+seastar_add_demo (l3
+ SOURCES l3_demo.cc)
+
+seastar_add_demo (rpc
+ SOURCES rpc_demo.cc)
+
+seastar_add_demo (scheduling_group
+ SOURCES scheduling_group_demo.cc)
+
+seastar_add_demo (tcp
+ SOURCES tcp_demo.cc)
+
+seastar_add_demo (tcp_sctp_client
+ SOURCES tcp_sctp_client_demo.cc)
+
+seastar_add_demo (tcp_sctp_server
+ SOURCES tcp_sctp_server_demo.cc)
+
+seastar_add_demo (tls_echo_server
+ SOURCES
+ tls_echo_server.hh
+ tls_echo_server_demo.cc)
+
+seastar_add_demo (tls_simple_client
+ SOURCES
+ tls_echo_server.hh
+ tls_simple_client_demo.cc)
+
+seastar_add_demo (udp_client
+ SOURCES udp_client_demo.cc)
+
+seastar_add_demo (udp_server
+ SOURCES udp_server_demo.cc)
+
+seastar_add_demo (udp_zero_copy
+ SOURCES udp_zero_copy_demo.cc)
+
+seastar_add_demo (sharded_parameter
+ SOURCES sharded_parameter_demo.cc)
+
+seastar_add_demo (file
+ SOURCES file_demo.cc)
+
+seastar_add_demo (tutorial_examples
+ SOURCES tutorial_examples.cc)
diff --git a/src/seastar/demos/block_discard_demo.cc b/src/seastar/demos/block_discard_demo.cc
new file mode 100644
index 000000000..b14c5ed01
--- /dev/null
+++ b/src/seastar/demos/block_discard_demo.cc
@@ -0,0 +1,74 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <algorithm>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/semaphore.hh>
+#include <iostream>
+
+using namespace seastar;
+
+namespace bpo = boost::program_options;
+
+struct file_test {
+ file_test(file&& f) : f(std::move(f)) {}
+ file f;
+ semaphore sem = { 0 };
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("dev", bpo::value<std::string>(), "e.g. --dev /dev/sdb")
+ ;
+
+ return app.run_deprecated(ac, av, [&app] {
+ static constexpr auto max = 10000;
+ auto&& config = app.configuration();
+ auto filepath = config["dev"].as<std::string>();
+
+ return open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) {
+ auto ft = new file_test{std::move(f)};
+
+ // Discard asynchronously, siganl when done.
+ (void)ft->f.stat().then([ft] (struct stat st) mutable {
+ assert(S_ISBLK(st.st_mode));
+ auto offset = 0;
+ auto length = max * 4096;
+ return ft->f.discard(offset, length).then([ft] () mutable {
+ ft->sem.signal();
+ });
+ });
+
+ // Wait and exit.
+ (void)ft->sem.wait().then([ft] () mutable {
+ return ft->f.flush();
+ }).then([ft] () mutable {
+ std::cout << "done\n";
+ delete ft;
+ engine().exit(0);
+ });
+ });
+ });
+}
diff --git a/src/seastar/demos/coroutines_demo.cc b/src/seastar/demos/coroutines_demo.cc
new file mode 100644
index 000000000..58c881c93
--- /dev/null
+++ b/src/seastar/demos/coroutines_demo.cc
@@ -0,0 +1,63 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2019 ScyllaDB Ltd.
+ */
+
+#include <iostream>
+
+#include <seastar/util/std-compat.hh>
+
+#ifndef SEASTAR_COROUTINES_ENABLED
+
+int main(int argc, char** argv) {
+ std::cout << "coroutines not available\n";
+ return 0;
+}
+
+#else
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/coroutine.hh>
+#include <seastar/core/fstream.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/loop.hh>
+
+int main(int argc, char** argv) {
+ seastar::app_template app;
+ app.run(argc, argv, [] () -> seastar::future<> {
+ std::cout << "this is a completely useless program\nplease stand by...\n";
+ auto f = seastar::parallel_for_each(std::vector<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, [] (int i) -> seastar::future<> {
+ co_await seastar::sleep(std::chrono::seconds(i));
+ std::cout << i << "\n";
+ });
+
+ auto file = co_await seastar::open_file_dma("useless_file.txt", seastar::open_flags::create | seastar::open_flags::wo);
+ auto out = co_await seastar::make_file_output_stream(file);
+ seastar::sstring str = "nothing to see here, move along now\n";
+ co_await out.write(str);
+ co_await out.flush();
+ co_await out.close();
+
+ co_await std::move(f);
+ std::cout << "done\n";
+ });
+}
+
+#endif
diff --git a/src/seastar/demos/echo_demo.cc b/src/seastar/demos/echo_demo.cc
new file mode 100644
index 000000000..f44866941
--- /dev/null
+++ b/src/seastar/demos/echo_demo.cc
@@ -0,0 +1,126 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ */
+
+#include <seastar/net/virtio.hh>
+#include <seastar/net/dpdk.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/ip.hh>
+#include <iostream>
+#include <utility>
+#include <algorithm>
+
+using namespace seastar;
+using namespace net;
+
+void dump_packet(const packet& p) {
+ std::cout << "rx:";
+ auto f = p.frag(0);
+ for (unsigned i = 0; i < std::min(f.size, size_t(30)); ++i) {
+ char x[4];
+ std::sprintf(x, " %02x", uint8_t(f.base[i]));
+ std::cout << x;
+ }
+ std::cout << "\n";
+}
+
+future<> echo_packet(net::qp& netif, packet p) {
+ auto f = p.frag(0);
+ if (f.size < sizeof(eth_hdr)) {
+ return make_ready_future<>();
+ }
+ auto pos = 0;
+ auto eh = reinterpret_cast<eth_hdr*>(f.base + pos);
+ pos += sizeof(*eh);
+ *eh = ntoh(*eh);
+ if (eh->eth_proto != 0x0800) {
+ return make_ready_future<>();
+ }
+ auto iph = reinterpret_cast<ip_hdr*>(f.base + pos);
+ *iph = ntoh(*iph);
+ pos += iph->ihl * 4;
+ if (iph->ver != 4 || iph->ihl < 5 || iph->ip_proto != 1) {
+ return make_ready_future<>();
+ }
+ auto ip_len = iph->len;
+ auto icmph = reinterpret_cast<icmp_hdr*>(f.base + pos);
+ if (icmph->type != icmp_hdr::msg_type::echo_request) {
+ return make_ready_future<>();
+ }
+ auto icmp_len = ip_len - iph->ihl * 4;
+ std::swap(eh->src_mac, eh->dst_mac);
+ std::swap(iph->src_ip, iph->dst_ip);
+ icmph->type = icmp_hdr::msg_type::echo_reply;
+ icmph->csum = 0;
+ *iph = hton(*iph);
+ *eh = hton(*eh);
+ icmph->csum = ip_checksum(icmph, icmp_len);
+ iph->csum = 0;
+ iph->csum = ip_checksum(iph, iph->ihl * 4);
+ return netif.send(std::move(p));
+}
+
+#ifdef SEASTAR_HAVE_DPDK
+void usage()
+{
+ std::cout<<"Usage: echotest [-virtio|-dpdk]"<<std::endl;
+ std::cout<<" -virtio - use virtio backend (default)"<<std::endl;
+ std::cout<<" -dpdk - use dpdk-pmd backend"<<std::endl;
+}
+#endif
+
+int main(int ac, char** av) {
+ std::unique_ptr<net::device> dnet;
+ net::qp* vnet;
+
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+#ifdef SEASTAR_HAVE_DPDK
+ if (ac > 2) {
+ usage();
+ return -1;
+ }
+
+ if ((ac == 1) || !std::strcmp(av[1], "-virtio")) {
+ dnet = create_virtio_net_device(opts);
+ } else if (!std::strcmp(av[1], "-dpdk")) {
+ dnet = create_dpdk_net_device();
+ } else {
+ usage();
+ return -1;
+ }
+#else
+ dnet = create_virtio_net_device(opts);
+#endif // SEASTAR_HAVE_DPDK
+
+ auto qp = dnet->init_local_queue(opts, 0);
+ vnet = qp.get();
+ dnet->set_local_queue(std::move(qp));
+ future<> rx_done =
+ dnet->receive([vnet] (packet p) {
+ return echo_packet(*vnet, std::move(p));
+ });
+ engine().run();
+ return 0;
+}
+
+
diff --git a/src/seastar/demos/file_demo.cc b/src/seastar/demos/file_demo.cc
new file mode 100644
index 000000000..76974bdc4
--- /dev/null
+++ b/src/seastar/demos/file_demo.cc
@@ -0,0 +1,175 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+
+// Demonstration of seastar::with_file
+
+#include <cstring>
+#include <limits>
+#include <random>
+
+#include <seastar/core/app-template.hh>
+
+#include <seastar/core/aligned_buffer.hh>
+#include <seastar/core/file.hh>
+#include <seastar/core/fstream.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/sstring.hh>
+#include <seastar/core/temporary_buffer.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/util/log.hh>
+#include <seastar/util/tmp_file.hh>
+
+using namespace seastar;
+
+constexpr size_t aligned_size = 4096;
+
+future<> verify_data_file(file& f, temporary_buffer<char>& rbuf, const temporary_buffer<char>& wbuf) {
+ return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf, &wbuf] (size_t count) {
+ assert(count == aligned_size);
+ fmt::print(" verifying {} bytes\n", count);
+ assert(!memcmp(rbuf.get(), wbuf.get(), aligned_size));
+ });
+}
+
+future<file> open_data_file(sstring meta_filename, temporary_buffer<char>& rbuf) {
+ fmt::print(" retrieving data filename from {}\n", meta_filename);
+ return with_file(open_file_dma(meta_filename, open_flags::ro), [&rbuf] (file& f) {
+ return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf] (size_t count) {
+ assert(count == aligned_size);
+ auto data_filename = sstring(rbuf.get());
+ fmt::print(" opening {}\n", data_filename);
+ return open_file_dma(data_filename, open_flags::ro);
+ });
+ });
+}
+
+future<> demo_with_file() {
+ fmt::print("Demonstrating with_file():\n");
+ return tmp_dir::do_with_thread([] (tmp_dir& t) {
+ auto rnd = std::mt19937(std::random_device()());
+ auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max());
+ auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
+ sstring meta_filename = (t.get_path() / "meta_file").native();
+ sstring data_filename = (t.get_path() / "data_file").native();
+
+ // `with_file` is used to create/open `filename` just around the call to `dma_write`
+ auto write_to_file = [] (const sstring filename, temporary_buffer<char>& wbuf) {
+ auto count = with_file(open_file_dma(filename, open_flags::rw | open_flags::create), [&wbuf] (file& f) {
+ return f.dma_write(0, wbuf.get(), aligned_size);
+ }).get0();
+ assert(count == aligned_size);
+ };
+
+ // print the data_filename into the write buffer
+ std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0);
+ std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write());
+
+ // and write it to `meta_filename`
+ fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename);
+
+ write_to_file(meta_filename, wbuf);
+
+ // now write some random data into data_filename
+ fmt::print(" writing random data into {}\n", data_filename);
+ std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
+
+ write_to_file(data_filename, wbuf);
+
+ // verify the data via meta_filename
+ fmt::print(" verifying data...\n");
+ auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
+
+ with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) {
+ return verify_data_file(f, rbuf, wbuf);
+ }).get();
+ });
+}
+
+future<> demo_with_file_close_on_failure() {
+ fmt::print("\nDemonstrating with_file_close_on_failure():\n");
+ return tmp_dir::do_with_thread([] (tmp_dir& t) {
+ auto rnd = std::mt19937(std::random_device()());
+ auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max());
+ auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
+ sstring meta_filename = (t.get_path() / "meta_file").native();
+ sstring data_filename = (t.get_path() / "data_file").native();
+
+ // with_file_close_on_failure will close the opened file only if
+ // `make_file_output_stream` returns an error. Otherwise, in the error-free path,
+ // the opened file is moved to `file_output_stream` that in-turn closes it
+ // when the stream is closed.
+ auto make_output_stream = [] (const sstring filename) {
+ return with_file_close_on_failure(open_file_dma(std::move(filename), open_flags::rw | open_flags::create), [] (file f) {
+ return make_file_output_stream(std::move(f), aligned_size);
+ });
+ };
+
+ // writes the buffer one byte at a time, to demonstrate output stream
+ auto write_to_stream = [] (output_stream<char>& o, const temporary_buffer<char>& wbuf) {
+ return seastar::do_for_each(wbuf, [&o] (char c) {
+ return o.write(&c, 1);
+ }).finally([&o] {
+ return o.close();
+ });
+ };
+
+ // print the data_filename into the write buffer
+ std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0);
+ std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write());
+
+ // and write it to `meta_filename`
+ fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename);
+
+ // with_file_close_on_failure will close the opened file only if
+ // `make_file_output_stream` returns an error. Otherwise, in the error-free path,
+ // the opened file is moved to `file_output_stream` that in-turn closes it
+ // when the stream is closed.
+ output_stream<char> o = make_output_stream(meta_filename).get0();
+
+ write_to_stream(o, wbuf).get();
+
+ // now write some random data into data_filename
+ fmt::print(" writing random data into {}\n", data_filename);
+ std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
+
+ o = make_output_stream(data_filename).get0();
+
+ write_to_stream(o, wbuf).get();
+
+ // verify the data via meta_filename
+ fmt::print(" verifying data...\n");
+ auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
+
+ with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) {
+ return verify_data_file(f, rbuf, wbuf);
+ }).get();
+ });
+}
+
+int main(int ac, char** av) {
+ app_template app;
+ return app.run(ac, av, [] {
+ return demo_with_file().then([] {
+ return demo_with_file_close_on_failure();
+ });
+ });
+}
diff --git a/src/seastar/demos/ip_demo.cc b/src/seastar/demos/ip_demo.cc
new file mode 100644
index 000000000..30372999a
--- /dev/null
+++ b/src/seastar/demos/ip_demo.cc
@@ -0,0 +1,47 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/arp.hh>
+#include <seastar/net/ip.hh>
+#include <seastar/net/net.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/virtio.hh>
+#include <seastar/core/aligned_buffer.hh>
+
+using namespace seastar;
+using namespace net;
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ vnet->set_local_queue(vnet->init_local_queue(opts, 0));
+
+ interface netif(std::move(vnet));
+ ipv4 inet(&netif);
+ inet.set_host_address(ipv4_address("192.168.122.2"));
+ engine().run();
+ return 0;
+}
+
+
+
diff --git a/src/seastar/demos/l3_demo.cc b/src/seastar/demos/l3_demo.cc
new file mode 100644
index 000000000..51fdd7e93
--- /dev/null
+++ b/src/seastar/demos/l3_demo.cc
@@ -0,0 +1,49 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/net.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/net/virtio.hh>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+
+void dump_arp_packets(l3_protocol& proto) {
+ // FIXME: ignored future
+ (void)proto.receive([] (packet p, ethernet_address from) {
+ std::cout << "seen arp packet\n";
+ return make_ready_future<>();
+ }, [] (forward_hash& out_hash_data, packet& p, size_t off) {return false;});
+}
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ interface netif(std::move(vnet));
+ l3_protocol arp(&netif, eth_protocol_num::arp, []{ return std::optional<l3_protocol::l3packet>(); });
+ dump_arp_packets(arp);
+ engine().run();
+ return 0;
+}
+
diff --git a/src/seastar/demos/line_count_demo.cc b/src/seastar/demos/line_count_demo.cc
new file mode 100644
index 000000000..eab4609ec
--- /dev/null
+++ b/src/seastar/demos/line_count_demo.cc
@@ -0,0 +1,85 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+// Demonstration of file_input_stream. Don't expect stellar performance
+// since no read-ahead or caching is done yet.
+
+#include <seastar/core/fstream.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <fmt/printf.h>
+#include <algorithm>
+#include <iostream>
+
+using namespace seastar;
+
+struct reader {
+public:
+ reader(file f)
+ : is(make_file_input_stream(std::move(f), file_input_stream_options{1 << 16, 1})) {
+ }
+
+ input_stream<char> is;
+ size_t count = 0;
+
+ // for input_stream::consume():
+ using unconsumed_remainder = std::optional<temporary_buffer<char>>;
+ future<unconsumed_remainder> operator()(temporary_buffer<char> data) {
+ if (data.empty()) {
+ return make_ready_future<unconsumed_remainder>(std::move(data));
+ } else {
+ count += std::count(data.begin(), data.end(), '\n');
+ // FIXME: last line without \n?
+ return make_ready_future<unconsumed_remainder>();
+ }
+ }
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ namespace bpo = boost::program_options;
+ app.add_positional_options({
+ { "file", bpo::value<std::string>(), "File to process", 1 },
+ });
+ return app.run(ac, av, [&app] {
+ auto fname = app.configuration()["file"].as<std::string>();
+ return open_file_dma(fname, open_flags::ro).then([] (file f) {
+ auto r = make_shared<reader>(std::move(f));
+ return r->is.consume(*r).then([r] {
+ fmt::print("{:d} lines\n", r->count);
+ return r->is.close().then([r] {});
+ });
+ }).then_wrapped([] (future<> f) -> future<int> {
+ try {
+ f.get();
+ return make_ready_future<int>(0);
+ } catch (std::exception& ex) {
+ std::cout << ex.what() << "\n";
+ return make_ready_future<int>(1);
+ } catch (...) {
+ std::cout << "unknown exception\n";
+ return make_ready_future<int>(1);
+ }
+ });
+ });
+}
+
diff --git a/src/seastar/demos/rpc_demo.cc b/src/seastar/demos/rpc_demo.cc
new file mode 100644
index 000000000..d46cd07ec
--- /dev/null
+++ b/src/seastar/demos/rpc_demo.cc
@@ -0,0 +1,299 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/rpc/rpc.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/rpc/lz4_compressor.hh>
+#include <seastar/util/log.hh>
+#include <seastar/core/loop.hh>
+
+using namespace seastar;
+
+struct serializer {
+};
+
+template <typename T, typename Output>
+inline
+void write_arithmetic_type(Output& out, T v) {
+ static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
+ return out.write(reinterpret_cast<const char*>(&v), sizeof(T));
+}
+
+template <typename T, typename Input>
+inline
+T read_arithmetic_type(Input& in) {
+ static_assert(std::is_arithmetic<T>::value, "must be arithmetic type");
+ T v;
+ in.read(reinterpret_cast<char*>(&v), sizeof(T));
+ return v;
+}
+
+template <typename Output>
+inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); }
+template <typename Output>
+inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); }
+template <typename Input>
+inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); }
+template <typename Input>
+inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); }
+template <typename Input>
+inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); }
+template <typename Input>
+inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); }
+template <typename Input>
+inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); }
+
+template <typename Output>
+inline void write(serializer, Output& out, const sstring& v) {
+ write_arithmetic_type(out, uint32_t(v.size()));
+ out.write(v.c_str(), v.size());
+}
+
+template <typename Input>
+inline sstring read(serializer, Input& in, rpc::type<sstring>) {
+ auto size = read_arithmetic_type<uint32_t>(in);
+ sstring ret = uninitialized_string(size);
+ in.read(ret.data(), size);
+ return ret;
+}
+
+namespace bpo = boost::program_options;
+using namespace std::chrono_literals;
+
+class mycomp : public rpc::compressor::factory {
+ const sstring _name = "LZ4";
+public:
+ virtual const sstring& supported() const override {
+ fmt::print("supported called\n");
+ return _name;
+ }
+ virtual std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override {
+ fmt::print("negotiate called with {}\n", feature);
+ return feature == _name ? std::make_unique<rpc::lz4_compressor>() : nullptr;
+ }
+};
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "RPC server port")
+ ("server", bpo::value<std::string>(), "Server address")
+ ("compress", bpo::value<bool>()->default_value(false), "Compress RPC traffic");
+ std::cout << "start ";
+ rpc::protocol<serializer> myrpc(serializer{});
+ static std::unique_ptr<rpc::protocol<serializer>::server> server;
+ static std::unique_ptr<rpc::protocol<serializer>::client> client;
+ static double x = 30.0;
+
+ static logger log("rpc_demo");
+ myrpc.set_logger(&log);
+
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ bool compress = config["compress"].as<bool>();
+ static mycomp mc;
+ auto test1 = myrpc.register_handler(1, [x = 0](int i) mutable { fmt::print("test1 count {:d} got {:d}\n", ++x, i); });
+ auto test2 = myrpc.register_handler(2, [](int a, int b){ fmt::print("test2 got {:d} {:d}\n", a, b); return make_ready_future<int>(a+b); });
+ auto test3 = myrpc.register_handler(3, [](double x){ fmt::print("test3 got {:f}\n", x); return std::make_unique<double>(sin(x)); });
+ auto test4 = myrpc.register_handler(4, [](){ fmt::print("test4 throw!\n"); throw std::runtime_error("exception!"); });
+ auto test5 = myrpc.register_handler(5, [](){ fmt::print("test5 no wait\n"); return rpc::no_wait; });
+ auto test6 = myrpc.register_handler(6, [](const rpc::client_info& info, int x){ fmt::print("test6 client {}, {:d}\n", inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr), x); });
+ auto test8 = myrpc.register_handler(8, [](){ fmt::print("test8 sleep for 2 sec\n"); return sleep(2s); });
+ auto test13 = myrpc.register_handler(13, [](){ fmt::print("test13 sleep for 1 msec\n"); return sleep(1ms); });
+ auto test_message_to_big = myrpc.register_handler(14, [](sstring payload){ fmt::print("test message to bit, should not get here"); });
+
+ if (config.count("server")) {
+ std::cout << "client" << std::endl;
+ auto test7 = myrpc.make_client<long (long a, long b)>(7);
+ auto test9 = myrpc.make_client<long (long a, long b)>(9); // do not send optional
+ auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional
+ auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects
+ auto test10 = myrpc.make_client<long ()>(10); // receive less then replied
+ auto test10_1 = myrpc.make_client<future<rpc::tuple<long, int>> ()>(10); // receive all
+ auto test11 = myrpc.make_client<future<rpc::tuple<long, rpc::optional<int>>> ()>(11); // receive more then replied
+ auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits
+ auto test_nohandler = myrpc.make_client<void ()>(100000000); // non existing verb
+ auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non existing verb, no_wait call
+ rpc::client_options co;
+ if (compress) {
+ co.compressor_factory = &mc;
+ }
+
+ client = std::make_unique<rpc::protocol<serializer>::client>(myrpc, co, ipv4_addr{config["server"].as<std::string>()});
+
+ auto f = test8(*client, 1500ms).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ printf("test8 should not get here!\n");
+ } catch (rpc::timeout_error&) {
+ printf("test8 timeout!\n");
+ }
+ });
+ for (auto i = 0; i < 100; i++) {
+ fmt::print("iteration={:d}\n", i);
+ (void)test1(*client, 5).then([] (){ fmt::print("test1 ended\n");});
+ (void)test2(*client, 1, 2).then([] (int r) { fmt::print("test2 got {:d}\n", r); });
+ (void)test3(*client, x).then([](double x) { fmt::print("sin={:f}\n", x); });
+ (void)test4(*client).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test4 your should not see this!\n");
+ } catch (std::runtime_error& x){
+ fmt::print("test4 {}\n", x.what());
+ }
+ });
+ (void)test5(*client).then([] { fmt::print("test5 no wait ended\n"); });
+ (void)test6(*client, 1).then([] { fmt::print("test6 ended\n"); });
+ (void)test7(*client, 5, 6).then([] (long r) { fmt::print("test7 got {:d}\n", r); });
+ (void)test9(*client, 1, 2).then([] (long r) { fmt::print("test9 got {:d}\n", r); });
+ (void)test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); });
+ (void)test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); });
+ (void)test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); });
+ (void)test10_1(*client).then([] (rpc::tuple<long, int> r) { fmt::print("test10_1 got {:d} and {:d}\n", std::get<0>(r), std::get<1>(r)); });
+ (void)test11(*client).then([] (rpc::tuple<long, rpc::optional<int> > r) { fmt::print("test11 got {:d} and {:d}\n", std::get<0>(r), bool(std::get<1>(r))); });
+ (void)test_nohandler(*client).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test_nohandler your should not see this!\n");
+ } catch (rpc::unknown_verb_error& x){
+ fmt::print("test_nohandle no such verb\n");
+ } catch (...) {
+ fmt::print("incorrect exception!\n");
+ }
+ });
+ (void)test_nohandler_nowait(*client);
+ auto c = make_lw_shared<rpc::cancellable>();
+ (void)test13(*client, *c).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test13 shold not get here\n");
+ } catch(rpc::canceled_error&) {
+ fmt::print("test13 canceled\n");
+ } catch(...) {
+ fmt::print("test13 wrong exception\n");
+ }
+ });
+ c->cancel();
+ (void)test13(*client, *c).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test13 shold not get here\n");
+ } catch(rpc::canceled_error&) {
+ fmt::print("test13 canceled\n");
+ } catch(...) {
+ fmt::print("test13 wrong exception\n");
+ }
+ });
+ (void)sleep(500us).then([c] { c->cancel(); });
+ (void)test_message_to_big(*client, uninitialized_string(10'000'001)).then_wrapped([](future<> f) {
+ try {
+ f.get();
+ fmt::print("test message to big shold not get here\n");
+ } catch(std::runtime_error& err) {
+ fmt::print("test message to big get error {}\n", err.what());
+ } catch(...) {
+ fmt::print("test message to big wrong exception\n");
+ }
+ });
+ }
+ // delay a little for a time-sensitive test
+ (void)sleep(400ms).then([test12] () mutable {
+ // server is configured for 10MB max, throw 25MB worth of requests at it.
+ auto now = rpc::rpc_clock_type::now();
+ return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable {
+ return test12(*client, 100, uninitialized_string(1'000'000)).then([idx, now] {
+ auto later = rpc::rpc_clock_type::now();
+ auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
+ fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count());
+ });
+ }).then([now] {
+ auto later = rpc::rpc_clock_type::now();
+ auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
+ fmt::print("test12 completed after {:d} ms (should be ~300)\n", delta.count());
+ });
+ });
+ (void)f.finally([] {
+ return sleep(1s).then([] {
+ return client->stop().then([] {
+ return engine().exit(0);
+ });
+ });
+ });
+ } else {
+ std::cout << "server on port " << port << std::endl;
+ myrpc.register_handler(7, [](long a, long b) mutable {
+ auto p = make_lw_shared<promise<>>();
+ auto t = make_lw_shared<timer<>>();
+ fmt::print("test7 got {:d} {:d}\n", a, b);
+ auto f = p->get_future().then([a, b, t] {
+ fmt::print("test7 calc res\n");
+ return a - b;
+ });
+ t->set_callback([p = std::move(p)] () mutable { p->set_value(); });
+ t->arm(1s);
+ return f;
+ });
+ myrpc.register_handler(9, [] (long a, long b, rpc::optional<int> c) {
+ long r = 2;
+ fmt::print("test9 got {:d} {:d} ", a, b);
+ if (c) {
+ fmt::print("{:d}", c.value());
+ r++;
+ }
+ fmt::print("\n");
+ return r;
+ });
+ myrpc.register_handler(10, [] {
+ fmt::print("test 10\n");
+ return make_ready_future<rpc::tuple<long, int>>(rpc::tuple<long, int>(1, 2));
+ });
+ myrpc.register_handler(11, [] {
+ fmt::print("test 11\n");
+ return 1ul;
+ });
+ myrpc.register_handler(12, [] (int sleep_ms, sstring payload) {
+ return sleep(std::chrono::milliseconds(sleep_ms)).then([] {
+ return make_ready_future<>();
+ });
+ });
+
+ rpc::resource_limits limits;
+ limits.bloat_factor = 1;
+ limits.basic_request_size = 0;
+ limits.max_memory = 10'000'000;
+ rpc::server_options so;
+ if (compress) {
+ so.compressor_factory = &mc;
+ }
+ server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, so, ipv4_addr{port}, limits);
+ }
+ });
+
+}
diff --git a/src/seastar/demos/scheduling_group_demo.cc b/src/seastar/demos/scheduling_group_demo.cc
new file mode 100644
index 000000000..8e1314c66
--- /dev/null
+++ b/src/seastar/demos/scheduling_group_demo.cc
@@ -0,0 +1,186 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2016 Scylla DB Ltd
+ */
+
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/scheduling.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/core/when_all.hh>
+#include <seastar/core/with_scheduling_group.hh>
+#include <seastar/core/condition-variable.hh>
+#include <seastar/util/defer.hh>
+#include <fmt/printf.h>
+#include <chrono>
+#include <cmath>
+#include <boost/range/irange.hpp>
+
+using namespace seastar;
+using namespace std::chrono_literals;
+
+template <typename Func, typename Duration>
+future<>
+compute_intensive_task(Duration duration, unsigned& counter, Func func) {
+ auto end = std::chrono::steady_clock::now() + duration;
+ while (std::chrono::steady_clock::now() < end) {
+ func();
+ }
+ ++counter;
+ return make_ready_future<>();
+}
+
+future<>
+heavy_task(unsigned& counter) {
+ return compute_intensive_task(1ms, counter, [] {
+ static thread_local double x = 1;
+ x = std::exp(x) / 3;
+ });
+}
+
+future<>
+light_task(unsigned& counter) {
+ return compute_intensive_task(100us, counter, [] {
+ static thread_local double x = 0.1;
+ x = std::log(x + 1);
+ });
+}
+
+future<>
+medium_task(unsigned& counter) {
+ return compute_intensive_task(400us, counter, [] {
+ static thread_local double x = 0.1;
+ x = std::cos(x);
+ });
+}
+
+using done_func = std::function<bool ()>;
+
+future<>
+run_compute_intensive_tasks(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) {
+ return seastar::async([task = std::move(task), sg, concurrency, done, &counter] () mutable {
+ while (!done()) {
+ parallel_for_each(boost::irange(0u, concurrency), [task, sg, &counter] (unsigned i) mutable {
+ return with_scheduling_group(sg, [task, &counter] {
+ return task(counter);
+ });
+ }).get();
+ thread::maybe_yield();
+ }
+ });
+}
+
+future<>
+run_compute_intensive_tasks_in_threads(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) {
+ auto attr = seastar::thread_attributes();
+ attr.sched_group = sg;
+ return parallel_for_each(boost::irange(0u, concurrency), [attr, done, &counter, task] (unsigned i) {
+ return seastar::async(attr, [done, &counter, task] {
+ while (!done()) {
+ task(counter).get();
+ thread::maybe_yield();
+ }
+ });
+ });
+}
+
+future<>
+run_with_duty_cycle(float utilization, std::chrono::steady_clock::duration period, done_func done, std::function<future<> (done_func done)> task) {
+ return seastar::async([=] {
+ bool duty_toggle = true;
+ auto t0 = std::chrono::steady_clock::now();
+ condition_variable cv;
+ timer<> tmr_on([&] { duty_toggle = true; cv.signal(); });
+ timer<> tmr_off([&] { duty_toggle = false; });
+ tmr_on.arm(t0, period);
+ tmr_off.arm(t0 + std::chrono::duration_cast<decltype(t0)::duration>(period * utilization), period);
+ auto combined_done = [&] {
+ return done() || !duty_toggle;
+ };
+ while (!done()) {
+ while (!combined_done()) {
+ task(std::cref(combined_done)).get();
+ thread::maybe_yield();
+ }
+ cv.wait([&] {
+ return done() || duty_toggle;
+ }).get();
+ }
+ tmr_on.cancel();
+ tmr_off.cancel();
+ });
+}
+
+#include <fenv.h>
+
+template <typename T>
+auto var_fn(T& var) {
+ return [&var] { return var; };
+}
+
+int main(int ac, char** av) {
+ app_template app;
+ return app.run(ac, av, [] {
+ return seastar::async([] {
+ auto sg100 = seastar::create_scheduling_group("sg100", 100).get0();
+ auto ksg100 = seastar::defer([&] { seastar::destroy_scheduling_group(sg100).get(); });
+ auto sg20 = seastar::create_scheduling_group("sg20", 20).get0();
+ auto ksg20 = seastar::defer([&] { seastar::destroy_scheduling_group(sg20).get(); });
+ auto sg50 = seastar::create_scheduling_group("sg50", 50).get0();
+ auto ksg50 = seastar::defer([&] { seastar::destroy_scheduling_group(sg50).get(); });
+
+ bool done = false;
+ auto end = timer<>([&done] {
+ done = true;
+ });
+
+ end.arm(10s);
+ unsigned ctr100 = 0, ctr20 = 0, ctr50 = 0;
+ fmt::print("running three scheduling groups with 100% duty cycle each:\n");
+ when_all(
+ run_compute_intensive_tasks(sg100, var_fn(done), 5, ctr100, heavy_task),
+ run_compute_intensive_tasks(sg20, var_fn(done), 3, ctr20, light_task),
+ run_compute_intensive_tasks_in_threads(sg50, var_fn(done), 2, ctr50, medium_task)
+ ).get();
+ fmt::print("{:10} {:15} {:10} {:12} {:8}\n", "shares", "task_time (us)", "executed", "runtime (ms)", "vruntime");
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 1000, ctr100, ctr100 * 1000 / 1000, ctr100 * 1000 / 1000 / 100.);
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 20, 100, ctr20, ctr20 * 100 / 1000, ctr20 * 100 / 1000 / 20.);
+ fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 400, ctr50, ctr50 * 400 / 1000, ctr50 * 400 / 1000 / 50.);
+ fmt::print("\n");
+
+ fmt::print("running two scheduling groups with 100%/50% duty cycles (period=1s:\n");
+ unsigned ctr100_2 = 0, ctr50_2 = 0;
+ done = false;
+ end.arm(10s);
+ when_all(
+ run_compute_intensive_tasks(sg50, var_fn(done), 5, ctr50_2, heavy_task),
+ run_with_duty_cycle(0.5, 1s, var_fn(done), [=, &ctr100_2] (done_func done) {
+ return run_compute_intensive_tasks(sg100, done, 4, ctr100_2, heavy_task);
+ })
+ ).get();
+ fmt::print("{:10} {:10} {:15} {:10} {:12} {:8}\n", "shares", "duty", "task_time (us)", "executed", "runtime (ms)", "vruntime");
+ fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 50, 1000, ctr100_2, ctr100_2 * 1000 / 1000, ctr100_2 * 1000 / 1000 / 100.);
+ fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 100, 400, ctr50_2, ctr50_2 * 1000 / 1000, ctr50_2 * 1000 / 1000 / 50.);
+
+ return 0;
+ });
+ });
+}
diff --git a/src/seastar/demos/sharded_parameter_demo.cc b/src/seastar/demos/sharded_parameter_demo.cc
new file mode 100644
index 000000000..da6da789a
--- /dev/null
+++ b/src/seastar/demos/sharded_parameter_demo.cc
@@ -0,0 +1,78 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2020 ScyllaDB
+ */
+
+
+// Demonstration of seastar::sharded_parameter
+
+#include <seastar/core/sharded.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/defer.hh>
+#include <cassert>
+
+// This is some service that we wish to run on all shards.
+class service_one {
+ int _capacity = 7;
+public:
+ // Pretend that this int is some important resource.
+ int get_capacity() const { return _capacity; }
+};
+
+// Another service that we run on all shards, that depends on service_one.
+class service_two {
+ int _resource_allocation;
+public:
+ service_two(service_one& s1, int resource_allocation) : _resource_allocation(resource_allocation) {}
+ int get_resource_allocation() const { return _resource_allocation; }
+};
+
+int main(int ac, char** av) {
+ seastar::app_template app;
+ return app.run(ac, av, [&] {
+ // sharded<> setup code is typically run in a seastar::thread
+ return seastar::async([&] {
+
+ // Launch service_one
+ seastar::sharded<service_one> s1;
+ s1.start().get();
+ auto stop_s1 = seastar::defer([&] { s1.stop().get(); });
+
+ auto calculate_half_capacity = [] (service_one& s1) {
+ return s1.get_capacity() / 2;
+ };
+
+ // Launch service_two, passing it per-shard dependencies from s1
+ seastar::sharded<service_two> s2;
+ // Start s2, passing two parameters to service_two's constructor
+ s2.start(
+ // Each service_two instance will get a reference to a service_one instance on the same shard
+ std::ref(s1),
+ // This calculation will be performed on each shard
+ seastar::sharded_parameter(calculate_half_capacity, std::ref(s1))
+ ).get();
+ seastar::defer([&] { s2.stop().get(); });
+
+ s2.invoke_on_all([] (service_two& s2) {
+ assert(s2.get_resource_allocation() == 3);
+ }).get();
+ });
+ });
+}
diff --git a/src/seastar/demos/tcp_demo.cc b/src/seastar/demos/tcp_demo.cc
new file mode 100644
index 000000000..5f2c9f3fe
--- /dev/null
+++ b/src/seastar/demos/tcp_demo.cc
@@ -0,0 +1,75 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/net/ip.hh>
+#include <seastar/net/virtio.hh>
+#include <seastar/net/tcp.hh>
+#include <seastar/core/reactor.hh>
+#include <fmt/printf.h>
+
+using namespace seastar;
+using namespace net;
+
+struct tcp_test {
+ ipv4& inet;
+ using tcp = net::tcp<ipv4_traits>;
+ tcp::listener _listener;
+ struct connection {
+ tcp::connection tcp_conn;
+ explicit connection(tcp::connection tc) : tcp_conn(std::move(tc)) {}
+ void run() {
+ // Read packets and echo back in the background.
+ (void)tcp_conn.wait_for_data().then([this] {
+ auto p = tcp_conn.read();
+ if (!p.len()) {
+ tcp_conn.close_write();
+ return;
+ }
+ fmt::print("read {:d} bytes\n", p.len());
+ (void)tcp_conn.send(std::move(p));
+ run();
+ });
+ }
+ };
+ tcp_test(ipv4& inet) : inet(inet), _listener(inet.get_tcp().listen(10000)) {}
+ void run() {
+ // Run all connections in the background.
+ (void)_listener.accept().then([this] (tcp::connection conn) {
+ (new connection(std::move(conn)))->run();
+ run();
+ });
+ }
+};
+
+int main(int ac, char** av) {
+ boost::program_options::variables_map opts;
+ opts.insert(std::make_pair("tap-device", boost::program_options::variable_value(std::string("tap0"), false)));
+
+ auto vnet = create_virtio_net_device(opts);
+ interface netif(std::move(vnet));
+ ipv4 inet(&netif);
+ inet.set_host_address(ipv4_address("192.168.122.2"));
+ tcp_test tt(inet);
+ (void)engine().when_started().then([&tt] { tt.run(); });
+ engine().run();
+}
+
+
diff --git a/src/seastar/demos/tcp_sctp_client_demo.cc b/src/seastar/demos/tcp_sctp_client_demo.cc
new file mode 100644
index 000000000..4e46a5658
--- /dev/null
+++ b/src/seastar/demos/tcp_sctp_client_demo.cc
@@ -0,0 +1,278 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <iostream>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/print.hh>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+static int rx_msg_size = 4 * 1024;
+static int tx_msg_total_size = 100 * 1024 * 1024;
+static int tx_msg_size = 4 * 1024;
+static int tx_msg_nr = tx_msg_total_size / tx_msg_size;
+static std::string str_txbuf(tx_msg_size, 'X');
+
+class client;
+distributed<client> clients;
+
+transport protocol = transport::TCP;
+
+class client {
+private:
+ static constexpr unsigned _pings_per_connection = 10000;
+ unsigned _total_pings;
+ unsigned _concurrent_connections;
+ ipv4_addr _server_addr;
+ std::string _test;
+ lowres_clock::time_point _earliest_started;
+ lowres_clock::time_point _latest_finished;
+ size_t _processed_bytes;
+ unsigned _num_reported;
+public:
+ class connection {
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ size_t _bytes_read = 0;
+ size_t _bytes_write = 0;
+ public:
+ connection(connected_socket&& fd)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output()) {}
+
+ future<> do_read() {
+ return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
+ _bytes_read += buf.size();
+ if (buf.size() == 0) {
+ return make_ready_future();
+ } else {
+ return do_read();
+ }
+ });
+ }
+
+ future<> do_write(int end) {
+ if (end == 0) {
+ return make_ready_future();
+ }
+ return _write_buf.write(str_txbuf).then([this] {
+ _bytes_write += tx_msg_size;
+ return _write_buf.flush();
+ }).then([this, end] {
+ return do_write(end - 1);
+ });
+ }
+
+ future<> ping(int times) {
+ return _write_buf.write("ping").then([this] {
+ return _write_buf.flush();
+ }).then([this, times] {
+ return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) {
+ if (buf.size() != 4) {
+ fprint(std::cerr, "illegal packet received: %d\n", buf.size());
+ return make_ready_future();
+ }
+ auto str = std::string(buf.get(), buf.size());
+ if (str != "pong") {
+ fprint(std::cerr, "illegal packet received: %d\n", buf.size());
+ return make_ready_future();
+ }
+ if (times > 0) {
+ return ping(times - 1);
+ } else {
+ return make_ready_future();
+ }
+ });
+ });
+ }
+
+ future<size_t> rxrx() {
+ return _write_buf.write("rxrx").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return do_write(tx_msg_nr).then([this] {
+ return _write_buf.close();
+ }).then([this] {
+ return make_ready_future<size_t>(_bytes_write);
+ });
+ });
+ }
+
+ future<size_t> txtx() {
+ return _write_buf.write("txtx").then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return do_read().then([this] {
+ return make_ready_future<size_t>(_bytes_read);
+ });
+ });
+ }
+ };
+
+ future<> ping_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->ping(_pings_per_connection).then([started] {
+ auto finished = lowres_clock::now();
+ (void)clients.invoke_on(0, &client::ping_report, started, finished);
+ });
+ }
+
+ future<> rxrx_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->rxrx().then([started] (size_t bytes) {
+ auto finished = lowres_clock::now();
+ (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
+ });
+ }
+
+ future<> txtx_test(connection *conn) {
+ auto started = lowres_clock::now();
+ return conn->txtx().then([started] (size_t bytes) {
+ auto finished = lowres_clock::now();
+ (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes);
+ });
+ }
+
+ void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) {
+ if (_earliest_started > started)
+ _earliest_started = started;
+ if (_latest_finished < finished)
+ _latest_finished = finished;
+ if (++_num_reported == _concurrent_connections) {
+ auto elapsed = _latest_finished - _earliest_started;
+ auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
+ auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
+ fprint(std::cout, "========== ping ============\n");
+ fprint(std::cout, "Server: %s\n", _server_addr);
+ fprint(std::cout,"Connections: %u\n", _concurrent_connections);
+ fprint(std::cout, "Total PingPong: %u\n", _total_pings);
+ fprint(std::cout, "Total Time(Secs): %f\n", secs);
+ fprint(std::cout, "Requests/Sec: %f\n",
+ static_cast<double>(_total_pings) / secs);
+ (void)clients.stop().then([] {
+ engine().exit(0);
+ });
+ }
+ }
+
+ void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) {
+ if (_earliest_started > started)
+ _earliest_started = started;
+ if (_latest_finished < finished)
+ _latest_finished = finished;
+ _processed_bytes += bytes;
+ if (++_num_reported == _concurrent_connections) {
+ auto elapsed = _latest_finished - _earliest_started;
+ auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
+ auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000);
+ fprint(std::cout, "========== %s ============\n", _test);
+ fprint(std::cout, "Server: %s\n", _server_addr);
+ fprint(std::cout, "Connections: %u\n", _concurrent_connections);
+ fprint(std::cout, "Bytes Received(MiB): %u\n", _processed_bytes/1024/1024);
+ fprint(std::cout, "Total Time(Secs): %f\n", secs);
+ fprint(std::cout, "Bandwidth(Gbits/Sec): %f\n",
+ static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs);
+ (void)clients.stop().then([] {
+ engine().exit(0);
+ });
+ }
+ }
+
+ future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) {
+ _server_addr = server_addr;
+ _concurrent_connections = ncon * smp::count;
+ _total_pings = _pings_per_connection * _concurrent_connections;
+ _test = test;
+
+ for (unsigned i = 0; i < ncon; i++) {
+ socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}});
+ (void)connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) {
+ auto conn = new connection(std::move(fd));
+ (void)(this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) {
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ fprint(std::cerr, "request error: %s\n", ex.what());
+ }
+ });
+ });
+ }
+ return make_ready_future();
+ }
+ future<> stop() {
+ return make_ready_future();
+ }
+
+ typedef future<> (client::*test_fn)(connection *conn);
+ static const std::map<std::string, test_fn> tests;
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ app_template app;
+ app.add_options()
+ ("server", bpo::value<std::string>()->required(), "Server address")
+ ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)")
+ ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu")
+ ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp")
+ ;
+
+ return app.run_deprecated(ac, av, [&app] {
+ auto&& config = app.configuration();
+ auto server = config["server"].as<std::string>();
+ auto test = config["test"].as<std::string>();
+ auto ncon = config["conn"].as<unsigned>();
+ auto proto = config["proto"].as<std::string>();
+
+ if (proto == "tcp") {
+ protocol = transport::TCP;
+ } else if (proto == "sctp") {
+ protocol = transport::SCTP;
+ } else {
+ fprint(std::cerr, "Error: --proto=tcp|sctp\n");
+ return engine().exit(1);
+ }
+
+ if (!client::tests.count(test)) {
+ fprint(std::cerr, "Error: -test=ping | rxrx | txtx\n");
+ return engine().exit(1);
+ }
+
+ (void)clients.start().then([server, test, ncon] () {
+ return clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon);
+ });
+ });
+}
+
+const std::map<std::string, client::test_fn> client::tests = {
+ {"ping", &client::ping_test},
+ {"rxrx", &client::rxrx_test},
+ {"txtx", &client::txtx_test},
+};
+
diff --git a/src/seastar/demos/tcp_sctp_server_demo.cc b/src/seastar/demos/tcp_sctp_server_demo.cc
new file mode 100644
index 000000000..ce0b18654
--- /dev/null
+++ b/src/seastar/demos/tcp_sctp_server_demo.cc
@@ -0,0 +1,205 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2014 Cloudius Systems
+ */
+
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/temporary_buffer.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/print.hh>
+#include <vector>
+#include <iostream>
+
+using namespace seastar;
+
+static std::string str_ping{"ping"};
+static std::string str_txtx{"txtx"};
+static std::string str_rxrx{"rxrx"};
+static std::string str_pong{"pong"};
+static std::string str_unknow{"unknow cmd"};
+static int tx_msg_total_size = 100 * 1024 * 1024;
+static int tx_msg_size = 4 * 1024;
+static int tx_msg_nr = tx_msg_total_size / tx_msg_size;
+static int rx_msg_size = 4 * 1024;
+static std::string str_txbuf(tx_msg_size, 'X');
+static bool enable_tcp = false;
+static bool enable_sctp = false;
+
+class tcp_server {
+ std::vector<server_socket> _tcp_listeners;
+ std::vector<server_socket> _sctp_listeners;
+public:
+ future<> listen(ipv4_addr addr) {
+ if (enable_tcp) {
+ listen_options lo;
+ lo.proto = transport::TCP;
+ lo.reuse_address = true;
+ _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo));
+ do_accepts(_tcp_listeners);
+ }
+
+ if (enable_sctp) {
+ listen_options lo;
+ lo.proto = transport::SCTP;
+ lo.reuse_address = true;
+ _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo));
+ do_accepts(_sctp_listeners);
+ }
+ return make_ready_future<>();
+ }
+
+ // FIXME: We should properly tear down the service here.
+ future<> stop() {
+ return make_ready_future<>();
+ }
+
+ void do_accepts(std::vector<server_socket>& listeners) {
+ int which = listeners.size() - 1;
+ // Accept in the background.
+ (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable {
+ connected_socket fd = std::move(ar.connection);
+ socket_address addr = std::move(ar.remote_address);
+ auto conn = new connection(*this, std::move(fd), addr);
+ (void)conn->process().then_wrapped([conn] (auto&& f) {
+ delete conn;
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ std::cout << "request error " << ex.what() << "\n";
+ }
+ });
+ do_accepts(listeners);
+ }).then_wrapped([] (auto&& f) {
+ try {
+ f.get();
+ } catch (std::exception& ex) {
+ std::cout << "accept failed: " << ex.what() << "\n";
+ }
+ });
+ }
+ class connection {
+ connected_socket _fd;
+ input_stream<char> _read_buf;
+ output_stream<char> _write_buf;
+ public:
+ connection(tcp_server& server, connected_socket&& fd, socket_address addr)
+ : _fd(std::move(fd))
+ , _read_buf(_fd.input())
+ , _write_buf(_fd.output()) {}
+ future<> process() {
+ return read();
+ }
+ future<> read() {
+ if (_read_buf.eof()) {
+ return make_ready_future();
+ }
+ // Expect 4 bytes cmd from client
+ size_t n = 4;
+ return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) {
+ if (buf.size() == 0) {
+ return make_ready_future();
+ }
+ auto cmd = std::string(buf.get(), buf.size());
+ // pingpong test
+ if (cmd == str_ping) {
+ return _write_buf.write(str_pong).then([this] {
+ return _write_buf.flush();
+ }).then([this] {
+ return this->read();
+ });
+ // server tx test
+ } else if (cmd == str_txtx) {
+ return tx_test();
+ // server tx test
+ } else if (cmd == str_rxrx) {
+ return rx_test();
+ // unknow test
+ } else {
+ return _write_buf.write(str_unknow).then([this] {
+ return _write_buf.flush();
+ }).then([] {
+ return make_ready_future();
+ });
+ }
+ });
+ }
+ future<> do_write(int end) {
+ if (end == 0) {
+ return make_ready_future<>();
+ }
+ return _write_buf.write(str_txbuf).then([this] {
+ return _write_buf.flush();
+ }).then([this, end] {
+ return do_write(end - 1);
+ });
+ }
+ future<> tx_test() {
+ return do_write(tx_msg_nr).then([this] {
+ return _write_buf.close();
+ }).then([] {
+ return make_ready_future<>();
+ });
+ }
+ future<> do_read() {
+ return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) {
+ if (buf.size() == 0) {
+ return make_ready_future();
+ } else {
+ return do_read();
+ }
+ });
+ }
+ future<> rx_test() {
+ return do_read().then([] {
+ return make_ready_future<>();
+ });
+ }
+ };
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port")
+ ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen")
+ ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ enable_tcp = config["tcp"].as<std::string>() == "yes";
+ enable_sctp = config["sctp"].as<std::string>() == "yes";
+ if (!enable_tcp && !enable_sctp) {
+ fprint(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n");
+ return engine().exit(1);
+ }
+ auto server = new distributed<tcp_server>;
+ (void)server->start().then([server = std::move(server), port] () mutable {
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ // Start listening in the background.
+ (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port});
+ }).then([port] {
+ std::cout << "Seastar TCP server listening on port " << port << " ...\n";
+ });
+ });
+}
diff --git a/src/seastar/demos/tls_echo_server.hh b/src/seastar/demos/tls_echo_server.hh
new file mode 100644
index 000000000..e7185dac9
--- /dev/null
+++ b/src/seastar/demos/tls_echo_server.hh
@@ -0,0 +1,121 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2017 ScyllaDB
+ */
+#pragma once
+
+#include <seastar/core/sstring.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/sharded.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/net/tls.hh>
+#include <seastar/util/log.hh>
+#include <iostream>
+
+using namespace seastar;
+
+struct streams {
+ connected_socket s;
+ input_stream<char> in;
+ output_stream<char> out;
+
+ streams(connected_socket cs) : s(std::move(cs)), in(s.input()), out(s.output())
+ {}
+};
+
+class echoserver {
+ server_socket _socket;
+ shared_ptr<tls::server_credentials> _certs;
+ seastar::gate _gate;
+ bool _stopped = false;
+ bool _verbose = false;
+public:
+ echoserver(bool verbose = false)
+ : _certs(make_shared<tls::server_credentials>(make_shared<tls::dh_params>()))
+ , _verbose(verbose)
+ {}
+
+ future<> listen(socket_address addr, sstring crtfile, sstring keyfile, tls::client_auth ca = tls::client_auth::NONE) {
+ _certs->set_client_auth(ca);
+ return _certs->set_x509_key_file(crtfile, keyfile, tls::x509_crt_format::PEM).then([this, addr] {
+ ::listen_options opts;
+ opts.reuse_address = true;
+
+ _socket = tls::listen(_certs, addr, opts);
+
+ // Listen in background.
+ (void)repeat([this] {
+ if (_stopped) {
+ return make_ready_future<stop_iteration>(stop_iteration::yes);
+ }
+ return with_gate(_gate, [this] {
+ return _socket.accept().then([this](accept_result ar) {
+ ::connected_socket s = std::move(ar.connection);
+ socket_address a = std::move(ar.remote_address);
+ if (_verbose) {
+ std::cout << "Got connection from "<< a << std::endl;
+ }
+ auto strms = make_lw_shared<streams>(std::move(s));
+ return repeat([strms, this]() {
+ return strms->in.read().then([this, strms](temporary_buffer<char> buf) {
+ if (buf.empty()) {
+ if (_verbose) {
+ std::cout << "EOM" << std::endl;
+ }
+ return make_ready_future<stop_iteration>(stop_iteration::yes);
+ }
+ sstring tmp(buf.begin(), buf.end());
+ if (_verbose) {
+ std::cout << "Read " << tmp.size() << "B" << std::endl;
+ }
+ return strms->out.write(tmp).then([strms]() {
+ return strms->out.flush();
+ }).then([] {
+ return make_ready_future<stop_iteration>(stop_iteration::no);
+ });
+ });
+ }).then([strms]{
+ return strms->out.close();
+ }).handle_exception([](auto ep) {
+ }).finally([this, strms]{
+ if (_verbose) {
+ std::cout << "Ending session" << std::endl;
+ }
+ return strms->in.close();
+ });
+ }).handle_exception([this](auto ep) {
+ if (!_stopped) {
+ std::cerr << "Error: " << ep << std::endl;
+ }
+ }).then([this] {
+ return make_ready_future<stop_iteration>(_stopped ? stop_iteration::yes : stop_iteration::no);
+ });
+ });
+ });
+ return make_ready_future();
+ });
+ }
+
+ future<> stop() {
+ _stopped = true;
+ _socket.abort_accept();
+ return _gate.close();
+ }
+};
diff --git a/src/seastar/demos/tls_echo_server_demo.cc b/src/seastar/demos/tls_echo_server_demo.cc
new file mode 100644
index 000000000..222412019
--- /dev/null
+++ b/src/seastar/demos/tls_echo_server_demo.cc
@@ -0,0 +1,67 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/net/dns.hh>
+#include "tls_echo_server.hh"
+
+using namespace seastar;
+namespace bpo = boost::program_options;
+
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "Server port")
+ ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Server address")
+ ("cert,c", bpo::value<std::string>()->required(), "Server certificate file")
+ ("key,k", bpo::value<std::string>()->required(), "Certificate key")
+ ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose")
+ ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto crt = config["cert"].as<std::string>();
+ auto key = config["key"].as<std::string>();
+ auto addr = config["address"].as<std::string>();
+ auto verbose = config["verbose"].as<bool>();
+
+ std::cout << "Starting..." << std::endl;
+ return net::dns::resolve_name(addr).then([=](net::inet_address a) {
+ ipv4_addr ia(a, port);
+
+ auto server = ::make_shared<seastar::sharded<echoserver>>();
+ return server->start(verbose).then([=]() {
+ return server->invoke_on_all(&echoserver::listen, socket_address(ia), sstring(crt), sstring(key), tls::client_auth::NONE);
+ }).handle_exception([=](auto e) {
+ std::cerr << "Error: " << e << std::endl;
+ engine().exit(1);
+ }).then([=] {
+ std::cout << "TLS echo server running at " << addr << ":" << port << std::endl;
+ engine().at_exit([server] {
+ return server->stop().finally([server] {});
+ });
+ });
+ });
+ });
+}
diff --git a/src/seastar/demos/tls_simple_client_demo.cc b/src/seastar/demos/tls_simple_client_demo.cc
new file mode 100644
index 000000000..d53461b0e
--- /dev/null
+++ b/src/seastar/demos/tls_simple_client_demo.cc
@@ -0,0 +1,133 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright 2015 Cloudius Systems
+ */
+#include <cmath>
+
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/loop.hh>
+#include <seastar/net/dns.hh>
+#include "tls_echo_server.hh"
+
+using namespace seastar;
+namespace bpo = boost::program_options;
+
+
+int main(int ac, char** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "Remote port")
+ ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Remote address")
+ ("trust,t", bpo::value<std::string>(), "Trust store")
+ ("msg,m", bpo::value<std::string>(), "Message to send")
+ ("bytes,b", bpo::value<size_t>()->default_value(512), "Use random bytes of length as message")
+ ("iterations,i", bpo::value<size_t>()->default_value(1), "Repeat X times")
+ ("read-response,r", bpo::value<bool>()->default_value(true)->implicit_value(true), "Read echoed message")
+ ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose operation")
+ ("check-name,c", bpo::value<bool>()->default_value(false)->implicit_value(true), "Check server name")
+ ("server-name,s", bpo::value<std::string>(), "Expected server name")
+ ;
+
+
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto addr = config["address"].as<std::string>();
+ auto n = config["bytes"].as<size_t>();
+ auto i = config["iterations"].as<size_t>();
+ auto do_read = config["read-response"].as<bool>();
+ auto verbose = config["verbose"].as<bool>();
+ auto check = config["check-name"].as<bool>();
+
+ std::cout << "Starting..." << std::endl;
+
+ auto certs = ::make_shared<tls::certificate_credentials>();
+ auto f = make_ready_future();
+
+ if (config.count("trust")) {
+ f = certs->set_x509_trust_file(config["trust"].as<std::string>(), tls::x509_crt_format::PEM);
+ }
+
+ seastar::shared_ptr<sstring> msg;
+
+ if (config.count("msg")) {
+ msg = seastar::make_shared<sstring>(config["msg"].as<std::string>());
+ } else {
+ msg = seastar::make_shared<sstring>(uninitialized_string(n));
+ for (size_t i = 0; i < n; ++i) {
+ (*msg)[i] = '0' + char(::rand() % 30);
+ }
+ }
+
+ sstring server_name;
+ if (config.count("server-name")) {
+ server_name = config["server-name"].as<std::string>();
+ }
+ if (verbose) {
+ std::cout << "Msg (" << msg->size() << "B):" << std::endl << *msg << std::endl;
+ }
+ return f.then([=]() {
+ return net::dns::get_host_by_name(addr).then([=](net::hostent e) {
+ ipv4_addr ia(e.addr_list.front(), port);
+
+ sstring name;
+ if (check) {
+ name = server_name.empty() ? e.names.front() : server_name;
+ }
+ return tls::connect(certs, ia, name).then([=](::connected_socket s) {
+ auto strms = ::make_lw_shared<streams>(std::move(s));
+ auto range = boost::irange(size_t(0), i);
+ return do_for_each(range, [=](auto) {
+ auto f = strms->out.write(*msg);
+ if (!do_read) {
+ return strms->out.close().then([f = std::move(f)]() mutable {
+ return std::move(f);
+ });
+ }
+ return f.then([=]() {
+ return strms->out.flush().then([=] {
+ return strms->in.read_exactly(msg->size()).then([=](temporary_buffer<char> buf) {
+ sstring tmp(buf.begin(), buf.end());
+ if (tmp != *msg) {
+ std::cerr << "Got garbled message!" << std::endl;
+ if (verbose) {
+ std::cout << "Got (" << tmp.size() << ") :" << std::endl << tmp << std::endl;
+ }
+ throw std::runtime_error("Got garbled message!");
+ }
+ });
+ });
+ });
+ }).then([strms, do_read]{
+ return do_read ? strms->out.close() : make_ready_future<>();
+ }).finally([strms]{
+ return strms->in.close();
+ });
+ });
+ }).handle_exception([](auto ep) {
+ std::cerr << "Error: " << ep << std::endl;
+ });
+ }).finally([] {
+ engine().exit(0);
+ });
+ });
+}
diff --git a/src/seastar/demos/tutorial_examples.cc b/src/seastar/demos/tutorial_examples.cc
new file mode 100644
index 000000000..0f67f5fd8
--- /dev/null
+++ b/src/seastar/demos/tutorial_examples.cc
@@ -0,0 +1,117 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2020 ScyllaDB.
+ */
+
+#include <iostream>
+
+#include <seastar/core/seastar.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/net/api.hh>
+
+seastar::future<> service_loop() {
+ return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})),
+ [] (auto& listener) {
+ return seastar::keep_doing([&listener] () {
+ return listener.accept().then(
+ [] (seastar::accept_result res) {
+ std::cout << "Accepted connection from " << res.remote_address << "\n";
+ });
+ });
+ });
+}
+
+const char* canned_response = "Seastar is the future!\n";
+
+seastar::future<> service_loop_2() {
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
+ [] (auto& listener) {
+ return seastar::keep_doing([&listener] () {
+ return listener.accept().then(
+ [] (seastar::accept_result res) {
+ auto s = std::move(res.connection);
+ auto out = s.output();
+ return seastar::do_with(std::move(s), std::move(out),
+ [] (auto& s, auto& out) {
+ return out.write(canned_response).then([&out] {
+ return out.close();
+ });
+ });
+ });
+ });
+ });
+}
+
+seastar::future<> handle_connection_3(seastar::connected_socket s,
+ seastar::socket_address a) {
+ auto out = s.output();
+ auto in = s.input();
+ return do_with(std::move(s), std::move(out), std::move(in),
+ [] (auto& s, auto& out, auto& in) {
+ return seastar::repeat([&out, &in] {
+ return in.read().then([&out] (auto buf) {
+ if (buf) {
+ return out.write(std::move(buf)).then([&out] {
+ return out.flush();
+ }).then([] {
+ return seastar::stop_iteration::no;
+ });
+ } else {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }
+ });
+ }).then([&out] {
+ return out.close();
+ });
+ });
+}
+
+seastar::future<> service_loop_3() {
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo),
+ [] (auto& listener) {
+ return seastar::keep_doing([&listener] () {
+ return listener.accept().then(
+ [] (seastar::accept_result res) {
+ // Note we ignore, not return, the future returned by
+ // handle_connection(), so we do not wait for one
+ // connection to be handled before accepting the next one.
+ (void)handle_connection_3(std::move(res.connection), std::move(res.remote_address)).handle_exception(
+ [] (std::exception_ptr ep) {
+ fmt::print(stderr, "Could not handle connection: {}\n", ep);
+ });
+ });
+ });
+ });
+}
+
+#include <seastar/core/app-template.hh>
+
+int main(int ac, char** av) {
+ seastar::app_template app;
+ return app.run(ac, av, [] {
+ std::cout << "This is the tutorial examples demo. It is not running anything but rather makes sure the tutorial examples compile" << std::endl;
+ return seastar::make_ready_future<>();
+ });
+}
diff --git a/src/seastar/demos/udp_client_demo.cc b/src/seastar/demos/udp_client_demo.cc
new file mode 100644
index 000000000..016b03c21
--- /dev/null
+++ b/src/seastar/demos/udp_client_demo.cc
@@ -0,0 +1,89 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/net/api.hh>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+class client {
+private:
+ udp_channel _chan;
+ uint64_t n_sent {};
+ uint64_t n_received {};
+ uint64_t n_failed {};
+ timer<> _stats_timer;
+public:
+ void start(ipv4_addr server_addr) {
+ std::cout << "Sending to " << server_addr << std::endl;
+
+ _chan = make_udp_channel();
+
+ _stats_timer.set_callback([this] {
+ std::cout << "Out: " << n_sent << " pps, \t";
+ std::cout << "Err: " << n_failed << " pps, \t";
+ std::cout << "In: " << n_received << " pps" << std::endl;
+ n_sent = 0;
+ n_received = 0;
+ n_failed = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ // Run sender in background.
+ (void)keep_doing([this, server_addr] {
+ return _chan.send(server_addr, "hello!\n")
+ .then_wrapped([this] (auto&& f) {
+ try {
+ f.get();
+ n_sent++;
+ } catch (...) {
+ n_failed++;
+ }
+ });
+ });
+
+ // Run receiver in background.
+ (void)keep_doing([this] {
+ return _chan.receive().then([this] (auto) {
+ n_received++;
+ });
+ });
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ client _client;
+ app_template app;
+ app.add_options()
+ ("server", bpo::value<std::string>(), "Server address")
+ ;
+ return app.run_deprecated(ac, av, [&_client, &app] {
+ auto&& config = app.configuration();
+ _client.start(config["server"].as<std::string>());
+ });
+}
diff --git a/src/seastar/demos/udp_server_demo.cc b/src/seastar/demos/udp_server_demo.cc
new file mode 100644
index 000000000..ec25cce5d
--- /dev/null
+++ b/src/seastar/demos/udp_server_demo.cc
@@ -0,0 +1,82 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <iostream>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/app-template.hh>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+
+class udp_server {
+private:
+ udp_channel _chan;
+ timer<> _stats_timer;
+ uint64_t _n_sent {};
+public:
+ void start(uint16_t port) {
+ ipv4_addr listen_addr{port};
+ _chan = make_udp_channel(listen_addr);
+
+ _stats_timer.set_callback([this] {
+ std::cout << "Out: " << _n_sent << " pps" << std::endl;
+ _n_sent = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ // Run server in background.
+ (void)keep_doing([this] {
+ return _chan.receive().then([this] (udp_datagram dgram) {
+ return _chan.send(dgram.get_src(), std::move(dgram.get_data())).then([this] {
+ _n_sent++;
+ });
+ });
+ });
+ }
+ // FIXME: we should properly tear down the service here.
+ future<> stop() {
+ return make_ready_future<>();
+ }
+};
+
+namespace bpo = boost::program_options;
+
+int main(int ac, char ** av) {
+ app_template app;
+ app.add_options()
+ ("port", bpo::value<uint16_t>()->default_value(10000), "UDP server port") ;
+ return app.run_deprecated(ac, av, [&] {
+ auto&& config = app.configuration();
+ uint16_t port = config["port"].as<uint16_t>();
+ auto server = new distributed<udp_server>;
+ // Run server in background.
+ (void)server->start().then([server = std::move(server), port] () mutable {
+ engine().at_exit([server] {
+ return server->stop();
+ });
+ return server->invoke_on_all(&udp_server::start, port);
+ }).then([port] {
+ std::cout << "Seastar UDP server listening on port " << port << " ...\n";
+ });
+ });
+}
diff --git a/src/seastar/demos/udp_zero_copy_demo.cc b/src/seastar/demos/udp_zero_copy_demo.cc
new file mode 100644
index 000000000..a0d7733a3
--- /dev/null
+++ b/src/seastar/demos/udp_zero_copy_demo.cc
@@ -0,0 +1,153 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/seastar.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/scattered_message.hh>
+#include <seastar/core/vector-data-sink.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/units.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/net/api.hh>
+#include <random>
+#include <iomanip>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+namespace bpo = boost::program_options;
+
+template <typename Duration>
+typename Duration::rep to_seconds(Duration d) {
+ return std::chrono::duration_cast<std::chrono::seconds>(d).count();
+}
+
+class server {
+private:
+ udp_channel _chan;
+ timer<> _stats_timer;
+ uint64_t _n_sent {};
+ size_t _chunk_size;
+ bool _copy;
+ std::vector<packet> _packets;
+ std::unique_ptr<output_stream<char>> _out;
+ steady_clock_type::time_point _last;
+ sstring _key;
+ size_t _packet_size = 8*KB;
+ char* _mem;
+ size_t _mem_size;
+ std::mt19937 _rnd;
+ std::random_device _randem_dev;
+ std::uniform_int_distribution<size_t> _chunk_distribution;
+private:
+ char* next_chunk() {
+ return _mem + _chunk_distribution(_rnd);
+ }
+public:
+ server()
+ : _rnd(std::random_device()()) {
+ }
+ future<> send(ipv4_addr dst, packet p) {
+ return _chan.send(dst, std::move(p)).then([this] {
+ _n_sent++;
+ });
+ }
+ void start(int chunk_size, bool copy, size_t mem_size) {
+ ipv4_addr listen_addr{10000};
+ _chan = make_udp_channel(listen_addr);
+
+ std::cout << "Listening on " << listen_addr << std::endl;
+
+ _last = steady_clock_type::now();
+ _stats_timer.set_callback([this] {
+ auto now = steady_clock_type::now();
+ std::cout << "Out: "
+ << std::setprecision(2) << std::fixed
+ << (double)_n_sent / to_seconds(now - _last)
+ << " pps" << std::endl;
+ _last = now;
+ _n_sent = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ _chunk_size = chunk_size;
+ _copy = copy;
+ _key = sstring(new char[64], 64);
+
+ _out = std::make_unique<output_stream<char>>(
+ data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size);
+
+ _mem = new char[mem_size];
+ _mem_size = mem_size;
+
+ _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3);
+
+ assert(3 * _chunk_size <= _packet_size);
+
+ // Run sender in background.
+ (void)keep_doing([this] {
+ return _chan.receive().then([this] (udp_datagram dgram) {
+ auto chunk = next_chunk();
+ lw_shared_ptr<sstring> item;
+ if (_copy) {
+ _packets.clear();
+ // FIXME: future is discarded
+ (void)_out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ (void)_out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ (void)_out->write(chunk, _chunk_size);
+ (void)_out->flush();
+ assert(_packets.size() == 1);
+ return send(dgram.get_src(), std::move(_packets[0]));
+ } else {
+ auto chunk = next_chunk();
+ scattered_message<char> msg;
+ msg.reserve(3);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ return send(dgram.get_src(), std::move(msg).release());
+ }
+ });
+ });
+ }
+};
+
+int main(int ac, char ** av) {
+ server s;
+ app_template app;
+ app.add_options()
+ ("chunk-size", bpo::value<int>()->default_value(1024),
+ "Chunk size")
+ ("mem-size", bpo::value<int>()->default_value(512),
+ "Memory pool size in MiB")
+ ("copy", "Copy data rather than send via zero-copy")
+ ;
+ return app.run_deprecated(ac, av, [&app, &s] {
+ auto&& config = app.configuration();
+ auto chunk_size = config["chunk-size"].as<int>();
+ auto mem_size = (size_t)config["mem-size"].as<int>() * MB;
+ auto copy = config.count("copy");
+ s.start(chunk_size, copy, mem_size);
+ });
+}