From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/test/common/test_async_shared_mutex.cc | 428 +++++++++++++++++++++++++++++ 1 file changed, 428 insertions(+) create mode 100644 src/test/common/test_async_shared_mutex.cc (limited to 'src/test/common/test_async_shared_mutex.cc') diff --git a/src/test/common/test_async_shared_mutex.cc b/src/test/common/test_async_shared_mutex.cc new file mode 100644 index 000000000..aed6e7b00 --- /dev/null +++ b/src/test/common/test_async_shared_mutex.cc @@ -0,0 +1,428 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/async/shared_mutex.h" +#include +#include + +namespace ceph::async { + +using executor_type = boost::asio::io_context::executor_type; +using unique_lock = std::unique_lock>; +using shared_lock = std::shared_lock>; + +// return a lambda that captures its error code and lock +auto capture(std::optional& ec, unique_lock& lock) +{ + return [&] (boost::system::error_code e, unique_lock l) { + ec = e; + lock = std::move(l); + }; +} +auto capture(std::optional& ec, shared_lock& lock) +{ + return [&] (boost::system::error_code e, shared_lock l) { + ec = e; + lock = std::move(l); + }; +} + +TEST(SharedMutex, async_exclusive) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2, ec3; + unique_lock lock1, lock2, lock3; + + // request three exclusive locks + mutex.async_lock(capture(ec1, lock1)); + mutex.async_lock(capture(ec2, lock2)); + mutex.async_lock(capture(ec3, lock3)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + + context.poll(); + EXPECT_FALSE(context.stopped()); // second lock still pending + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + + lock1.unlock(); + + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_FALSE(context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); + EXPECT_FALSE(ec3); + + lock2.unlock(); + + EXPECT_FALSE(ec3); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec3); + EXPECT_EQ(boost::system::errc::success, *ec3); + ASSERT_TRUE(lock3); +} + +TEST(SharedMutex, async_shared) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2; + shared_lock lock1, lock2; + + // request two shared locks + mutex.async_lock_shared(capture(ec1, lock1)); + mutex.async_lock_shared(capture(ec2, lock2)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); +} + +TEST(SharedMutex, async_exclusive_while_shared) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2; + shared_lock lock1; + unique_lock lock2; + + // request a shared and exclusive lock + mutex.async_lock_shared(capture(ec1, lock1)); + mutex.async_lock(capture(ec2, lock2)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_FALSE(context.stopped()); // second lock still pending + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + + lock1.unlock(); + + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); +} + +TEST(SharedMutex, async_shared_while_exclusive) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2; + unique_lock lock1; + shared_lock lock2; + + // request an exclusive and shared lock + mutex.async_lock(capture(ec1, lock1)); + mutex.async_lock_shared(capture(ec2, lock2)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_FALSE(context.stopped()); // second lock still pending + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + + lock1.unlock(); + + EXPECT_FALSE(ec2); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); +} + +TEST(SharedMutex, async_prioritize_exclusive) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2, ec3; + shared_lock lock1, lock3; + unique_lock lock2; + + // acquire a shared lock, then request an exclusive and another shared lock + mutex.async_lock_shared(capture(ec1, lock1)); + mutex.async_lock(capture(ec2, lock2)); + mutex.async_lock_shared(capture(ec3, lock3)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + + context.poll(); + EXPECT_FALSE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + // exclusive waiter blocks the second shared lock + EXPECT_FALSE(ec3); + + lock1.unlock(); + + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + + context.poll(); + EXPECT_FALSE(context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); + EXPECT_FALSE(ec3); +} + +TEST(SharedMutex, async_cancel) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::optional ec1, ec2, ec3, ec4; + unique_lock lock1, lock2; + shared_lock lock3, lock4; + + // request 2 exclusive and shared locks + mutex.async_lock(capture(ec1, lock1)); + mutex.async_lock(capture(ec2, lock2)); + mutex.async_lock_shared(capture(ec3, lock3)); + mutex.async_lock_shared(capture(ec4, lock4)); + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + EXPECT_FALSE(ec4); + + context.poll(); + EXPECT_FALSE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + EXPECT_FALSE(ec4); + + mutex.cancel(); + + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + EXPECT_FALSE(ec4); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec2); + EXPECT_FALSE(lock2); + ASSERT_TRUE(ec3); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec3); + EXPECT_FALSE(lock3); + ASSERT_TRUE(ec4); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec4); + EXPECT_FALSE(lock4); +} + +TEST(SharedMutex, async_destruct) +{ + boost::asio::io_context context; + + std::optional ec1, ec2, ec3, ec4; + unique_lock lock1, lock2; + shared_lock lock3, lock4; + + { + SharedMutex mutex(context.get_executor()); + + // request 2 exclusive and shared locks + mutex.async_lock(capture(ec1, lock1)); + mutex.async_lock(capture(ec2, lock2)); + mutex.async_lock_shared(capture(ec3, lock3)); + mutex.async_lock_shared(capture(ec4, lock4)); + } + + EXPECT_FALSE(ec1); // no callbacks until poll() + EXPECT_FALSE(ec2); + EXPECT_FALSE(ec3); + EXPECT_FALSE(ec4); + + context.poll(); + EXPECT_TRUE(context.stopped()); + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec2); + EXPECT_FALSE(lock2); + ASSERT_TRUE(ec3); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec3); + EXPECT_FALSE(lock3); + ASSERT_TRUE(ec4); + EXPECT_EQ(boost::asio::error::operation_aborted, *ec4); + EXPECT_FALSE(lock4); +} + +// return a capture() lambda that's bound to the given executor +template +auto capture_ex(const Executor& ex, Args&& ...args) +{ + return boost::asio::bind_executor(ex, capture(std::forward(args)...)); +} + +TEST(SharedMutex, cross_executor) +{ + boost::asio::io_context mutex_context; + SharedMutex mutex(mutex_context.get_executor()); + + boost::asio::io_context callback_context; + auto ex2 = callback_context.get_executor(); + + std::optional ec1, ec2; + unique_lock lock1, lock2; + + // request two exclusive locks + mutex.async_lock(capture_ex(ex2, ec1, lock1)); + mutex.async_lock(capture_ex(ex2, ec2, lock2)); + + EXPECT_FALSE(ec1); + EXPECT_FALSE(ec2); + + mutex_context.poll(); + EXPECT_FALSE(mutex_context.stopped()); // maintains work on both executors + + EXPECT_FALSE(ec1); // no callbacks until poll() on callback_context + EXPECT_FALSE(ec2); + + callback_context.poll(); + EXPECT_FALSE(callback_context.stopped()); // second lock still pending + + ASSERT_TRUE(ec1); + EXPECT_EQ(boost::system::errc::success, *ec1); + ASSERT_TRUE(lock1); + EXPECT_FALSE(ec2); + + lock1.unlock(); + + mutex_context.poll(); + EXPECT_TRUE(mutex_context.stopped()); + + EXPECT_FALSE(ec2); + + callback_context.poll(); + EXPECT_TRUE(callback_context.stopped()); + + ASSERT_TRUE(ec2); + EXPECT_EQ(boost::system::errc::success, *ec2); + ASSERT_TRUE(lock2); +} + +TEST(SharedMutex, try_exclusive) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + { + std::lock_guard lock{mutex}; + ASSERT_FALSE(mutex.try_lock()); // fail during exclusive + } + { + std::shared_lock lock{mutex}; + ASSERT_FALSE(mutex.try_lock()); // fail during shared + } + ASSERT_TRUE(mutex.try_lock()); + mutex.unlock(); +} + +TEST(SharedMutex, try_shared) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + { + std::lock_guard lock{mutex}; + ASSERT_FALSE(mutex.try_lock_shared()); // fail during exclusive + } + { + std::shared_lock lock{mutex}; + ASSERT_TRUE(mutex.try_lock_shared()); // succeed during shared + mutex.unlock_shared(); + } + ASSERT_TRUE(mutex.try_lock_shared()); + mutex.unlock_shared(); +} + +TEST(SharedMutex, cancel) +{ + boost::asio::io_context context; + SharedMutex mutex(context.get_executor()); + + std::lock_guard l{mutex}; // exclusive lock blocks others + + // make synchronous lock calls in other threads + auto f1 = std::async(std::launch::async, [&] { mutex.lock(); }); + auto f2 = std::async(std::launch::async, [&] { mutex.lock_shared(); }); + + // this will race with spawned threads. just keep canceling until the + // futures are ready + const auto t = std::chrono::milliseconds(1); + do { mutex.cancel(); } while (f1.wait_for(t) != std::future_status::ready); + do { mutex.cancel(); } while (f2.wait_for(t) != std::future_status::ready); + + EXPECT_THROW(f1.get(), boost::system::system_error); + EXPECT_THROW(f2.get(), boost::system::system_error); +} + +} // namespace ceph::async -- cgit v1.2.3