summaryrefslogtreecommitdiffstats
path: root/src/seastar/tests/unit/distributed_test.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/seastar/tests/unit/distributed_test.cc
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/seastar/tests/unit/distributed_test.cc')
-rw-r--r--src/seastar/tests/unit/distributed_test.cc181
1 files changed, 181 insertions, 0 deletions
diff --git a/src/seastar/tests/unit/distributed_test.cc b/src/seastar/tests/unit/distributed_test.cc
new file mode 100644
index 00000000..b5c1ccb9
--- /dev/null
+++ b/src/seastar/tests/unit/distributed_test.cc
@@ -0,0 +1,181 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/distributed.hh>
+#include <seastar/core/future-util.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/thread.hh>
+
+using namespace seastar;
+
+struct async_service : public seastar::async_sharded_service<async_service> {
+ thread_local static bool deleted;
+ ~async_service() {
+ deleted = true;
+ }
+ void run() {
+ auto ref = shared_from_this();
+ sleep(std::chrono::milliseconds(100 + 100 * engine().cpu_id())).then([this, ref] {
+ check();
+ });
+ }
+ virtual void check() {
+ assert(!deleted);
+ }
+ future<> stop() { return make_ready_future<>(); }
+};
+
+thread_local bool async_service::deleted = false;
+
+struct X {
+ sstring echo(sstring arg) {
+ return arg;
+ }
+ int cpu_id_squared() const {
+ auto id = engine().cpu_id();
+ return id * id;
+ }
+ future<> stop() { return make_ready_future<>(); }
+};
+
+template <typename T, typename Func>
+future<> do_with_distributed(Func&& func) {
+ auto x = make_shared<distributed<T>>();
+ return func(*x).finally([x] {
+ return x->stop();
+ }).finally([x]{});
+}
+
+future<> test_that_each_core_gets_the_arguments() {
+ return do_with_distributed<X>([] (auto& x) {
+ return x.start().then([&x] {
+ return x.map_reduce([] (sstring msg){
+ if (msg != "hello") {
+ throw std::runtime_error("wrong message");
+ }
+ }, &X::echo, sstring("hello"));
+ });
+ });
+}
+
+future<> test_functor_version() {
+ return do_with_distributed<X>([] (auto& x) {
+ return x.start().then([&x] {
+ return x.map_reduce([] (sstring msg){
+ if (msg != "hello") {
+ throw std::runtime_error("wrong message");
+ }
+ }, [] (X& x) { return x.echo("hello"); });
+ });
+ });
+}
+
+struct Y {
+ sstring s;
+ Y(sstring s) : s(std::move(s)) {}
+ future<> stop() { return make_ready_future<>(); }
+};
+
+future<> test_constructor_argument_is_passed_to_each_core() {
+ return do_with_distributed<Y>([] (auto& y) {
+ return y.start(sstring("hello")).then([&y] {
+ return y.invoke_on_all([] (Y& y) {
+ if (y.s != "hello") {
+ throw std::runtime_error(format("expected message mismatch, is \"%s\"", y.s));
+ }
+ });
+ });
+ });
+}
+
+future<> test_map_reduce() {
+ return do_with_distributed<X>([] (distributed<X>& x) {
+ return x.start().then([&x] {
+ return x.map_reduce0(std::mem_fn(&X::cpu_id_squared),
+ 0,
+ std::plus<int>()).then([] (int result) {
+ int n = smp::count - 1;
+ if (result != (n * (n + 1) * (2*n + 1)) / 6) {
+ throw std::runtime_error("map_reduce failed");
+ }
+ });
+ });
+ });
+}
+
+future<> test_async() {
+ return do_with_distributed<async_service>([] (distributed<async_service>& x) {
+ return x.start().then([&x] {
+ return x.invoke_on_all(&async_service::run);
+ });
+ }).then([] {
+ return sleep(std::chrono::milliseconds(100 * (smp::count + 1)));
+ });
+}
+
+future<> test_invoke_on_others() {
+ return seastar::async([] {
+ struct my_service {
+ int counter = 0;
+ void up() { ++counter; }
+ future<> stop() { return make_ready_future<>(); }
+ };
+ for (unsigned c = 0; c < smp::count; ++c) {
+ smp::submit_to(c, [c] {
+ return seastar::async([c] {
+ sharded<my_service> s;
+ s.start().get();
+ s.invoke_on_others([](auto& s) { s.up(); }).get();
+ if (s.local().counter != 0) {
+ throw std::runtime_error("local modified");
+ }
+ s.invoke_on_all([c](auto& remote) {
+ if (engine().cpu_id() != c) {
+ if (remote.counter != 1) {
+ throw std::runtime_error("remote not modified");
+ }
+ }
+ }).get();
+ s.stop().get();
+ });
+ }).get();
+ }
+ });
+}
+
+int main(int argc, char** argv) {
+ app_template app;
+ return app.run(argc, argv, [] {
+ return test_that_each_core_gets_the_arguments().then([] {
+ return test_functor_version();
+ }).then([] {
+ return test_constructor_argument_is_passed_to_each_core();
+ }).then([] {
+ return test_map_reduce();
+ }).then([] {
+ return test_async();
+ }).then([] {
+ return test_invoke_on_others();
+ });
+ });
+}