From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rgw/driver/dbstore/common/connection_pool.h | 147 ++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 src/rgw/driver/dbstore/common/connection_pool.h (limited to 'src/rgw/driver/dbstore/common/connection_pool.h') diff --git a/src/rgw/driver/dbstore/common/connection_pool.h b/src/rgw/driver/dbstore/common/connection_pool.h new file mode 100644 index 000000000..07f3c81c3 --- /dev/null +++ b/src/rgw/driver/dbstore/common/connection_pool.h @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2022 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "common/dout.h" + +namespace rgw::dbstore { + +template +class ConnectionHandle; + +/// A thread-safe base class that manages a fixed-size pool of generic database +/// connections and supports the reclamation of ConnectionHandles. This class +/// is the subset of ConnectionPool which doesn't depend on the Factory type. +template +class ConnectionPoolBase { + public: + ConnectionPoolBase(std::size_t max_connections) + : connections(max_connections) + {} + private: + friend class ConnectionHandle; + + // TODO: the caller may detect a connection error that prevents the connection + // from being reused. allow them to indicate these errors here + void put(std::unique_ptr connection) + { + auto lock = std::scoped_lock{mutex}; + connections.push_back(std::move(connection)); + + if (connections.size() == 1) { // was empty + cond.notify_one(); + } + } + protected: + std::mutex mutex; + std::condition_variable cond; + boost::circular_buffer> connections; +}; + +/// Handle to a database connection borrowed from the pool. Automatically +/// returns the connection to its pool on the handle's destruction. +template +class ConnectionHandle { + ConnectionPoolBase* pool = nullptr; + std::unique_ptr conn; + public: + ConnectionHandle() noexcept = default; + ConnectionHandle(ConnectionPoolBase* pool, + std::unique_ptr conn) noexcept + : pool(pool), conn(std::move(conn)) {} + + ~ConnectionHandle() { + if (conn) { + pool->put(std::move(conn)); + } + } + + ConnectionHandle(ConnectionHandle&&) = default; + ConnectionHandle& operator=(ConnectionHandle&& o) noexcept { + if (conn) { + pool->put(std::move(conn)); + } + conn = std::move(o.conn); + pool = o.pool; + return *this; + } + + explicit operator bool() const noexcept { return static_cast(conn); } + Connection& operator*() const noexcept { return *conn; } + Connection* operator->() const noexcept { return conn.get(); } + Connection* get() const noexcept { return conn.get(); } +}; + + +// factory_of concept requires the function signature: +// F(const DoutPrefixProvider*) -> std::unique_ptr +template +concept factory_of = requires (F factory, const DoutPrefixProvider* dpp) { + { factory(dpp) } -> std::same_as>; + requires std::move_constructible; +}; + + +/// Generic database connection pool that enforces a limit on open connections. +template Factory> +class ConnectionPool : public ConnectionPoolBase { + public: + ConnectionPool(Factory factory, std::size_t max_connections) + : ConnectionPoolBase(max_connections), + factory(std::move(factory)) + {} + + /// Borrow a connection from the pool. If all existing connections are in use, + /// use the connection factory to create another one. If we've reached the + /// limit on open connections, wait on a condition variable for the next one + /// returned to the pool. + auto get(const DoutPrefixProvider* dpp) + -> ConnectionHandle + { + auto lock = std::unique_lock{this->mutex}; + std::unique_ptr conn; + + if (!this->connections.empty()) { + // take an existing connection + conn = std::move(this->connections.front()); + this->connections.pop_front(); + } else if (total < this->connections.capacity()) { + // add another connection to the pool + conn = factory(dpp); + ++total; + } else { + // wait for the next put() + // TODO: support optional_yield + ldpp_dout(dpp, 4) << "ConnectionPool waiting on a connection" << dendl; + this->cond.wait(lock, [&] { return !this->connections.empty(); }); + ldpp_dout(dpp, 4) << "ConnectionPool done waiting" << dendl; + conn = std::move(this->connections.front()); + this->connections.pop_front(); + } + + return {this, std::move(conn)}; + } + private: + Factory factory; + std::size_t total = 0; +}; + +} // namespace rgw::dbstore -- cgit v1.2.3