summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/dbstore/common/connection_pool.h
blob: 07f3c81c3df3b98badb078d05d247490f7743c95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2022 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#pragma once

#include <concepts>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <boost/circular_buffer.hpp>
#include "common/dout.h"

namespace rgw::dbstore {

// Forward declaration: ConnectionPoolBase names ConnectionHandle as a friend
// before the handle's full definition appears further down in this header.
template <typename Connection>
class ConnectionHandle;

/// A thread-safe base class that manages a fixed-size pool of generic database
/// connections and supports the reclamation of ConnectionHandles. This class
/// is the subset of ConnectionPool which doesn't depend on the Factory type.
template <typename Connection>
class ConnectionPoolBase {
 public:
  /// Construct an empty pool whose idle-connection buffer has a capacity of
  /// \p max_connections entries.
  ConnectionPoolBase(std::size_t max_connections)
      : connections(max_connections)
  {}
 private:
  // only ConnectionHandle may hand connections back via put()
  friend class ConnectionHandle<Connection>;

  // TODO: the caller may detect a connection error that prevents the connection
  // from being reused. allow them to indicate these errors here

  /// Return a borrowed connection to the back of the idle buffer and wake one
  /// thread blocked on \c cond, if any.
  void put(std::unique_ptr<Connection> connection)
  {
    auto lock = std::scoped_lock{mutex};
    connections.push_back(std::move(connection));

    // Notify on every put(), not just on the empty -> non-empty transition.
    // With several threads blocked on the condition variable, two put() calls
    // can complete before the first woken waiter reacquires the mutex; the
    // second put() would then observe size() == 2 and skip the notification,
    // leaving the second waiter asleep even though a connection is idle.
    // notify_one() with no waiters is a no-op, so this is always safe.
    cond.notify_one();
  }
 protected:
  /// Serializes access to \c connections.
  std::mutex mutex;
  /// Signaled by put() each time a connection becomes available.
  std::condition_variable cond;
  /// Idle connections, pushed at the back by put().
  boost::circular_buffer<std::unique_ptr<Connection>> connections;
};

/// Handle to a database connection borrowed from the pool. Automatically
/// returns the connection to its pool on the handle's destruction.
///
/// The handle is move-only. A non-empty handle calls pool->put() when it is
/// destroyed or overwritten, so the pool it was created from must still be
/// alive at that point.
template <typename Connection>
class ConnectionHandle {
  ConnectionPoolBase<Connection>* pool = nullptr;
  std::unique_ptr<Connection> conn;
 public:
  /// Construct an empty handle that owns no connection.
  ConnectionHandle() noexcept = default;

  /// Take ownership of \p conn, to be returned to \p pool later.
  ConnectionHandle(ConnectionPoolBase<Connection>* pool,
                   std::unique_ptr<Connection> conn) noexcept
    : pool(pool), conn(std::move(conn)) {}

  /// Return the held connection (if any) to its pool.
  ~ConnectionHandle() {
    if (conn) {
      pool->put(std::move(conn));
    }
  }

  /// Transfer the connection out of the source handle, leaving it empty.
  ConnectionHandle(ConnectionHandle&&) = default;

  /// Return the currently held connection (if any) to its pool, then take
  /// over the connection and pool of \p o, leaving \p o empty.
  ConnectionHandle& operator=(ConnectionHandle&& o) noexcept {
    // Self-move must be a no-op: without this guard the live connection would
    // be handed back to the pool and this handle silently left empty.
    if (this == &o) {
      return *this;
    }
    if (conn) {
      pool->put(std::move(conn));
    }
    conn = std::move(o.conn);
    pool = o.pool;
    return *this;
  }

  /// True when the handle currently owns a connection.
  explicit operator bool() const noexcept { return static_cast<bool>(conn); }
  /// Access the connection; the handle must not be empty.
  Connection& operator*() const noexcept { return *conn; }
  /// Access the connection's members; the handle must not be empty.
  Connection* operator->() const noexcept { return conn.get(); }
  /// Raw pointer to the connection, or nullptr for an empty handle.
  Connection* get() const noexcept { return conn.get(); }
};


// factory_of<F, T> is satisfied when an object of type F can be invoked as
//   F(const DoutPrefixProvider*) -> std::unique_ptr<T>
// (the result type must be exactly std::unique_ptr<T>) and F itself is
// move-constructible.
template <typename F, typename T>
concept factory_of = requires (F make, const DoutPrefixProvider* prefix) {
  { make(prefix) } -> std::same_as<std::unique_ptr<T>>;
  requires std::move_constructible<F>;
};


/// Generic database connection pool that enforces a limit on open connections.
///
/// Connections are created lazily by the Factory, up to the max_connections
/// given at construction. Borrowed connections come back to the pool when
/// their ConnectionHandle is destroyed or overwritten.
template <typename Connection, factory_of<Connection> Factory>
class ConnectionPool : public ConnectionPoolBase<Connection> {
 public:
  /// \param factory          callable used to open new connections
  /// \param max_connections  upper bound on connections created by this pool
  ConnectionPool(Factory factory, std::size_t max_connections)
      : ConnectionPoolBase<Connection>(max_connections),
        factory(std::move(factory))
  {}

  /// Borrow a connection from the pool. If all existing connections are in use,
  /// use the connection factory to create another one. If we've reached the
  /// limit on open connections, wait on a condition variable for the next one
  /// returned to the pool.
  ///
  /// The factory is invoked while the pool mutex is held. If it returns a null
  /// pointer the returned handle is empty and no slot is consumed; if it
  /// throws, the exception propagates and no slot is consumed either.
  auto get(const DoutPrefixProvider* dpp)
      -> ConnectionHandle<Connection>
  {
    auto lock = std::unique_lock{this->mutex};
    std::unique_ptr<Connection> conn;

    if (!this->connections.empty()) {
      // take an existing connection
      conn = std::move(this->connections.front());
      this->connections.pop_front();
    } else if (total < this->connections.capacity()) {
      // add another connection to the pool
      conn = factory(dpp);
      // Only count connections that really exist. An empty handle never calls
      // put(), so counting a null result would leak the slot permanently and,
      // after enough failures, make every get() block forever.
      if (conn) {
        ++total;
      }
    } else {
      // wait for the next put()
      // TODO: support optional_yield
      ldpp_dout(dpp, 4) << "ConnectionPool waiting on a connection" << dendl;
      this->cond.wait(lock, [&] { return !this->connections.empty(); });
      ldpp_dout(dpp, 4) << "ConnectionPool done waiting" << dendl;
      conn = std::move(this->connections.front());
      this->connections.pop_front();
    }

    return {this, std::move(conn)};
  }
 private:
  Factory factory;
  /// Number of connections created so far (idle or borrowed); never exceeds
  /// the capacity of the idle buffer.
  std::size_t total = 0;
};

} // namespace rgw::dbstore