From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/seastar/demos/rpc_demo.cc | 299 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 src/seastar/demos/rpc_demo.cc (limited to 'src/seastar/demos/rpc_demo.cc') diff --git a/src/seastar/demos/rpc_demo.cc b/src/seastar/demos/rpc_demo.cc new file mode 100644 index 00000000..899af1c1 --- /dev/null +++ b/src/seastar/demos/rpc_demo.cc @@ -0,0 +1,299 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#include +#include +#include +#include +#include +#include + +using namespace seastar; + +struct serializer { +}; + +template +inline +void write_arithmetic_type(Output& out, T v) { + static_assert(std::is_arithmetic::value, "must be arithmetic type"); + return out.write(reinterpret_cast(&v), sizeof(T)); +} + +template +inline +T read_arithmetic_type(Input& in) { + static_assert(std::is_arithmetic::value, "must be arithmetic type"); + T v; + in.read(reinterpret_cast(&v), sizeof(T)); + return v; +} + +template +inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } +template +inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } +template +inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } +template +inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } +template +inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } +template +inline int32_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } +template +inline uint32_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } +template +inline uint64_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } +template +inline uint64_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } +template +inline double read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } + +template +inline void write(serializer, Output& out, const sstring& v) { + write_arithmetic_type(out, uint32_t(v.size())); + out.write(v.c_str(), v.size()); +} + +template +inline sstring read(serializer, Input& in, rpc::type) { + auto size = read_arithmetic_type(in); + sstring ret(sstring::initialized_later(), size); + in.read(ret.begin(), size); + return ret; +} + +namespace bpo = boost::program_options; +using namespace std::chrono_literals; + +class mycomp : public rpc::compressor::factory { + const sstring _name = "LZ4"; +public: + virtual const sstring& supported() const override { + fmt::print("supported called\n"); + return _name; + } + virtual std::unique_ptr negotiate(sstring feature, bool is_server) const override { + fmt::print("negotiate called with {}\n", feature); + return feature == _name ? std::make_unique() : nullptr; + } +}; + +int main(int ac, char** av) { + app_template app; + app.add_options() + ("port", bpo::value()->default_value(10000), "RPC server port") + ("server", bpo::value(), "Server address") + ("compress", bpo::value()->default_value(false), "Compress RPC traffic"); + std::cout << "start "; + rpc::protocol myrpc(serializer{}); + static std::unique_ptr::server> server; + static std::unique_ptr::client> client; + static double x = 30.0; + + myrpc.set_logger([] (const sstring& log) { + fmt::print("{}", log); + std::cout << std::endl; + }); + + return app.run_deprecated(ac, av, [&] { + auto&& config = app.configuration(); + uint16_t port = config["port"].as(); + bool compress = config["compress"].as(); + static mycomp mc; + auto test1 = myrpc.register_handler(1, [x = 0](int i) mutable { fmt::print("test1 count {:d} got {:d}\n", ++x, i); }); + auto test2 = myrpc.register_handler(2, [](int a, int b){ fmt::print("test2 got {:d} {:d}\n", a, b); return make_ready_future(a+b); }); + auto test3 = myrpc.register_handler(3, [](double x){ fmt::print("test3 got {:f}\n", x); return std::make_unique(sin(x)); }); + auto test4 = myrpc.register_handler(4, [](){ fmt::print("test4 throw!\n"); throw std::runtime_error("exception!"); }); + auto test5 = myrpc.register_handler(5, [](){ fmt::print("test5 no wait\n"); return rpc::no_wait; }); + auto test6 = myrpc.register_handler(6, [](const rpc::client_info& info, int x){ fmt::print("test6 client {}, {:d}\n", inet_ntoa(info.addr.as_posix_sockaddr_in().sin_addr), x); }); + auto test8 = myrpc.register_handler(8, [](){ fmt::print("test8 sleep for 2 sec\n"); return sleep(2s); }); + auto test13 = myrpc.register_handler(13, [](){ fmt::print("test13 sleep for 1 msec\n"); return sleep(1ms); }); + auto test_message_to_big = myrpc.register_handler(14, [](sstring payload){ fmt::print("test message to bit, should not get here"); }); + + if (config.count("server")) { + std::cout << "client" << std::endl; + auto test7 = myrpc.make_client(7); + auto test9 = myrpc.make_client(9); // do not send optional + auto test9_1 = myrpc.make_client(9); // send optional + auto test9_2 = myrpc.make_client(9); // send more data than handler expects + auto test10 = myrpc.make_client(10); // receive less then replied + auto test10_1 = myrpc.make_client ()>(10); // receive all + auto test11 = myrpc.make_client> ()>(11); // receive more then replied + auto test12 = myrpc.make_client(12); // large payload vs. server limits + auto test_nohandler = myrpc.make_client(100000000); // non existing verb + auto test_nohandler_nowait = myrpc.make_client(100000000); // non existing verb, no_wait call + rpc::client_options co; + if (compress) { + co.compressor_factory = &mc; + } + + client = std::make_unique::client>(myrpc, co, ipv4_addr{config["server"].as()}); + + auto f = test8(*client, 1500ms).then_wrapped([](future<> f) { + try { + f.get(); + printf("test8 should not get here!\n"); + } catch (rpc::timeout_error&) { + printf("test8 timeout!\n"); + } + }); + for (auto i = 0; i < 100; i++) { + fmt::print("iteration={:d}\n", i); + test1(*client, 5).then([] (){ fmt::print("test1 ended\n");}); + test2(*client, 1, 2).then([] (int r) { fmt::print("test2 got {:d}\n", r); }); + test3(*client, x).then([](double x) { fmt::print("sin={:f}\n", x); }); + test4(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test4 your should not see this!\n"); + } catch (std::runtime_error& x){ + fmt::print("test4 {}\n", x.what()); + } + }); + test5(*client).then([] { fmt::print("test5 no wait ended\n"); }); + test6(*client, 1).then([] { fmt::print("test6 ended\n"); }); + test7(*client, 5, 6).then([] (long r) { fmt::print("test7 got {:d}\n", r); }); + test9(*client, 1, 2).then([] (long r) { fmt::print("test9 got {:d}\n", r); }); + test9_1(*client, 1, 2, 3).then([] (long r) { fmt::print("test9.1 got {:d}\n", r); }); + test9_2(*client, 1, 2, 3, 4).then([] (long r) { fmt::print("test9.2 got {:d}\n", r); }); + test10(*client).then([] (long r) { fmt::print("test10 got {:d}\n", r); }); + test10_1(*client).then([] (long r, int rr) { fmt::print("test10_1 got {:d} and {:d}\n", r, rr); }); + test11(*client).then([] (long r, rpc::optional rr) { fmt::print("test11 got {:d} and {:d}\n", r, bool(rr)); }); + test_nohandler(*client).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test_nohandler your should not see this!\n"); + } catch (rpc::unknown_verb_error& x){ + fmt::print("test_nohandle no such verb\n"); + } catch (...) { + fmt::print("incorrect exception!\n"); + } + }); + test_nohandler_nowait(*client); + auto c = make_lw_shared(); + test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + c->cancel(); + test13(*client, *c).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test13 shold not get here\n"); + } catch(rpc::canceled_error&) { + fmt::print("test13 canceled\n"); + } catch(...) { + fmt::print("test13 wrong exception\n"); + } + }); + sleep(500us).then([c] { c->cancel(); }); + test_message_to_big(*client, sstring(sstring::initialized_later(), 10'000'001)).then_wrapped([](future<> f) { + try { + f.get(); + fmt::print("test message to big shold not get here\n"); + } catch(std::runtime_error& err) { + fmt::print("test message to big get error {}\n", err.what()); + } catch(...) { + fmt::print("test message to big wrong exception\n"); + } + }); + } + // delay a little for a time-sensitive test + sleep(400ms).then([test12] () mutable { + // server is configured for 10MB max, throw 25MB worth of requests at it. + auto now = rpc::rpc_clock_type::now(); + return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable { + return test12(*client, 100, sstring(sstring::initialized_later(), 1'000'000)).then([idx, now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast(later - now); + fmt::print("idx {:d} completed after {:d} ms\n", idx, delta.count()); + }); + }).then([now] { + auto later = rpc::rpc_clock_type::now(); + auto delta = std::chrono::duration_cast(later - now); + fmt::print("test12 completed after {:d} ms (should be ~300)\n", delta.count()); + }); + }); + f.finally([] { + sleep(1s).then([] { + client->stop().then([] { + engine().exit(0); + }); + }); + }); + } else { + std::cout << "server on port " << port << std::endl; + myrpc.register_handler(7, [](long a, long b) mutable { + auto p = make_lw_shared>(); + auto t = make_lw_shared>(); + fmt::print("test7 got {:d} {:d}\n", a, b); + auto f = p->get_future().then([a, b, t] { + fmt::print("test7 calc res\n"); + return a - b; + }); + t->set_callback([p = std::move(p)] () mutable { p->set_value(); }); + t->arm(1s); + return f; + }); + myrpc.register_handler(9, [] (long a, long b, rpc::optional c) { + long r = 2; + fmt::print("test9 got {:d} {:d} ", a, b); + if (c) { + fmt::print("{:d}", c.value()); + r++; + } + fmt::print("\n"); + return r; + }); + myrpc.register_handler(10, [] { + fmt::print("test 10\n"); + return make_ready_future(1, 2); + }); + myrpc.register_handler(11, [] { + fmt::print("test 11\n"); + return 1ul; + }); + myrpc.register_handler(12, [] (int sleep_ms, sstring payload) { + return sleep(std::chrono::milliseconds(sleep_ms)).then([] { + return make_ready_future<>(); + }); + }); + + rpc::resource_limits limits; + limits.bloat_factor = 1; + limits.basic_request_size = 0; + limits.max_memory = 10'000'000; + rpc::server_options so; + if (compress) { + so.compressor_factory = &mc; + } + server = std::make_unique::server>(myrpc, so, ipv4_addr{port}, limits); + } + }); + +} -- cgit v1.2.3