summaryrefslogtreecommitdiffstats
path: root/src/seastar/demos/udp_zero_copy_demo.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/seastar/demos/udp_zero_copy_demo.cc
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/demos/udp_zero_copy_demo.cc')
-rw-r--r--src/seastar/demos/udp_zero_copy_demo.cc153
1 files changed, 153 insertions, 0 deletions
diff --git a/src/seastar/demos/udp_zero_copy_demo.cc b/src/seastar/demos/udp_zero_copy_demo.cc
new file mode 100644
index 000000000..a0d7733a3
--- /dev/null
+++ b/src/seastar/demos/udp_zero_copy_demo.cc
@@ -0,0 +1,153 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/seastar.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/scattered_message.hh>
+#include <seastar/core/vector-data-sink.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/units.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/net/api.hh>
+#include <random>
+#include <iomanip>
+#include <iostream>
+
+using namespace seastar;
+using namespace net;
+using namespace std::chrono_literals;
+namespace bpo = boost::program_options;
+
+template <typename Duration>
+typename Duration::rep to_seconds(Duration d) {
+ return std::chrono::duration_cast<std::chrono::seconds>(d).count();
+}
+
+class server {
+private:
+ udp_channel _chan;
+ timer<> _stats_timer;
+ uint64_t _n_sent {};
+ size_t _chunk_size;
+ bool _copy;
+ std::vector<packet> _packets;
+ std::unique_ptr<output_stream<char>> _out;
+ steady_clock_type::time_point _last;
+ sstring _key;
+ size_t _packet_size = 8*KB;
+ char* _mem;
+ size_t _mem_size;
+ std::mt19937 _rnd;
+ std::random_device _randem_dev;
+ std::uniform_int_distribution<size_t> _chunk_distribution;
+private:
+ char* next_chunk() {
+ return _mem + _chunk_distribution(_rnd);
+ }
+public:
+ server()
+ : _rnd(std::random_device()()) {
+ }
+ future<> send(ipv4_addr dst, packet p) {
+ return _chan.send(dst, std::move(p)).then([this] {
+ _n_sent++;
+ });
+ }
+ void start(int chunk_size, bool copy, size_t mem_size) {
+ ipv4_addr listen_addr{10000};
+ _chan = make_udp_channel(listen_addr);
+
+ std::cout << "Listening on " << listen_addr << std::endl;
+
+ _last = steady_clock_type::now();
+ _stats_timer.set_callback([this] {
+ auto now = steady_clock_type::now();
+ std::cout << "Out: "
+ << std::setprecision(2) << std::fixed
+ << (double)_n_sent / to_seconds(now - _last)
+ << " pps" << std::endl;
+ _last = now;
+ _n_sent = 0;
+ });
+ _stats_timer.arm_periodic(1s);
+
+ _chunk_size = chunk_size;
+ _copy = copy;
+ _key = sstring(new char[64], 64);
+
+ _out = std::make_unique<output_stream<char>>(
+ data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size);
+
+ _mem = new char[mem_size];
+ _mem_size = mem_size;
+
+ _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3);
+
+ assert(3 * _chunk_size <= _packet_size);
+
+ // Run sender in background.
+ (void)keep_doing([this] {
+ return _chan.receive().then([this] (udp_datagram dgram) {
+ auto chunk = next_chunk();
+ lw_shared_ptr<sstring> item;
+ if (_copy) {
+ _packets.clear();
+ // FIXME: future is discarded
+ (void)_out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ (void)_out->write(chunk, _chunk_size);
+ chunk += _chunk_size;
+ (void)_out->write(chunk, _chunk_size);
+ (void)_out->flush();
+ assert(_packets.size() == 1);
+ return send(dgram.get_src(), std::move(_packets[0]));
+ } else {
+ auto chunk = next_chunk();
+ scattered_message<char> msg;
+ msg.reserve(3);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ msg.append_static(chunk, _chunk_size);
+ return send(dgram.get_src(), std::move(msg).release());
+ }
+ });
+ });
+ }
+};
+
+int main(int ac, char ** av) {
+ server s;
+ app_template app;
+ app.add_options()
+ ("chunk-size", bpo::value<int>()->default_value(1024),
+ "Chunk size")
+ ("mem-size", bpo::value<int>()->default_value(512),
+ "Memory pool size in MiB")
+ ("copy", "Copy data rather than send via zero-copy")
+ ;
+ return app.run_deprecated(ac, av, [&app, &s] {
+ auto&& config = app.configuration();
+ auto chunk_size = config["chunk-size"].as<int>();
+ auto mem_size = (size_t)config["mem-size"].as<int>() * MB;
+ auto copy = config.count("copy");
+ s.start(chunk_size, copy, mem_size);
+ });
+}