diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/seastar/demos | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/demos')
24 files changed, 2910 insertions, 0 deletions
diff --git a/src/seastar/demos/CMakeLists.txt b/src/seastar/demos/CMakeLists.txt new file mode 100644 index 000000000..d24cb97a7 --- /dev/null +++ b/src/seastar/demos/CMakeLists.txt @@ -0,0 +1,124 @@ +# +# This file is open source software, licensed to you under the terms +# of the Apache License, Version 2.0 (the "License"). See the NOTICE file +# distributed with this work for additional information regarding copyright +# ownership. You may not use this file except in compliance with the License. +# +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Copyright (C) 2018 Scylladb, Ltd. +# + +# Logical target for all demos. +add_custom_target (demos) + +macro (seastar_add_demo name) + set (args ${ARGN}) + + cmake_parse_arguments ( + parsed_args + "" + "" + "SOURCES" + ${args}) + + set (target demo_${name}) + add_executable (${target} ${parsed_args_SOURCES}) + + target_include_directories (${target} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) + + target_link_libraries (${target} + PRIVATE + Boost::program_options + seastar_private) + + set_target_properties (${target} + PROPERTIES + OUTPUT_NAME ${name}_demo) + + add_dependencies (demos ${target}) +endmacro () + +seastar_add_demo (block_discard + SOURCES block_discard_demo.cc) + +if (${Seastar_API_LEVEL} GREATER_EQUAL 3) + seastar_add_demo (coroutines + SOURCES coroutines_demo.cc) +endif () + +seastar_add_demo (hello-world + SOURCES hello-world.cc) + +seastar_add_demo (websocket + SOURCES websocket_demo.cc) + +seastar_add_demo (echo + SOURCES echo_demo.cc) + +seastar_add_demo (ip + SOURCES ip_demo.cc) + +seastar_add_demo (line_count + SOURCES line_count_demo.cc) + +seastar_add_demo (l3 + SOURCES l3_demo.cc) + +seastar_add_demo (rpc + SOURCES rpc_demo.cc) + +seastar_add_demo (scheduling_group + SOURCES scheduling_group_demo.cc) + +seastar_add_demo (tcp + SOURCES tcp_demo.cc) + +seastar_add_demo (tcp_sctp_client + SOURCES tcp_sctp_client_demo.cc) + +seastar_add_demo (tcp_sctp_server + SOURCES tcp_sctp_server_demo.cc) + +seastar_add_demo (tls_echo_server + SOURCES + tls_echo_server.hh + tls_echo_server_demo.cc) + +seastar_add_demo (tls_simple_client + SOURCES + tls_echo_server.hh + tls_simple_client_demo.cc) + +seastar_add_demo (udp_client + SOURCES udp_client_demo.cc) + +seastar_add_demo (udp_server + SOURCES udp_server_demo.cc) + +seastar_add_demo (udp_zero_copy + SOURCES udp_zero_copy_demo.cc) + +seastar_add_demo (sharded_parameter + SOURCES sharded_parameter_demo.cc) + +seastar_add_demo (file + SOURCES file_demo.cc) + +seastar_add_demo (tutorial_examples + SOURCES tutorial_examples.cc) + +seastar_add_demo (http_client + SOURCES http_client_demo.cc) diff --git a/src/seastar/demos/block_discard_demo.cc b/src/seastar/demos/block_discard_demo.cc new file mode 100644 index 000000000..b14c5ed01 --- /dev/null +++ b/src/seastar/demos/block_discard_demo.cc @@ -0,0 +1,74 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <algorithm> +#include <seastar/core/app-template.hh> +#include <seastar/core/file.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/semaphore.hh> +#include <iostream> + +using namespace seastar; + +namespace bpo = boost::program_options; + +struct file_test { + file_test(file&& f) : f(std::move(f)) {} + file f; + semaphore sem = { 0 }; +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("dev", bpo::value<std::string>(), "e.g. --dev /dev/sdb") + ; + + return app.run_deprecated(ac, av, [&app] { + static constexpr auto max = 10000; + auto&& config = app.configuration(); + auto filepath = config["dev"].as<std::string>(); + + return open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) { + auto ft = new file_test{std::move(f)}; + + // Discard asynchronously, siganl when done. + (void)ft->f.stat().then([ft] (struct stat st) mutable { + assert(S_ISBLK(st.st_mode)); + auto offset = 0; + auto length = max * 4096; + return ft->f.discard(offset, length).then([ft] () mutable { + ft->sem.signal(); + }); + }); + + // Wait and exit. + (void)ft->sem.wait().then([ft] () mutable { + return ft->f.flush(); + }).then([ft] () mutable { + std::cout << "done\n"; + delete ft; + engine().exit(0); + }); + }); + }); +} diff --git a/src/seastar/demos/coroutines_demo.cc b/src/seastar/demos/coroutines_demo.cc new file mode 100644 index 000000000..06e1ab47b --- /dev/null +++ b/src/seastar/demos/coroutines_demo.cc @@ -0,0 +1,72 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2019 ScyllaDB Ltd. + */ + +#include <iostream> + +#include <seastar/util/std-compat.hh> + +#ifndef SEASTAR_COROUTINES_ENABLED + +int main(int argc, char** argv) { + std::cout << "coroutines not available\n"; + return 0; +} + +#else + +#include <seastar/core/app-template.hh> +#include <seastar/core/coroutine.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/loop.hh> +#include <seastar/core/sstring.hh> +#include <seastar/coroutine/parallel_for_each.hh> + +int main(int argc, char** argv) { + seastar::app_template app; + app.run(argc, argv, [] () -> seastar::future<> { + std::cout << "this is a completely useless program\nplease stand by...\n"; + auto f = seastar::coroutine::parallel_for_each(std::vector<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, [] (int i) -> seastar::future<> { + co_await seastar::sleep(std::chrono::seconds(i)); + std::cout << i << "\n"; + }); + + auto file = co_await seastar::open_file_dma("useless_file.txt", seastar::open_flags::create | seastar::open_flags::wo); + auto out = co_await seastar::make_file_output_stream(file); + seastar::sstring str = "nothing to see here, move along now\n"; + co_await out.write(str); + co_await out.flush(); + co_await out.close(); + + bool all_exist = true; + std::vector<seastar::sstring> filenames = { "useless_file.txt", "non_existing" }; + co_await seastar::coroutine::parallel_for_each(filenames, [&all_exist] (const seastar::sstring& name) -> seastar::future<> { + all_exist &= co_await seastar::file_exists(name); + }); + std::cout << (all_exist ? "" : "not ") << "all files exist" << std::endl; + + co_await std::move(f); + std::cout << "done\n"; + }); +} + +#endif diff --git a/src/seastar/demos/echo_demo.cc b/src/seastar/demos/echo_demo.cc new file mode 100644 index 000000000..d8a95a793 --- /dev/null +++ b/src/seastar/demos/echo_demo.cc @@ -0,0 +1,126 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + * + */ + +#include <seastar/net/virtio.hh> +#include <seastar/net/dpdk.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/ip.hh> +#include <seastar/net/native-stack.hh> +#include <iostream> +#include <utility> +#include <algorithm> + +using namespace seastar; +using namespace net; + +void dump_packet(const packet& p) { + std::cout << "rx:"; + auto f = p.frag(0); + for (unsigned i = 0; i < std::min(f.size, size_t(30)); ++i) { + char x[4]; + std::sprintf(x, " %02x", uint8_t(f.base[i])); + std::cout << x; + } + std::cout << "\n"; +} + +future<> echo_packet(net::qp& netif, packet p) { + auto f = p.frag(0); + if (f.size < sizeof(eth_hdr)) { + return make_ready_future<>(); + } + auto pos = 0; + auto eh = reinterpret_cast<eth_hdr*>(f.base + pos); + pos += sizeof(*eh); + *eh = ntoh(*eh); + if (eh->eth_proto != 0x0800) { + return make_ready_future<>(); + } + auto iph = reinterpret_cast<ip_hdr*>(f.base + pos); + *iph = ntoh(*iph); + pos += iph->ihl * 4; + if (iph->ver != 4 || iph->ihl < 5 || iph->ip_proto != 1) { + return make_ready_future<>(); + } + auto ip_len = iph->len; + auto icmph = reinterpret_cast<icmp_hdr*>(f.base + pos); + if (icmph->type != icmp_hdr::msg_type::echo_request) { + return make_ready_future<>(); + } + auto icmp_len = ip_len - iph->ihl * 4; + std::swap(eh->src_mac, eh->dst_mac); + std::swap(iph->src_ip, iph->dst_ip); + icmph->type = icmp_hdr::msg_type::echo_reply; + icmph->csum = 0; + *iph = hton(*iph); + *eh = hton(*eh); + icmph->csum = ip_checksum(icmph, icmp_len); + iph->csum = 0; + iph->csum = ip_checksum(iph, iph->ihl * 4); + return netif.send(std::move(p)); +} + +#ifdef SEASTAR_HAVE_DPDK +void usage() +{ + std::cout<<"Usage: echotest [-virtio|-dpdk]"<<std::endl; + std::cout<<" -virtio - use virtio backend (default)"<<std::endl; + std::cout<<" -dpdk - use dpdk-pmd backend"<<std::endl; +} +#endif + +int main(int ac, char** av) { + std::unique_ptr<net::device> dnet; + net::qp* vnet; + + native_stack_options opts; + +#ifdef SEASTAR_HAVE_DPDK + if (ac > 2) { + usage(); + return -1; + } + + if ((ac == 1) || !std::strcmp(av[1], "-virtio")) { + dnet = create_virtio_net_device(opts.virtio_opts, opts.lro); + } else if (!std::strcmp(av[1], "-dpdk")) { + dnet = create_dpdk_net_device(); + } else { + usage(); + return -1; + } +#else + dnet = create_virtio_net_device(opts.virtio_opts, opts.lro); +#endif // SEASTAR_HAVE_DPDK + + auto qp = dnet->init_local_queue(opts, 0); + vnet = qp.get(); + dnet->set_local_queue(std::move(qp)); + future<> rx_done = + dnet->receive([vnet] (packet p) { + return echo_packet(*vnet, std::move(p)); + }); + engine().run(); + return 0; +} + + diff --git a/src/seastar/demos/file_demo.cc b/src/seastar/demos/file_demo.cc new file mode 100644 index 000000000..20f6b2cd8 --- /dev/null +++ b/src/seastar/demos/file_demo.cc @@ -0,0 +1,237 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2020 ScyllaDB + */ + + +// Demonstration of seastar::with_file + +#include <cstring> +#include <limits> +#include <random> + +#include <seastar/core/app-template.hh> + +#include <seastar/core/aligned_buffer.hh> +#include <seastar/core/file.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/sstring.hh> +#include <seastar/core/temporary_buffer.hh> +#include <seastar/core/loop.hh> +#include <seastar/core/io_intent.hh> +#include <seastar/util/log.hh> +#include <seastar/util/tmp_file.hh> + +using namespace seastar; + +constexpr size_t aligned_size = 4096; + +future<> verify_data_file(file& f, temporary_buffer<char>& rbuf, const temporary_buffer<char>& wbuf) { + return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf, &wbuf] (size_t count) { + assert(count == aligned_size); + fmt::print(" verifying {} bytes\n", count); + assert(!memcmp(rbuf.get(), wbuf.get(), aligned_size)); + }); +} + +future<file> open_data_file(sstring meta_filename, temporary_buffer<char>& rbuf) { + fmt::print(" retrieving data filename from {}\n", meta_filename); + return with_file(open_file_dma(meta_filename, open_flags::ro), [&rbuf] (file& f) { + return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf] (size_t count) { + assert(count == aligned_size); + auto data_filename = sstring(rbuf.get()); + fmt::print(" opening {}\n", data_filename); + return open_file_dma(data_filename, open_flags::ro); + }); + }); +} + +future<> demo_with_file() { + fmt::print("Demonstrating with_file():\n"); + return tmp_dir::do_with_thread([] (tmp_dir& t) { + auto rnd = std::mt19937(std::random_device()()); + auto dist = std::uniform_int_distribution<int>(0, std::numeric_limits<char>::max()); + auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); + sstring meta_filename = (t.get_path() / "meta_file").native(); + sstring data_filename = (t.get_path() / "data_file").native(); + + // `with_file` is used to create/open `filename` just around the call to `dma_write` + auto write_to_file = [] (const sstring filename, temporary_buffer<char>& wbuf) { + auto count = with_file(open_file_dma(filename, open_flags::rw | open_flags::create), [&wbuf] (file& f) { + return f.dma_write(0, wbuf.get(), aligned_size); + }).get0(); + assert(count == aligned_size); + }; + + // print the data_filename into the write buffer + std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0); + std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write()); + + // and write it to `meta_filename` + fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename); + + write_to_file(meta_filename, wbuf); + + // now write some random data into data_filename + fmt::print(" writing random data into {}\n", data_filename); + std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); + + write_to_file(data_filename, wbuf); + + // verify the data via meta_filename + fmt::print(" verifying data...\n"); + auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); + + with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) { + return verify_data_file(f, rbuf, wbuf); + }).get(); + }); +} + +future<> demo_with_file_close_on_failure() { + fmt::print("\nDemonstrating with_file_close_on_failure():\n"); + return tmp_dir::do_with_thread([] (tmp_dir& t) { + auto rnd = std::mt19937(std::random_device()()); + auto dist = std::uniform_int_distribution<int>(0, std::numeric_limits<char>::max()); + auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); + sstring meta_filename = (t.get_path() / "meta_file").native(); + sstring data_filename = (t.get_path() / "data_file").native(); + + // with_file_close_on_failure will close the opened file only if + // `make_file_output_stream` returns an error. Otherwise, in the error-free path, + // the opened file is moved to `file_output_stream` that in-turn closes it + // when the stream is closed. + auto make_output_stream = [] (const sstring filename) { + return with_file_close_on_failure(open_file_dma(std::move(filename), open_flags::rw | open_flags::create), [] (file f) { + return make_file_output_stream(std::move(f), aligned_size); + }); + }; + + // writes the buffer one byte at a time, to demonstrate output stream + auto write_to_stream = [] (output_stream<char>& o, const temporary_buffer<char>& wbuf) { + return seastar::do_for_each(wbuf, [&o] (char c) { + return o.write(&c, 1); + }).finally([&o] { + return o.close(); + }); + }; + + // print the data_filename into the write buffer + std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0); + std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write()); + + // and write it to `meta_filename` + fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename); + + // with_file_close_on_failure will close the opened file only if + // `make_file_output_stream` returns an error. Otherwise, in the error-free path, + // the opened file is moved to `file_output_stream` that in-turn closes it + // when the stream is closed. + output_stream<char> o = make_output_stream(meta_filename).get0(); + + write_to_stream(o, wbuf).get(); + + // now write some random data into data_filename + fmt::print(" writing random data into {}\n", data_filename); + std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); + + o = make_output_stream(data_filename).get0(); + + write_to_stream(o, wbuf).get(); + + // verify the data via meta_filename + fmt::print(" verifying data...\n"); + auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); + + with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) { + return verify_data_file(f, rbuf, wbuf); + }).get(); + }); +} + +static constexpr size_t half_aligned_size = aligned_size / 2; + +future<> demo_with_io_intent() { + fmt::print("\nDemonstrating demo_with_io_intent():\n"); + return tmp_dir::do_with_thread([] (tmp_dir& t) { + sstring filename = (t.get_path() / "testfile.tmp").native(); + auto f = open_file_dma(filename, open_flags::rw | open_flags::create).get0(); + + auto rnd = std::mt19937(std::random_device()()); + auto dist = std::uniform_int_distribution<int>(0, std::numeric_limits<char>::max()); + + auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); + fmt::print(" writing random data into {}\n", filename); + std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); + + f.dma_write(0, wbuf.get(), aligned_size).get(); + + auto wbuf_n = temporary_buffer<char>::aligned(aligned_size, aligned_size); + fmt::print(" starting to overwrite {} with other random data in two steps\n", filename); + std::generate(wbuf_n.get_write(), wbuf_n.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); + + io_intent intent; + auto f1 = f.dma_write(0, wbuf_n.get(), half_aligned_size); + auto f2 = f.dma_write(half_aligned_size, wbuf_n.get() + half_aligned_size, half_aligned_size, default_priority_class(), &intent); + + fmt::print(" cancel the 2nd overwriting\n"); + intent.cancel(); + + fmt::print(" wait for overwriting IOs to complete\n"); + f1.get(); + + bool cancelled = false; + try { + f2.get(); + // The file::dma_write doesn't preemt, but if it + // suddenly will, the 2nd write will pass before + // the intent would be cancelled + fmt::print(" 2nd write won the race with cancellation\n"); + } catch (cancelled_error& ex) { + cancelled = true; + } + + fmt::print(" verifying data...\n"); + auto rbuf = allocate_aligned_buffer<unsigned char>(aligned_size, aligned_size); + f.dma_read(0, rbuf.get(), aligned_size).get(); + + // First part of the buffer must coincide with the overwritten data + assert(!memcmp(rbuf.get(), wbuf_n.get(), half_aligned_size)); + + if (cancelled) { + // Second part -- with the old data ... + assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size)); + } else { + // ... or with new if the cancellation didn't happen + assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size)); + } + }); +} + +int main(int ac, char** av) { + app_template app; + return app.run(ac, av, [] { + return demo_with_file().then([] { + return demo_with_file_close_on_failure().then([] { + return demo_with_io_intent(); + }); + }); + }); +} diff --git a/src/seastar/demos/hello-world.cc b/src/seastar/demos/hello-world.cc new file mode 100644 index 000000000..e010ff26b --- /dev/null +++ b/src/seastar/demos/hello-world.cc @@ -0,0 +1,35 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2022 ScyllaDB Ltd. + */ + +#include <seastar/core/app-template.hh> +#include <seastar/core/seastar.hh> +#include <seastar/util/log.hh> + +using namespace seastar; +logger applog("app"); + +int main(int argc, char** argv) { + seastar::app_template app; + app.run(argc, argv, [] () -> future<> { + applog.info("Hello world!"); + return make_ready_future<>(); + }); +} diff --git a/src/seastar/demos/http_client_demo.cc b/src/seastar/demos/http_client_demo.cc new file mode 100644 index 000000000..c1db64468 --- /dev/null +++ b/src/seastar/demos/http_client_demo.cc @@ -0,0 +1,111 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2022 ScyllaDB Ltd. + */ + +#include <seastar/core/app-template.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/iostream.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/thread.hh> +#include <seastar/http/client.hh> +#include <seastar/http/request.hh> +#include <seastar/http/reply.hh> +#include <seastar/net/inet_address.hh> +#include <seastar/net/dns.hh> +#include <seastar/net/tls.hh> + +using namespace seastar; +namespace bpo = boost::program_options; + +struct printer { + future<consumption_result<char>> operator() (temporary_buffer<char> buf) { + if (buf.empty()) { + return make_ready_future<consumption_result<char>>(stop_consuming(std::move(buf))); + } + fmt::print("{}", sstring(buf.get(), buf.size())); + return make_ready_future<consumption_result<char>>(continue_consuming()); + } +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("https", bpo::bool_switch(), "Use HTTPS on port 443 (if off -- use HTTP on port 80)") + ("host", bpo::value<std::string>(), "Host to connect") + ("path", bpo::value<std::string>(), "Path to query upon") + ("method", bpo::value<std::string>()->default_value("GET"), "Method to use") + ("file", bpo::value<std::string>(), "File to get body from (no body if missing)") + ; + + + return app.run(ac, av, [&] { + auto&& config = app.configuration(); + auto host = config["host"].as<std::string>(); + auto path = config["path"].as<std::string>(); + auto method = config["method"].as<std::string>(); + auto body = config.count("file") == 0 ? std::string("") : config["file"].as<std::string>(); + auto https = config["https"].as<bool>(); + + return seastar::async([=] { + net::hostent e = net::dns::get_host_by_name(host, net::inet_address::family::INET).get0(); + auto make_socket = [=] () -> connected_socket { + if (!https) { + socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}}); + ipv4_addr addr(e.addr_list.front(), 80); + fmt::print("{} {}:80{}\n", method, e.addr_list.front(), path); + return connect(make_ipv4_address(addr), local, transport::TCP).get0(); + } else { + auto certs = ::make_shared<tls::certificate_credentials>(); + certs->set_system_trust().get(); + socket_address remote = socket_address(e.addr_list.front(), 443); + fmt::print("{} {}:443{}\n", method, e.addr_list.front(), path); + return tls::connect(certs, remote, host).get0(); + } + }; + + connected_socket s = make_socket(); + http::experimental::connection conn(std::move(s)); + auto req = http::request::make(method, host, path); + if (body != "") { + future<file> f = open_file_dma(body, open_flags::ro); + req.write_body("txt", [ f = std::move(f) ] (output_stream<char>&& out) mutable { + return seastar::async([f = std::move(f), out = std::move(out)] () mutable { + auto in = make_file_input_stream(f.get0()); + copy(in, out).get(); + out.flush().get(); + out.close().get(); + in.close().get(); + }); + }); + } + http::reply rep = conn.make_request(std::move(req)).get0(); + + fmt::print("Reply status {}\n--------8<--------\n", rep._status); + auto in = conn.in(rep); + in.consume(printer{}).get(); + in.close().get(); + + conn.close().get(); + }).handle_exception([](auto ep) { + fmt::print("Error: {}", ep); + }); + }); +} diff --git a/src/seastar/demos/ip_demo.cc b/src/seastar/demos/ip_demo.cc new file mode 100644 index 000000000..c58efc29b --- /dev/null +++ b/src/seastar/demos/ip_demo.cc @@ -0,0 +1,47 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/arp.hh> +#include <seastar/net/ip.hh> +#include <seastar/net/net.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/virtio.hh> +#include <seastar/net/native-stack.hh> +#include <seastar/core/aligned_buffer.hh> + +using namespace seastar; +using namespace net; + +int main(int ac, char** av) { + native_stack_options opts; + + auto vnet = create_virtio_net_device(opts.virtio_opts, opts.lro); + vnet->set_local_queue(vnet->init_local_queue(opts, 0)); + + interface netif(std::move(vnet)); + ipv4 inet(&netif); + inet.set_host_address(ipv4_address("192.168.122.2")); + engine().run(); + return 0; +} + + + diff --git a/src/seastar/demos/l3_demo.cc b/src/seastar/demos/l3_demo.cc new file mode 100644 index 000000000..7813e109d --- /dev/null +++ b/src/seastar/demos/l3_demo.cc @@ -0,0 +1,49 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/net.hh> +#include <seastar/core/reactor.hh> +#include <seastar/net/virtio.hh> +#include <seastar/net/native-stack.hh> +#include <iostream> + +using namespace seastar; +using namespace net; + +void dump_arp_packets(l3_protocol& proto) { + // FIXME: ignored future + (void)proto.receive([] (packet p, ethernet_address from) { + std::cout << "seen arp packet\n"; + return make_ready_future<>(); + }, [] (forward_hash& out_hash_data, packet& p, size_t off) {return false;}); +} + +int main(int ac, char** av) { + native_stack_options opts; + + auto vnet = create_virtio_net_device(opts.virtio_opts, opts.lro); + interface netif(std::move(vnet)); + l3_protocol arp(&netif, eth_protocol_num::arp, []{ return std::optional<l3_protocol::l3packet>(); }); + dump_arp_packets(arp); + engine().run(); + return 0; +} + diff --git a/src/seastar/demos/line_count_demo.cc b/src/seastar/demos/line_count_demo.cc new file mode 100644 index 000000000..eab4609ec --- /dev/null +++ b/src/seastar/demos/line_count_demo.cc @@ -0,0 +1,85 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +// Demonstration of file_input_stream. Don't expect stellar performance +// since no read-ahead or caching is done yet. + +#include <seastar/core/fstream.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/shared_ptr.hh> +#include <fmt/printf.h> +#include <algorithm> +#include <iostream> + +using namespace seastar; + +struct reader { +public: + reader(file f) + : is(make_file_input_stream(std::move(f), file_input_stream_options{1 << 16, 1})) { + } + + input_stream<char> is; + size_t count = 0; + + // for input_stream::consume(): + using unconsumed_remainder = std::optional<temporary_buffer<char>>; + future<unconsumed_remainder> operator()(temporary_buffer<char> data) { + if (data.empty()) { + return make_ready_future<unconsumed_remainder>(std::move(data)); + } else { + count += std::count(data.begin(), data.end(), '\n'); + // FIXME: last line without \n? + return make_ready_future<unconsumed_remainder>(); + } + } +}; + +int main(int ac, char** av) { + app_template app; + namespace bpo = boost::program_options; + app.add_positional_options({ + { "file", bpo::value<std::string>(), "File to process", 1 }, + }); + return app.run(ac, av, [&app] { + auto fname = app.configuration()["file"].as<std::string>(); + return open_file_dma(fname, open_flags::ro).then([] (file f) { + auto r = make_shared<reader>(std::move(f)); + return r->is.consume(*r).then([r] { + fmt::print("{:d} lines\n", r->count); + return r->is.close().then([r] {}); + }); + }).then_wrapped([] (future<> f) -> future<int> { + try { + f.get(); + return make_ready_future<int>(0); + } catch (std::exception& ex) { + std::cout << ex.what() << "\n"; + return make_ready_future<int>(1); + } catch (...) { + std::cout << "unknown exception\n"; + return make_ready_future<int>(1); + } + }); + }); +} + diff --git a/src/seastar/demos/rpc_demo.cc b/src/seastar/demos/rpc_demo.cc new file mode 100644 index 000000000..d46cd07ec --- /dev/null +++ b/src/seastar/demos/rpc_demo.cc @@ -0,0 +1,299 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/rpc/rpc.hh> +#include <seastar/core/sleep.hh> +#include <seastar/rpc/lz4_compressor.hh> +#include <seastar/util/log.hh> +#include <seastar/core/loop.hh> + +using namespace seastar; + +struct serializer { +}; + +template <typename T, typename Output> +inline +void write_arithmetic_type(Output& out, T v) { + static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); + return out.write(reinterpret_cast<const char*>(&v), sizeof(T)); +} + +template <typename T, typename Input> +inline +T read_arithmetic_type(Input& in) { + static_assert(std::is_arithmetic<T>::value, "must be arithmetic type"); + T v; + in.read(reinterpret_cast<char*>(&v), sizeof(T)); + return v; +} + +template <typename Output> +inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } +template <typename Output> +inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } +template <typename Input> +inline int32_t read(serializer, Input& input, rpc::type<int32_t>) { return read_arithmetic_type<int32_t>(input); } +template <typename Input> +inline uint32_t read(serializer, Input& input, rpc::type<uint32_t>) { return read_arithmetic_type<uint32_t>(input); } +template <typename Input> +inline uint64_t read(serializer, Input& input, rpc::type<uint64_t>) { return read_arithmetic_type<uint64_t>(input); } +template <typename Input> +inline uint64_t read(serializer, Input& input, rpc::type<int64_t>) { return read_arithmetic_type<int64_t>(input); } +template <typename Input> +inline double read(serializer, Input& input, rpc::type<double>) { return read_arithmetic_type<double>(input); } + +template <typename Output> +inline void write(serializer, Output& out, const sstring& v) { + write_arithmetic_type(out, uint32_t(v.size())); + out.write(v.c_str(), v.size()); +} + +template <typename Input> +inline sstring read(serializer, Input& in, rpc::type<sstring>) { + auto size = read_arithmetic_type<uint32_t>(in); + sstring ret = uninitialized_string(size); + in.read(ret.data(), size); + return ret; +} + +namespace bpo = boost::program_options; +using namespace std::chrono_literals; + +class mycomp : public rpc::compressor::factory { + const sstring _name = "LZ4"; +public: + virtual const sstring& supported() const override { + fmt::print("supported called\n"); + return _name; + } + virtual std::unique_ptr<rpc::compressor> negotiate(sstring feature, bool is_server) const override { + fmt::print("negotiate called with {}\n", feature); + return feature == _name ? std::make_unique<rpc::lz4_compressor>() : nullptr; + } +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "RPC server port") + ("server", bpo::value<std::string>(), "Server address") + ("compress", bpo::value<bool>()->default_value(false), "Compress RPC traffic"); + std::cout << "start "; + rpc::protocol<serializer> myrpc(serializer{}); + static std::unique_ptr<rpc::protocol<serializer>::server> server; + static std::unique_ptr<rpc::protocol<serializer>::client> client; + static double x = 30.0; + + static logger log("rpc_demo"); + myrpc.set_logger(&log); + + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + bool compress = config["compress"].as<bool>(); + static mycomp mc; + auto test1 = myrpc.register_handler(1, [x = 0](int i) mutable { fmt::print("test1 count {:d} got {:d}\n", ++x, i); }); + auto test2 = myrpc.register_handler(2, [](int a, int b){ fmt::print("test2 got {:d} {:d}\n", a, b); return make_ready_future<int>(a+b); }); + auto test3 = myrpc.register_handler(3, [](double x){ fmt::print("test3 got {:f}\n", x); return std::make_unique<double>(sin(x)); }); + auto test4 = myrpc.register_handler(4, [](){ fmt::print("test4 throw!\n"); throw std::runtime_error("exception!"); }); + auto test5 = myrpc.register_handler(5, [](){ fmt::print("test5 no wait\n"); return rpc::no_wait; }); + auto test6 = myrpc.register_handler(6, [](const rpc::client_info& info, int x){ fmt::print("test6 client {}, {:d}\n", inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr), x); }); + auto test8 = myrpc.register_handler(8, [](){ fmt::print("test8 sleep for 2 sec\n"); return sleep(2s); }); + auto test13 = myrpc.register_handler(13, [](){ fmt::print("test13 sleep for 1 msec\n"); return sleep(1ms); }); + auto test_message_to_big = myrpc.register_handler(14, [](sstring payload){ fmt::print("test message to bit, should not get here"); }); + + if (config.count("server")) { + std::cout << "client" << std::endl; + auto test7 = myrpc.make_client<long (long a, long b)>(7); + auto test9 = myrpc.make_client<long (long a, long b)>(9); // do not send optional + auto test9_1 = myrpc.make_client<long (long a, long b, int c)>(9); // send optional + auto test9_2 = myrpc.make_client<long (long a, long b, int c, long d)>(9); // send more data than handler expects + auto test10 = myrpc.make_client<long ()>(10); // receive less then replied + auto test10_1 = myrpc.make_client<future<rpc::tuple<long, int>> ()>(10); // receive all + auto test11 = myrpc.make_client<future<rpc::tuple<long, rpc::optional<int>>> ()>(11); // receive more then replied + auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits + auto test_nohandler = myrpc.make_client<void ()>(100000000); // non existing verb + auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non existing verb, no_wait call + rpc::client_options co; + if (compress) { + co.compressor_factory = &mc; + } + + client = std::make_unique<rpc::protocol<serializer>::client>(myrpc, co, ipv4_addr{config["server"].as<std::string>()}); + + auto f = test8(*client, 1500ms).then_wrapped([](future<> f) { + try { + f.get(); + printf("test8 should not get here!\n"); + } catch (rpc::timeout_error&) { + printf("test8 timeout!\n"); + } + }); + for (auto i = 0; i < 100; i++) { + fmt::print("iteration={:d}\n", i); + (void)test1(*client, 5).then([] (){ fmt::print("test1 ended\n");}); + (void)test2(*client, 1, 2).then([] (int r) { fmt::print("test2 got {:d}\n", r); }); + (void)test3(*client, x).then([](double x) { fmt::print("sin={:f}\n", x); }); + (void)test4(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test4 your should not see this!\n"); + } catch (std::runtime_error& x){ + fmt::print("test4 {}\n", x.what()); + } + }); + (void)test5(*client).then([] { fmt::print("test5 no wait ended\n"); }); + (void)test6(*client, 1).then([] { fmt::print("test6 ended\n"); }); + (void)test7(*client, 5, 6).then([] (long r) { fmt::print("test7 got {:d}\n", r); }); + (void)test9(*client, 1, 2).then([] (long r) { fmt::print("test9 got {:d}\n", r); }); + (void)test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); }); + (void)test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); }); + (void)test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); }); + (void)test10_1(*client).then([] (rpc::tuple<long, int> r) { fmt::print("test10_1 got {:d} and {:d}\n", std::get<0>(r), std::get<1>(r)); }); + (void)test11(*client).then([] (rpc::tuple<long, rpc::optional<int> > r) { fmt::print("test11 got {:d} and {:d}\n", std::get<0>(r), bool(std::get<1>(r))); }); + (void)test_nohandler(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test_nohandler your should not see this!\n"); + } catch (rpc::unknown_verb_error& x){ + fmt::print("test_nohandle no such verb\n"); + } catch (...) { + fmt::print("incorrect exception!\n"); + } + }); + (void)test_nohandler_nowait(*client); + auto c = make_lw_shared<rpc::cancellable>(); + (void)test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + c->cancel(); + (void)test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + (void)sleep(500us).then([c] { c->cancel(); }); + (void)test_message_to_big(*client, uninitialized_string(10'000'001)).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test message to big shold not get here\n"); + } catch(std::runtime_error& err) { + fmt::print("test message to big get error {}\n", err.what()); + } catch(...) { + fmt::print("test message to big wrong exception\n"); + } + }); + } + // delay a little for a time-sensitive test + (void)sleep(400ms).then([test12] () mutable { + // server is configured for 10MB max, throw 25MB worth of requests at it. + auto now = rpc::rpc_clock_type::now(); + return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable { + return test12(*client, 100, uninitialized_string(1'000'000)).then([idx, now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now); + fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count()); + }); + }).then([now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now); + fmt::print("test12 completed after {:d} ms (should be ~300)\n", delta.count()); + }); + }); + (void)f.finally([] { + return sleep(1s).then([] { + return client->stop().then([] { + return engine().exit(0); + }); + }); + }); + } else { + std::cout << "server on port " << port << std::endl; + myrpc.register_handler(7, [](long a, long b) mutable { + auto p = make_lw_shared<promise<>>(); + auto t = make_lw_shared<timer<>>(); + fmt::print("test7 got {:d} {:d}\n", a, b); + auto f = p->get_future().then([a, b, t] { + fmt::print("test7 calc res\n"); + return a - b; + }); + t->set_callback([p = std::move(p)] () mutable { p->set_value(); }); + t->arm(1s); + return f; + }); + myrpc.register_handler(9, [] (long a, long b, rpc::optional<int> c) { + long r = 2; + fmt::print("test9 got {:d} {:d} ", a, b); + if (c) { + fmt::print("{:d}", c.value()); + r++; + } + fmt::print("\n"); + return r; + }); + myrpc.register_handler(10, [] { + fmt::print("test 10\n"); + return make_ready_future<rpc::tuple<long, int>>(rpc::tuple<long, int>(1, 2)); + }); + myrpc.register_handler(11, [] { + fmt::print("test 11\n"); + return 1ul; + }); + myrpc.register_handler(12, [] (int sleep_ms, sstring payload) { + return sleep(std::chrono::milliseconds(sleep_ms)).then([] { + return make_ready_future<>(); + }); + }); + + rpc::resource_limits limits; + limits.bloat_factor = 1; + limits.basic_request_size = 0; + limits.max_memory = 10'000'000; + rpc::server_options so; + if (compress) { + so.compressor_factory = &mc; + } + server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, so, ipv4_addr{port}, limits); + } + }); + +} diff --git a/src/seastar/demos/scheduling_group_demo.cc b/src/seastar/demos/scheduling_group_demo.cc new file mode 100644 index 000000000..9ea9edc1b --- /dev/null +++ b/src/seastar/demos/scheduling_group_demo.cc @@ -0,0 +1,186 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2016 Scylla DB Ltd + */ + + +#include <seastar/core/app-template.hh> +#include <seastar/core/future.hh> +#include <seastar/core/scheduling.hh> +#include <seastar/core/thread.hh> +#include <seastar/core/loop.hh> +#include <seastar/core/when_all.hh> +#include <seastar/core/with_scheduling_group.hh> +#include <seastar/core/condition-variable.hh> +#include <seastar/util/defer.hh> +#include <fmt/printf.h> +#include <chrono> +#include <cmath> +#include <boost/range/irange.hpp> + +using namespace seastar; +using namespace std::chrono_literals; + +template <typename Func, typename Duration> +future<> +compute_intensive_task(Duration duration, unsigned& counter, Func func) { + auto end = std::chrono::steady_clock::now() + duration; + while (std::chrono::steady_clock::now() < end) { + func(); + } + ++counter; + return make_ready_future<>(); +} + +future<> +heavy_task(unsigned& counter) { + return compute_intensive_task(1ms, counter, [] { + static thread_local double x = 1; + x = std::exp(x) / 3; + }); +} + +future<> +light_task(unsigned& counter) { + return compute_intensive_task(100us, counter, [] { + static thread_local double x = 0.1; + x = std::log(x + 1); + }); +} + +future<> +medium_task(unsigned& counter) { + return compute_intensive_task(400us, counter, [] { + static thread_local double x = 0.1; + x = std::cos(x); + }); +} + +using done_func = std::function<bool ()>; + +future<> +run_compute_intensive_tasks(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) { + return seastar::async([task = std::move(task), sg, concurrency, done, &counter] () mutable { + while (!done()) { + parallel_for_each(boost::irange(0u, concurrency), [task, sg, &counter] (unsigned i) mutable { + return with_scheduling_group(sg, [task, &counter] { + return task(counter); + }); + }).get(); + thread::maybe_yield(); + } + }); +} + +future<> +run_compute_intensive_tasks_in_threads(seastar::scheduling_group sg, done_func done, unsigned concurrency, unsigned& counter, std::function<future<> (unsigned& counter)> task) { + auto attr = seastar::thread_attributes(); + attr.sched_group = sg; + return parallel_for_each(boost::irange(0u, concurrency), [attr, done, &counter, task] (unsigned i) { + return seastar::async(attr, [done, &counter, task] { + while (!done()) { + task(counter).get(); + thread::maybe_yield(); + } + }); + }); +} + +future<> +run_with_duty_cycle(float utilization, std::chrono::steady_clock::duration period, done_func done, std::function<future<> (done_func done)> task) { + return seastar::async([=] { + bool duty_toggle = true; + auto t0 = std::chrono::steady_clock::now(); + condition_variable cv; + timer<> tmr_on([&] { duty_toggle = true; cv.signal(); }); + timer<> tmr_off([&] { duty_toggle = false; }); + tmr_on.arm(t0, period); + tmr_off.arm(t0 + std::chrono::duration_cast<decltype(t0)::duration>(period * utilization), period); + auto combined_done = [&] { + return done() || !duty_toggle; + }; + while (!done()) { + while (!combined_done()) { + task(std::cref(combined_done)).get(); + thread::maybe_yield(); + } + cv.wait([&] { + return done() || duty_toggle; + }).get(); + } + tmr_on.cancel(); + tmr_off.cancel(); + }); +} + +#include <fenv.h> + +template <typename T> +auto var_fn(T& var) { + return [&var] { return var; }; +} + +int main(int ac, char** av) { + app_template app; + return app.run(ac, av, [] { + return seastar::async([] { + auto sg100 = seastar::create_scheduling_group("sg100", 100).get0(); + auto ksg100 = seastar::defer([&] () noexcept { seastar::destroy_scheduling_group(sg100).get(); }); + auto sg20 = seastar::create_scheduling_group("sg20", 20).get0(); + auto ksg20 = seastar::defer([&] () noexcept { seastar::destroy_scheduling_group(sg20).get(); }); + auto sg50 = seastar::create_scheduling_group("sg50", 50).get0(); + auto ksg50 = seastar::defer([&] () noexcept { seastar::destroy_scheduling_group(sg50).get(); }); + + bool done = false; + auto end = timer<>([&done] { + done = true; + }); + + end.arm(10s); + unsigned ctr100 = 0, ctr20 = 0, ctr50 = 0; + fmt::print("running three scheduling groups with 100% duty cycle each:\n"); + when_all( + run_compute_intensive_tasks(sg100, var_fn(done), 5, ctr100, heavy_task), + run_compute_intensive_tasks(sg20, var_fn(done), 3, ctr20, light_task), + run_compute_intensive_tasks_in_threads(sg50, var_fn(done), 2, ctr50, medium_task) + ).get(); + fmt::print("{:10} {:15} {:10} {:12} {:8}\n", "shares", "task_time (us)", "executed", "runtime (ms)", "vruntime"); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 1000, ctr100, ctr100 * 1000 / 1000, ctr100 * 1000 / 1000 / 100.); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 20, 100, ctr20, ctr20 * 100 / 1000, ctr20 * 100 / 1000 / 20.); + fmt::print("{:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 400, ctr50, ctr50 * 400 / 1000, ctr50 * 400 / 1000 / 50.); + fmt::print("\n"); + + fmt::print("running two scheduling groups with 100%/50% duty cycles (period=1s:\n"); + unsigned ctr100_2 = 0, ctr50_2 = 0; + done = false; + end.arm(10s); + when_all( + run_compute_intensive_tasks(sg50, var_fn(done), 5, ctr50_2, heavy_task), + run_with_duty_cycle(0.5, 1s, var_fn(done), [=, &ctr100_2] (done_func done) { + return run_compute_intensive_tasks(sg100, done, 4, ctr100_2, heavy_task); + }) + ).get(); + fmt::print("{:10} {:10} {:15} {:10} {:12} {:8}\n", "shares", "duty", "task_time (us)", "executed", "runtime (ms)", "vruntime"); + fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 100, 50, 1000, ctr100_2, ctr100_2 * 1000 / 1000, ctr100_2 * 1000 / 1000 / 100.); + fmt::print("{:10d} {:10d} {:15d} {:10d} {:12d} {:8.2f}\n", 50, 100, 400, ctr50_2, ctr50_2 * 1000 / 1000, ctr50_2 * 1000 / 1000 / 50.); + + return 0; + }); + }); +} diff --git a/src/seastar/demos/sharded_parameter_demo.cc b/src/seastar/demos/sharded_parameter_demo.cc new file mode 100644 index 000000000..09b53a9ab --- /dev/null +++ b/src/seastar/demos/sharded_parameter_demo.cc @@ -0,0 +1,78 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2020 ScyllaDB + */ + + +// Demonstration of seastar::sharded_parameter + +#include <seastar/core/sharded.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/thread.hh> +#include <seastar/util/closeable.hh> +#include <cassert> + +// This is some service that we wish to run on all shards. +class service_one { + int _capacity = 7; +public: + // Pretend that this int is some important resource. + int get_capacity() const { return _capacity; } +}; + +// Another service that we run on all shards, that depends on service_one. +class service_two { + int _resource_allocation; +public: + service_two(service_one& s1, int resource_allocation) : _resource_allocation(resource_allocation) {} + int get_resource_allocation() const { return _resource_allocation; } +}; + +int main(int ac, char** av) { + seastar::app_template app; + return app.run(ac, av, [&] { + // sharded<> setup code is typically run in a seastar::thread + return seastar::async([&] { + + // Launch service_one + seastar::sharded<service_one> s1; + s1.start().get(); + auto stop_s1 = seastar::deferred_stop(s1); + + auto calculate_half_capacity = [] (service_one& s1) { + return s1.get_capacity() / 2; + }; + + // Launch service_two, passing it per-shard dependencies from s1 + seastar::sharded<service_two> s2; + // Start s2, passing two parameters to service_two's constructor + s2.start( + // Each service_two instance will get a reference to a service_one instance on the same shard + std::ref(s1), + // This calculation will be performed on each shard + seastar::sharded_parameter(calculate_half_capacity, std::ref(s1)) + ).get(); + auto stop_s2 = seastar::deferred_stop(s2); + + s2.invoke_on_all([] (service_two& s2) { + assert(s2.get_resource_allocation() == 3); + }).get(); + }); + }); +} diff --git a/src/seastar/demos/tcp_demo.cc b/src/seastar/demos/tcp_demo.cc new file mode 100644 index 000000000..34ba69d7f --- /dev/null +++ b/src/seastar/demos/tcp_demo.cc @@ -0,0 +1,75 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/net/ip.hh> +#include <seastar/net/virtio.hh> +#include <seastar/net/tcp.hh> +#include <seastar/net/native-stack.hh> +#include <seastar/core/reactor.hh> +#include <fmt/printf.h> + +using namespace seastar; +using namespace net; + +struct tcp_test { + ipv4& inet; + using tcp = net::tcp<ipv4_traits>; + tcp::listener _listener; + struct connection { + tcp::connection tcp_conn; + explicit connection(tcp::connection tc) : tcp_conn(std::move(tc)) {} + void run() { + // Read packets and echo back in the background. + (void)tcp_conn.wait_for_data().then([this] { + auto p = tcp_conn.read(); + if (!p.len()) { + tcp_conn.close_write(); + return; + } + fmt::print("read {:d} bytes\n", p.len()); + (void)tcp_conn.send(std::move(p)); + run(); + }); + } + }; + tcp_test(ipv4& inet) : inet(inet), _listener(inet.get_tcp().listen(10000)) {} + void run() { + // Run all connections in the background. + (void)_listener.accept().then([this] (tcp::connection conn) { + (new connection(std::move(conn)))->run(); + run(); + }); + } +}; + +int main(int ac, char** av) { + native_stack_options opts; + + auto vnet = create_virtio_net_device(opts.virtio_opts, opts.lro); + interface netif(std::move(vnet)); + ipv4 inet(&netif); + inet.set_host_address(ipv4_address("192.168.122.2")); + tcp_test tt(inet); + (void)engine().when_started().then([&tt] { tt.run(); }); + engine().run(); +} + + diff --git a/src/seastar/demos/tcp_sctp_client_demo.cc b/src/seastar/demos/tcp_sctp_client_demo.cc new file mode 100644 index 000000000..3de72776e --- /dev/null +++ b/src/seastar/demos/tcp_sctp_client_demo.cc @@ -0,0 +1,279 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <iostream> +#include <seastar/core/app-template.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/print.hh> +#include <seastar/core/units.hh> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +static int rx_msg_size = 4_KiB; +static int tx_msg_total_size = 100_MiB; +static int tx_msg_size = 4_KiB; +static int tx_msg_nr = tx_msg_total_size / tx_msg_size; +static std::string str_txbuf(tx_msg_size, 'X'); + +class client; +distributed<client> clients; + +transport protocol = transport::TCP; + +class client { +private: + static constexpr unsigned _pings_per_connection = 10000; + unsigned _total_pings; + unsigned _concurrent_connections; + ipv4_addr _server_addr; + std::string _test; + lowres_clock::time_point _earliest_started = lowres_clock::time_point::max(); + lowres_clock::time_point _latest_finished = lowres_clock::time_point::min(); + size_t _processed_bytes; + unsigned _num_reported; +public: + class connection { + connected_socket _fd; + input_stream<char> _read_buf; + output_stream<char> _write_buf; + size_t _bytes_read = 0; + size_t _bytes_write = 0; + public: + connection(connected_socket&& fd) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) {} + + future<> do_read() { + return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { + _bytes_read += buf.size(); + if (buf.size() == 0) { + return make_ready_future(); + } else { + return do_read(); + } + }); + } + + future<> do_write(int end) { + if (end == 0) { + return make_ready_future(); + } + return _write_buf.write(str_txbuf).then([this] { + _bytes_write += tx_msg_size; + return _write_buf.flush(); + }).then([this, end] { + return do_write(end - 1); + }); + } + + future<> ping(int times) { + return _write_buf.write("ping").then([this] { + return _write_buf.flush(); + }).then([this, times] { + return _read_buf.read_exactly(4).then([this, times] (temporary_buffer<char> buf) { + if (buf.size() != 4) { + fmt::print(std::cerr, "illegal packet received: {}\n", buf.size()); + return make_ready_future(); + } + auto str = std::string(buf.get(), buf.size()); + if (str != "pong") { + fmt::print(std::cerr, "illegal packet received: {}\n", buf.size()); + return make_ready_future(); + } + if (times > 0) { + return ping(times - 1); + } else { + return make_ready_future(); + } + }); + }); + } + + future<size_t> rxrx() { + return _write_buf.write("rxrx").then([this] { + return _write_buf.flush(); + }).then([this] { + return do_write(tx_msg_nr).then([this] { + return _write_buf.close(); + }).then([this] { + return make_ready_future<size_t>(_bytes_write); + }); + }); + } + + future<size_t> txtx() { + return _write_buf.write("txtx").then([this] { + return _write_buf.flush(); + }).then([this] { + return do_read().then([this] { + return make_ready_future<size_t>(_bytes_read); + }); + }); + } + }; + + future<> ping_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->ping(_pings_per_connection).then([started] { + auto finished = lowres_clock::now(); + (void)clients.invoke_on(0, &client::ping_report, started, finished); + }); + } + + future<> rxrx_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->rxrx().then([started] (size_t bytes) { + auto finished = lowres_clock::now(); + (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); + }); + } + + future<> txtx_test(connection *conn) { + auto started = lowres_clock::now(); + return conn->txtx().then([started] (size_t bytes) { + auto finished = lowres_clock::now(); + (void)clients.invoke_on(0, &client::rxtx_report, started, finished, bytes); + }); + } + + void ping_report(lowres_clock::time_point started, lowres_clock::time_point finished) { + if (_earliest_started > started) + _earliest_started = started; + if (_latest_finished < finished) + _latest_finished = finished; + if (++_num_reported == _concurrent_connections) { + auto elapsed = _latest_finished - _earliest_started; + auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); + auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); + fmt::print(std::cout, "========== ping ============\n"); + fmt::print(std::cout, "Server: {}\n", _server_addr); + fmt::print(std::cout,"Connections: {}\n", _concurrent_connections); + fmt::print(std::cout, "Total PingPong: {}\n", _total_pings); + fmt::print(std::cout, "Total Time(Secs): {}\n", secs); + fmt::print(std::cout, "Requests/Sec: {}\n", + static_cast<double>(_total_pings) / secs); + (void)clients.stop().then([] { + engine().exit(0); + }); + } + } + + void rxtx_report(lowres_clock::time_point started, lowres_clock::time_point finished, size_t bytes) { + if (_earliest_started > started) + _earliest_started = started; + if (_latest_finished < finished) + _latest_finished = finished; + _processed_bytes += bytes; + if (++_num_reported == _concurrent_connections) { + auto elapsed = _latest_finished - _earliest_started; + auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count(); + auto secs = static_cast<double>(usecs) / static_cast<double>(1000 * 1000); + fmt::print(std::cout, "========== {} ============\n", _test); + fmt::print(std::cout, "Server: {}\n", _server_addr); + fmt::print(std::cout, "Connections: {}\n", _concurrent_connections); + fmt::print(std::cout, "Bytes Received(MiB): {}\n", _processed_bytes / 1_MiB); + fmt::print(std::cout, "Total Time(Secs): {}\n", secs); + fmt::print(std::cout, "Bandwidth(Gbits/Sec): {}\n", + static_cast<double>((_processed_bytes * 8)) / (1000 * 1000 * 1000) / secs); + (void)clients.stop().then([] { + engine().exit(0); + }); + } + } + + future<> start(ipv4_addr server_addr, std::string test, unsigned ncon) { + _server_addr = server_addr; + _concurrent_connections = ncon * smp::count; + _total_pings = _pings_per_connection * _concurrent_connections; + _test = test; + + for (unsigned i = 0; i < ncon; i++) { + socket_address local = socket_address(::sockaddr_in{AF_INET, INADDR_ANY, {0}}); + (void)connect(make_ipv4_address(server_addr), local, protocol).then([this, test] (connected_socket fd) { + auto conn = new connection(std::move(fd)); + (void)(this->*tests.at(test))(conn).then_wrapped([conn] (auto&& f) { + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + fmt::print(std::cerr, "request error: {}\n", ex.what()); + } + }); + }); + } + return make_ready_future(); + } + future<> stop() { + return make_ready_future(); + } + + typedef future<> (client::*test_fn)(connection *conn); + static const std::map<std::string, test_fn> tests; +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + app_template app; + app.add_options() + ("server", bpo::value<std::string>()->required(), "Server address") + ("test", bpo::value<std::string>()->default_value("ping"), "test type(ping | rxrx | txtx)") + ("conn", bpo::value<unsigned>()->default_value(16), "nr connections per cpu") + ("proto", bpo::value<std::string>()->default_value("tcp"), "transport protocol tcp|sctp") + ; + + return app.run_deprecated(ac, av, [&app] { + auto&& config = app.configuration(); + auto server = config["server"].as<std::string>(); + auto test = config["test"].as<std::string>(); + auto ncon = config["conn"].as<unsigned>(); + auto proto = config["proto"].as<std::string>(); + + if (proto == "tcp") { + protocol = transport::TCP; + } else if (proto == "sctp") { + protocol = transport::SCTP; + } else { + fmt::print(std::cerr, "Error: --proto=tcp|sctp\n"); + return engine().exit(1); + } + + if (!client::tests.count(test)) { + fmt::print(std::cerr, "Error: -test=ping | rxrx | txtx\n"); + return engine().exit(1); + } + + (void)clients.start().then([server, test, ncon] () { + return clients.invoke_on_all(&client::start, ipv4_addr{server}, test, ncon); + }); + }); +} + +const std::map<std::string, client::test_fn> client::tests = { + {"ping", &client::ping_test}, + {"rxrx", &client::rxrx_test}, + {"txtx", &client::txtx_test}, +}; + diff --git a/src/seastar/demos/tcp_sctp_server_demo.cc b/src/seastar/demos/tcp_sctp_server_demo.cc new file mode 100644 index 000000000..6918ab2ec --- /dev/null +++ b/src/seastar/demos/tcp_sctp_server_demo.cc @@ -0,0 +1,205 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2014 Cloudius Systems + */ + +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/temporary_buffer.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/print.hh> +#include <vector> +#include <iostream> + +using namespace seastar; + +static std::string str_ping{"ping"}; +static std::string str_txtx{"txtx"}; +static std::string str_rxrx{"rxrx"}; +static std::string str_pong{"pong"}; +static std::string str_unknow{"unknow cmd"}; +static int tx_msg_total_size = 100 * 1024 * 1024; +static int tx_msg_size = 4 * 1024; +static int tx_msg_nr = tx_msg_total_size / tx_msg_size; +static int rx_msg_size = 4 * 1024; +static std::string str_txbuf(tx_msg_size, 'X'); +static bool enable_tcp = false; +static bool enable_sctp = false; + +class tcp_server { + std::vector<server_socket> _tcp_listeners; + std::vector<server_socket> _sctp_listeners; +public: + future<> listen(ipv4_addr addr) { + if (enable_tcp) { + listen_options lo; + lo.proto = transport::TCP; + lo.reuse_address = true; + _tcp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); + do_accepts(_tcp_listeners); + } + + if (enable_sctp) { + listen_options lo; + lo.proto = transport::SCTP; + lo.reuse_address = true; + _sctp_listeners.push_back(seastar::listen(make_ipv4_address(addr), lo)); + do_accepts(_sctp_listeners); + } + return make_ready_future<>(); + } + + // FIXME: We should properly tear down the service here. + future<> stop() { + return make_ready_future<>(); + } + + void do_accepts(std::vector<server_socket>& listeners) { + int which = listeners.size() - 1; + // Accept in the background. + (void)listeners[which].accept().then([this, &listeners] (accept_result ar) mutable { + connected_socket fd = std::move(ar.connection); + socket_address addr = std::move(ar.remote_address); + auto conn = new connection(*this, std::move(fd), addr); + (void)conn->process().then_wrapped([conn] (auto&& f) { + delete conn; + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "request error " << ex.what() << "\n"; + } + }); + do_accepts(listeners); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (std::exception& ex) { + std::cout << "accept failed: " << ex.what() << "\n"; + } + }); + } + class connection { + connected_socket _fd; + input_stream<char> _read_buf; + output_stream<char> _write_buf; + public: + connection(tcp_server& server, connected_socket&& fd, socket_address addr) + : _fd(std::move(fd)) + , _read_buf(_fd.input()) + , _write_buf(_fd.output()) {} + future<> process() { + return read(); + } + future<> read() { + if (_read_buf.eof()) { + return make_ready_future(); + } + // Expect 4 bytes cmd from client + size_t n = 4; + return _read_buf.read_exactly(n).then([this] (temporary_buffer<char> buf) { + if (buf.size() == 0) { + return make_ready_future(); + } + auto cmd = std::string(buf.get(), buf.size()); + // pingpong test + if (cmd == str_ping) { + return _write_buf.write(str_pong).then([this] { + return _write_buf.flush(); + }).then([this] { + return this->read(); + }); + // server tx test + } else if (cmd == str_txtx) { + return tx_test(); + // server tx test + } else if (cmd == str_rxrx) { + return rx_test(); + // unknow test + } else { + return _write_buf.write(str_unknow).then([this] { + return _write_buf.flush(); + }).then([] { + return make_ready_future(); + }); + } + }); + } + future<> do_write(int end) { + if (end == 0) { + return make_ready_future<>(); + } + return _write_buf.write(str_txbuf).then([this] { + return _write_buf.flush(); + }).then([this, end] { + return do_write(end - 1); + }); + } + future<> tx_test() { + return do_write(tx_msg_nr).then([this] { + return _write_buf.close(); + }).then([] { + return make_ready_future<>(); + }); + } + future<> do_read() { + return _read_buf.read_exactly(rx_msg_size).then([this] (temporary_buffer<char> buf) { + if (buf.size() == 0) { + return make_ready_future(); + } else { + return do_read(); + } + }); + } + future<> rx_test() { + return do_read().then([] { + return make_ready_future<>(); + }); + } + }; +}; + +namespace bpo = boost::program_options; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "TCP server port") + ("tcp", bpo::value<std::string>()->default_value("yes"), "tcp listen") + ("sctp", bpo::value<std::string>()->default_value("no"), "sctp listen") ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + enable_tcp = config["tcp"].as<std::string>() == "yes"; + enable_sctp = config["sctp"].as<std::string>() == "yes"; + if (!enable_tcp && !enable_sctp) { + fmt::print(std::cerr, "Error: no protocols enabled. Use \"--tcp yes\" and/or \"--sctp yes\" to enable\n"); + return engine().exit(1); + } + auto server = new distributed<tcp_server>; + (void)server->start().then([server = std::move(server), port] () mutable { + engine().at_exit([server] { + return server->stop(); + }); + // Start listening in the background. + (void)server->invoke_on_all(&tcp_server::listen, ipv4_addr{port}); + }).then([port] { + std::cout << "Seastar TCP server listening on port " << port << " ...\n"; + }); + }); +} diff --git a/src/seastar/demos/tls_echo_server.hh b/src/seastar/demos/tls_echo_server.hh new file mode 100644 index 000000000..e7185dac9 --- /dev/null +++ b/src/seastar/demos/tls_echo_server.hh @@ -0,0 +1,121 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2017 ScyllaDB + */ +#pragma once + +#include <seastar/core/sstring.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/sharded.hh> +#include <seastar/core/gate.hh> +#include <seastar/core/loop.hh> +#include <seastar/net/tls.hh> +#include <seastar/util/log.hh> +#include <iostream> + +using namespace seastar; + +struct streams { + connected_socket s; + input_stream<char> in; + output_stream<char> out; + + streams(connected_socket cs) : s(std::move(cs)), in(s.input()), out(s.output()) + {} +}; + +class echoserver { + server_socket _socket; + shared_ptr<tls::server_credentials> _certs; + seastar::gate _gate; + bool _stopped = false; + bool _verbose = false; +public: + echoserver(bool verbose = false) + : _certs(make_shared<tls::server_credentials>(make_shared<tls::dh_params>())) + , _verbose(verbose) + {} + + future<> listen(socket_address addr, sstring crtfile, sstring keyfile, tls::client_auth ca = tls::client_auth::NONE) { + _certs->set_client_auth(ca); + return _certs->set_x509_key_file(crtfile, keyfile, tls::x509_crt_format::PEM).then([this, addr] { + ::listen_options opts; + opts.reuse_address = true; + + _socket = tls::listen(_certs, addr, opts); + + // Listen in background. + (void)repeat([this] { + if (_stopped) { + return make_ready_future<stop_iteration>(stop_iteration::yes); + } + return with_gate(_gate, [this] { + return _socket.accept().then([this](accept_result ar) { + ::connected_socket s = std::move(ar.connection); + socket_address a = std::move(ar.remote_address); + if (_verbose) { + std::cout << "Got connection from "<< a << std::endl; + } + auto strms = make_lw_shared<streams>(std::move(s)); + return repeat([strms, this]() { + return strms->in.read().then([this, strms](temporary_buffer<char> buf) { + if (buf.empty()) { + if (_verbose) { + std::cout << "EOM" << std::endl; + } + return make_ready_future<stop_iteration>(stop_iteration::yes); + } + sstring tmp(buf.begin(), buf.end()); + if (_verbose) { + std::cout << "Read " << tmp.size() << "B" << std::endl; + } + return strms->out.write(tmp).then([strms]() { + return strms->out.flush(); + }).then([] { + return make_ready_future<stop_iteration>(stop_iteration::no); + }); + }); + }).then([strms]{ + return strms->out.close(); + }).handle_exception([](auto ep) { + }).finally([this, strms]{ + if (_verbose) { + std::cout << "Ending session" << std::endl; + } + return strms->in.close(); + }); + }).handle_exception([this](auto ep) { + if (!_stopped) { + std::cerr << "Error: " << ep << std::endl; + } + }).then([this] { + return make_ready_future<stop_iteration>(_stopped ? stop_iteration::yes : stop_iteration::no); + }); + }); + }); + return make_ready_future(); + }); + } + + future<> stop() { + _stopped = true; + _socket.abort_accept(); + return _gate.close(); + } +}; diff --git a/src/seastar/demos/tls_echo_server_demo.cc b/src/seastar/demos/tls_echo_server_demo.cc new file mode 100644 index 000000000..222412019 --- /dev/null +++ b/src/seastar/demos/tls_echo_server_demo.cc @@ -0,0 +1,67 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/sleep.hh> +#include <seastar/net/dns.hh> +#include "tls_echo_server.hh" + +using namespace seastar; +namespace bpo = boost::program_options; + + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "Server port") + ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Server address") + ("cert,c", bpo::value<std::string>()->required(), "Server certificate file") + ("key,k", bpo::value<std::string>()->required(), "Certificate key") + ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose") + ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto crt = config["cert"].as<std::string>(); + auto key = config["key"].as<std::string>(); + auto addr = config["address"].as<std::string>(); + auto verbose = config["verbose"].as<bool>(); + + std::cout << "Starting..." << std::endl; + return net::dns::resolve_name(addr).then([=](net::inet_address a) { + ipv4_addr ia(a, port); + + auto server = ::make_shared<seastar::sharded<echoserver>>(); + return server->start(verbose).then([=]() { + return server->invoke_on_all(&echoserver::listen, socket_address(ia), sstring(crt), sstring(key), tls::client_auth::NONE); + }).handle_exception([=](auto e) { + std::cerr << "Error: " << e << std::endl; + engine().exit(1); + }).then([=] { + std::cout << "TLS echo server running at " << addr << ":" << port << std::endl; + engine().at_exit([server] { + return server->stop().finally([server] {}); + }); + }); + }); + }); +} diff --git a/src/seastar/demos/tls_simple_client_demo.cc b/src/seastar/demos/tls_simple_client_demo.cc new file mode 100644 index 000000000..d53461b0e --- /dev/null +++ b/src/seastar/demos/tls_simple_client_demo.cc @@ -0,0 +1,133 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include <cmath> + +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/loop.hh> +#include <seastar/net/dns.hh> +#include "tls_echo_server.hh" + +using namespace seastar; +namespace bpo = boost::program_options; + + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "Remote port") + ("address", bpo::value<std::string>()->default_value("127.0.0.1"), "Remote address") + ("trust,t", bpo::value<std::string>(), "Trust store") + ("msg,m", bpo::value<std::string>(), "Message to send") + ("bytes,b", bpo::value<size_t>()->default_value(512), "Use random bytes of length as message") + ("iterations,i", bpo::value<size_t>()->default_value(1), "Repeat X times") + ("read-response,r", bpo::value<bool>()->default_value(true)->implicit_value(true), "Read echoed message") + ("verbose,v", bpo::value<bool>()->default_value(false)->implicit_value(true), "Verbose operation") + ("check-name,c", bpo::value<bool>()->default_value(false)->implicit_value(true), "Check server name") + ("server-name,s", bpo::value<std::string>(), "Expected server name") + ; + + + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto addr = config["address"].as<std::string>(); + auto n = config["bytes"].as<size_t>(); + auto i = config["iterations"].as<size_t>(); + auto do_read = config["read-response"].as<bool>(); + auto verbose = config["verbose"].as<bool>(); + auto check = config["check-name"].as<bool>(); + + std::cout << "Starting..." << std::endl; + + auto certs = ::make_shared<tls::certificate_credentials>(); + auto f = make_ready_future(); + + if (config.count("trust")) { + f = certs->set_x509_trust_file(config["trust"].as<std::string>(), tls::x509_crt_format::PEM); + } + + seastar::shared_ptr<sstring> msg; + + if (config.count("msg")) { + msg = seastar::make_shared<sstring>(config["msg"].as<std::string>()); + } else { + msg = seastar::make_shared<sstring>(uninitialized_string(n)); + for (size_t i = 0; i < n; ++i) { + (*msg)[i] = '0' + char(::rand() % 30); + } + } + + sstring server_name; + if (config.count("server-name")) { + server_name = config["server-name"].as<std::string>(); + } + if (verbose) { + std::cout << "Msg (" << msg->size() << "B):" << std::endl << *msg << std::endl; + } + return f.then([=]() { + return net::dns::get_host_by_name(addr).then([=](net::hostent e) { + ipv4_addr ia(e.addr_list.front(), port); + + sstring name; + if (check) { + name = server_name.empty() ? e.names.front() : server_name; + } + return tls::connect(certs, ia, name).then([=](::connected_socket s) { + auto strms = ::make_lw_shared<streams>(std::move(s)); + auto range = boost::irange(size_t(0), i); + return do_for_each(range, [=](auto) { + auto f = strms->out.write(*msg); + if (!do_read) { + return strms->out.close().then([f = std::move(f)]() mutable { + return std::move(f); + }); + } + return f.then([=]() { + return strms->out.flush().then([=] { + return strms->in.read_exactly(msg->size()).then([=](temporary_buffer<char> buf) { + sstring tmp(buf.begin(), buf.end()); + if (tmp != *msg) { + std::cerr << "Got garbled message!" << std::endl; + if (verbose) { + std::cout << "Got (" << tmp.size() << ") :" << std::endl << tmp << std::endl; + } + throw std::runtime_error("Got garbled message!"); + } + }); + }); + }); + }).then([strms, do_read]{ + return do_read ? strms->out.close() : make_ready_future<>(); + }).finally([strms]{ + return strms->in.close(); + }); + }); + }).handle_exception([](auto ep) { + std::cerr << "Error: " << ep << std::endl; + }); + }).finally([] { + engine().exit(0); + }); + }); +} diff --git a/src/seastar/demos/tutorial_examples.cc b/src/seastar/demos/tutorial_examples.cc new file mode 100644 index 000000000..0f67f5fd8 --- /dev/null +++ b/src/seastar/demos/tutorial_examples.cc @@ -0,0 +1,117 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2020 ScyllaDB. + */ + +#include <iostream> + +#include <seastar/core/seastar.hh> +#include <seastar/core/reactor.hh> +#include <seastar/core/future-util.hh> +#include <seastar/net/api.hh> + +seastar::future<> service_loop() { + return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234})), + [] (auto& listener) { + return seastar::keep_doing([&listener] () { + return listener.accept().then( + [] (seastar::accept_result res) { + std::cout << "Accepted connection from " << res.remote_address << "\n"; + }); + }); + }); +} + +const char* canned_response = "Seastar is the future!\n"; + +seastar::future<> service_loop_2() { + seastar::listen_options lo; + lo.reuse_address = true; + return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo), + [] (auto& listener) { + return seastar::keep_doing([&listener] () { + return listener.accept().then( + [] (seastar::accept_result res) { + auto s = std::move(res.connection); + auto out = s.output(); + return seastar::do_with(std::move(s), std::move(out), + [] (auto& s, auto& out) { + return out.write(canned_response).then([&out] { + return out.close(); + }); + }); + }); + }); + }); +} + +seastar::future<> handle_connection_3(seastar::connected_socket s, + seastar::socket_address a) { + auto out = s.output(); + auto in = s.input(); + return do_with(std::move(s), std::move(out), std::move(in), + [] (auto& s, auto& out, auto& in) { + return seastar::repeat([&out, &in] { + return in.read().then([&out] (auto buf) { + if (buf) { + return out.write(std::move(buf)).then([&out] { + return out.flush(); + }).then([] { + return seastar::stop_iteration::no; + }); + } else { + return seastar::make_ready_future<seastar::stop_iteration>( + seastar::stop_iteration::yes); + } + }); + }).then([&out] { + return out.close(); + }); + }); +} + +seastar::future<> service_loop_3() { + seastar::listen_options lo; + lo.reuse_address = true; + return seastar::do_with(seastar::listen(seastar::make_ipv4_address({1234}), lo), + [] (auto& listener) { + return seastar::keep_doing([&listener] () { + return listener.accept().then( + [] (seastar::accept_result res) { + // Note we ignore, not return, the future returned by + // handle_connection(), so we do not wait for one + // connection to be handled before accepting the next one. + (void)handle_connection_3(std::move(res.connection), std::move(res.remote_address)).handle_exception( + [] (std::exception_ptr ep) { + fmt::print(stderr, "Could not handle connection: {}\n", ep); + }); + }); + }); + }); +} + +#include <seastar/core/app-template.hh> + +int main(int ac, char** av) { + seastar::app_template app; + return app.run(ac, av, [] { + std::cout << "This is the tutorial examples demo. It is not running anything but rather makes sure the tutorial examples compile" << std::endl; + return seastar::make_ready_future<>(); + }); +} diff --git a/src/seastar/demos/udp_client_demo.cc b/src/seastar/demos/udp_client_demo.cc new file mode 100644 index 000000000..016b03c21 --- /dev/null +++ b/src/seastar/demos/udp_client_demo.cc @@ -0,0 +1,89 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/core/app-template.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/timer.hh> +#include <seastar/net/api.hh> +#include <iostream> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +class client { +private: + udp_channel _chan; + uint64_t n_sent {}; + uint64_t n_received {}; + uint64_t n_failed {}; + timer<> _stats_timer; +public: + void start(ipv4_addr server_addr) { + std::cout << "Sending to " << server_addr << std::endl; + + _chan = make_udp_channel(); + + _stats_timer.set_callback([this] { + std::cout << "Out: " << n_sent << " pps, \t"; + std::cout << "Err: " << n_failed << " pps, \t"; + std::cout << "In: " << n_received << " pps" << std::endl; + n_sent = 0; + n_received = 0; + n_failed = 0; + }); + _stats_timer.arm_periodic(1s); + + // Run sender in background. + (void)keep_doing([this, server_addr] { + return _chan.send(server_addr, "hello!\n") + .then_wrapped([this] (auto&& f) { + try { + f.get(); + n_sent++; + } catch (...) { + n_failed++; + } + }); + }); + + // Run receiver in background. + (void)keep_doing([this] { + return _chan.receive().then([this] (auto) { + n_received++; + }); + }); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + client _client; + app_template app; + app.add_options() + ("server", bpo::value<std::string>(), "Server address") + ; + return app.run_deprecated(ac, av, [&_client, &app] { + auto&& config = app.configuration(); + _client.start(config["server"].as<std::string>()); + }); +} diff --git a/src/seastar/demos/udp_server_demo.cc b/src/seastar/demos/udp_server_demo.cc new file mode 100644 index 000000000..ec25cce5d --- /dev/null +++ b/src/seastar/demos/udp_server_demo.cc @@ -0,0 +1,82 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <iostream> +#include <seastar/core/reactor.hh> +#include <seastar/core/distributed.hh> +#include <seastar/core/app-template.hh> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; + +class udp_server { +private: + udp_channel _chan; + timer<> _stats_timer; + uint64_t _n_sent {}; +public: + void start(uint16_t port) { + ipv4_addr listen_addr{port}; + _chan = make_udp_channel(listen_addr); + + _stats_timer.set_callback([this] { + std::cout << "Out: " << _n_sent << " pps" << std::endl; + _n_sent = 0; + }); + _stats_timer.arm_periodic(1s); + + // Run server in background. + (void)keep_doing([this] { + return _chan.receive().then([this] (udp_datagram dgram) { + return _chan.send(dgram.get_src(), std::move(dgram.get_data())).then([this] { + _n_sent++; + }); + }); + }); + } + // FIXME: we should properly tear down the service here. + future<> stop() { + return make_ready_future<>(); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + app_template app; + app.add_options() + ("port", bpo::value<uint16_t>()->default_value(10000), "UDP server port") ; + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as<uint16_t>(); + auto server = new distributed<udp_server>; + // Run server in background. + (void)server->start().then([server = std::move(server), port] () mutable { + engine().at_exit([server] { + return server->stop(); + }); + return server->invoke_on_all(&udp_server::start, port); + }).then([port] { + std::cout << "Seastar UDP server listening on port " << port << " ...\n"; + }); + }); +} diff --git a/src/seastar/demos/udp_zero_copy_demo.cc b/src/seastar/demos/udp_zero_copy_demo.cc new file mode 100644 index 000000000..a0d7733a3 --- /dev/null +++ b/src/seastar/demos/udp_zero_copy_demo.cc @@ -0,0 +1,153 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/core/seastar.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/scattered_message.hh> +#include <seastar/core/vector-data-sink.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/units.hh> +#include <seastar/core/timer.hh> +#include <seastar/net/api.hh> +#include <random> +#include <iomanip> +#include <iostream> + +using namespace seastar; +using namespace net; +using namespace std::chrono_literals; +namespace bpo = boost::program_options; + +template <typename Duration> +typename Duration::rep to_seconds(Duration d) { + return std::chrono::duration_cast<std::chrono::seconds>(d).count(); +} + +class server { +private: + udp_channel _chan; + timer<> _stats_timer; + uint64_t _n_sent {}; + size_t _chunk_size; + bool _copy; + std::vector<packet> _packets; + std::unique_ptr<output_stream<char>> _out; + steady_clock_type::time_point _last; + sstring _key; + size_t _packet_size = 8*KB; + char* _mem; + size_t _mem_size; + std::mt19937 _rnd; + std::random_device _randem_dev; + std::uniform_int_distribution<size_t> _chunk_distribution; +private: + char* next_chunk() { + return _mem + _chunk_distribution(_rnd); + } +public: + server() + : _rnd(std::random_device()()) { + } + future<> send(ipv4_addr dst, packet p) { + return _chan.send(dst, std::move(p)).then([this] { + _n_sent++; + }); + } + void start(int chunk_size, bool copy, size_t mem_size) { + ipv4_addr listen_addr{10000}; + _chan = make_udp_channel(listen_addr); + + std::cout << "Listening on " << listen_addr << std::endl; + + _last = steady_clock_type::now(); + _stats_timer.set_callback([this] { + auto now = steady_clock_type::now(); + std::cout << "Out: " + << std::setprecision(2) << std::fixed + << (double)_n_sent / to_seconds(now - _last) + << " pps" << std::endl; + _last = now; + _n_sent = 0; + }); + _stats_timer.arm_periodic(1s); + + _chunk_size = chunk_size; + _copy = copy; + _key = sstring(new char[64], 64); + + _out = std::make_unique<output_stream<char>>( + data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size); + + _mem = new char[mem_size]; + _mem_size = mem_size; + + _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3); + + assert(3 * _chunk_size <= _packet_size); + + // Run sender in background. + (void)keep_doing([this] { + return _chan.receive().then([this] (udp_datagram dgram) { + auto chunk = next_chunk(); + lw_shared_ptr<sstring> item; + if (_copy) { + _packets.clear(); + // FIXME: future is discarded + (void)_out->write(chunk, _chunk_size); + chunk += _chunk_size; + (void)_out->write(chunk, _chunk_size); + chunk += _chunk_size; + (void)_out->write(chunk, _chunk_size); + (void)_out->flush(); + assert(_packets.size() == 1); + return send(dgram.get_src(), std::move(_packets[0])); + } else { + auto chunk = next_chunk(); + scattered_message<char> msg; + msg.reserve(3); + msg.append_static(chunk, _chunk_size); + msg.append_static(chunk, _chunk_size); + msg.append_static(chunk, _chunk_size); + return send(dgram.get_src(), std::move(msg).release()); + } + }); + }); + } +}; + +int main(int ac, char ** av) { + server s; + app_template app; + app.add_options() + ("chunk-size", bpo::value<int>()->default_value(1024), + "Chunk size") + ("mem-size", bpo::value<int>()->default_value(512), + "Memory pool size in MiB") + ("copy", "Copy data rather than send via zero-copy") + ; + return app.run_deprecated(ac, av, [&app, &s] { + auto&& config = app.configuration(); + auto chunk_size = config["chunk-size"].as<int>(); + auto mem_size = (size_t)config["mem-size"].as<int>() * MB; + auto copy = config.count("copy"); + s.start(chunk_size, copy, mem_size); + }); +} diff --git a/src/seastar/demos/websocket_demo.cc b/src/seastar/demos/websocket_demo.cc new file mode 100644 index 000000000..c19cb0fc2 --- /dev/null +++ b/src/seastar/demos/websocket_demo.cc @@ -0,0 +1,66 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2021 ScyllaDB Ltd. + */ + +#include <iostream> +#include <seastar/websocket/server.hh> +#include <seastar/core/app-template.hh> +#include <seastar/core/fstream.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/seastar.hh> +#include <seastar/core/loop.hh> +#include <seastar/core/thread.hh> +#include <seastar/util/defer.hh> + +using namespace seastar; +using namespace seastar::experimental; + +int main(int argc, char** argv) { + seastar::app_template app; + app.run(argc, argv, [] () -> seastar::future<> { + return async([] { + static websocket::server ws; + ws.register_handler("echo", [] (input_stream<char>& in, + output_stream<char>& out) { + return repeat([&in, &out]() { + return in.read().then([&out](temporary_buffer<char> f) { + std::cerr << "f.size(): " << f.size() << "\n"; + if (f.empty()) { + return make_ready_future<stop_iteration>(stop_iteration::yes); + } else { + return out.write(std::move(f)).then([&out]() { + return out.flush().then([] { + return make_ready_future<stop_iteration>(stop_iteration::no); + }); + }); + } + }); + }); + }); + auto d = defer([] () noexcept { + ws.stop().get(); + }); + ws.listen(socket_address(ipv4_addr("127.0.0.1", 8123))); + std::cout << "Listening on 127.0.0.1:8123 for 1 hour (interruptible, hit Ctrl-C to stop)..." << std::endl; + seastar::sleep_abortable(std::chrono::hours(1)).get(); + std::cout << "Stopping the server, deepest thanks to all clients, hope we meet again" << std::endl; + }); + }); +} |