summaryrefslogtreecommitdiffstats
path: root/src/main/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/connection.c')
-rw-r--r--src/main/connection.c1520
1 files changed, 1520 insertions, 0 deletions
diff --git a/src/main/connection.c b/src/main/connection.c
new file mode 100644
index 0000000..7ae4a2a
--- /dev/null
+++ b/src/main/connection.c
@@ -0,0 +1,1520 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+/**
+ * @file connection.c
+ * @brief Handle pools of connections (threads, sockets, etc.)
+ * @note This API must be used by all modules in the public distribution that
+ * maintain pools of connections.
+ *
+ * @copyright 2012 The FreeRADIUS server project
+ * @copyright 2012 Alan DeKok <aland@deployingradius.com>
+ */
+RCSID("$Id$")
+
+#include <freeradius-devel/radiusd.h>
+#include <freeradius-devel/heap.h>
+#include <freeradius-devel/modpriv.h>
+#include <freeradius-devel/rad_assert.h>
+
+typedef struct fr_connection fr_connection_t;
+
+static int fr_connection_pool_check(fr_connection_pool_t *pool);
+
+#ifndef NDEBUG
+#ifdef HAVE_PTHREAD_H
+/* #define PTHREAD_DEBUG (1) */
+#endif
+#endif
+
+/*
+ * We don't need to pollute the logs with "open / close"
+ * connection information. Instead, only print these messages
+ * when debugging.
+ */
+#undef INFO
+#define INFO(fmt, ...) if (rad_debug_lvl) radlog(L_INFO, fmt, ## __VA_ARGS__)
+
+/** An individual connection within the connection pool
+ *
+ * Defines connection counters, timestamps, and holds a pointer to the
+ * connection handle itself.
+ *
+ * @see fr_connection_pool_t
+ */
+struct fr_connection {
+ fr_connection_t *prev; //!< Previous connection in list.
+ fr_connection_t *next; //!< Next connection in list.
+
+ time_t created; //!< Time connection was created.
+ struct timeval last_reserved; //!< Last time the connection was reserved.
+
+ struct timeval last_released; //!< Time the connection was released.
+
+ uint32_t num_uses; //!< Number of times the connection has been reserved.
+ uint64_t number; //!< Unique ID assigned when the connection is created,
+ //!< these will monotonically increase over the
+ //!< lifetime of the connection pool.
+ void *connection; //!< Pointer to whatever the module uses for a connection
+ //!< handle.
+ bool in_use; //!< Whether the connection is currently reserved.
+
+ int heap; //!< For the next connection heap.
+
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id; //!< When 'in_use == true'.
+#endif
+};
+
+/** A connection pool
+ *
+ * Defines the configuration of the connection pool, all the counters and
+ * timestamps related to the connection pool, the mutex that stops multiple
+ * threads leaving the pool in an inconsistent state, and the callbacks
+ * required to open, close and check the status of connections within the pool.
+ *
+ * @see fr_connection
+ */
+struct fr_connection_pool_t {
+ int ref; //!< Reference counter to prevent connection
+ //!< pool being freed multiple times.
+ uint32_t start; //!< Number of initial connections.
+ uint32_t min; //!< Minimum number of concurrent connections to keep open.
+ uint32_t max; //!< Maximum number of concurrent connections to allow.
+ uint32_t spare; //!< Number of spare connections to try.
+ uint32_t pending; //!< Number of pending open connections.
+ uint32_t retry_delay; //!< seconds to delay re-open after a failed open.
+ uint32_t max_retries; //!< Maximum number of retries to attempt for any given
+ //!< operation (e.g. query or bind)
+ uint32_t cleanup_interval; //!< Initial timer for how often we sweep the pool
+ //!< for free connections. (0 is infinite).
+ int delay_interval; //!< When we next do a cleanup. Initialized to
+ //!< cleanup_interval, and increase from there based
+ //!< on the delay.
+ int next_delay; //!< The next delay time. cleanup. Initialized to
+ //!< cleanup_interval, and decays from there.
+ uint64_t max_uses; //!< Maximum number of times a connection can be used
+ //!< before being closed.
+ uint32_t lifetime; //!< How long a connection can be open before being
+ //!< closed (irrespective of whether it's idle or not).
+ uint32_t idle_timeout; //!< How long a connection can be idle before
+ //!< being closed.
+
+ uint32_t max_pending; //!< Max number of connections to open.
+
+ bool spread; //!< If true we spread requests over the connections,
+ //!< using the connection released longest ago, first.
+
+ fr_connection_pool_stats_t stats; //!< various statistics
+
+ fr_heap_t *heap; //!< For the next connection heap
+
+ fr_connection_t *head; //!< Start of the connection list.
+ fr_connection_t *tail; //!< End of the connection list.
+
+#ifdef HAVE_PTHREAD_H
+ pthread_mutex_t mutex; //!< Mutex used to keep consistent state when making
+ //!< modifications in threaded mode.
+#endif
+
+ CONF_SECTION *cs; //!< Configuration section holding the section of parsed
+ //!< config file that relates to this pool.
+ void *opaque; //!< Pointer to context data that will be passed to callbacks.
+
+ char const *log_prefix; //!< Log prefix to prepend to all log messages created
+ //!< by the connection pool code.
+
+ char const *trigger_prefix; //!< Prefix to prepend to names of all triggers
+ //!< fired by the connection pool code.
+
+ fr_connection_create_t create; //!< Function used to create new connections.
+ fr_connection_alive_t alive; //!< Function used to check status of connections.
+};
+
+#ifndef HAVE_PTHREAD_H
+# define pthread_mutex_lock(_x)
+# define pthread_mutex_unlock(_x)
+#endif
+
+static const CONF_PARSER connection_config[] = {
+ { "start", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, start), "5" },
+ { "min", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, min), "5" },
+ { "max", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max), "10" },
+ { "spare", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, spare), "3" },
+ { "uses", FR_CONF_OFFSET(PW_TYPE_INTEGER64, fr_connection_pool_t, max_uses), "0" },
+ { "lifetime", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, lifetime), "0" },
+ { "cleanup_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), NULL},
+ { "cleanup_interval", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, cleanup_interval), "30" },
+ { "idle_timeout", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, idle_timeout), "60" },
+ { "retry_delay", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, retry_delay), "1" },
+ { "max_retries", FR_CONF_OFFSET(PW_TYPE_INTEGER, fr_connection_pool_t, max_retries), "5" },
+ { "spread", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, fr_connection_pool_t, spread), "no" },
+ CONF_PARSER_TERMINATOR
+};
+
+/** Order connections by reserved most recently
+ */
+static int last_reserved_cmp(void const *one, void const *two)
+{
+ fr_connection_t const *a = one;
+ fr_connection_t const *b = two;
+
+ if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1;
+ if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1;
+
+ if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1;
+ if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1;
+
+ return 0;
+}
+
+/** Order connections by released longest ago
+ */
+static int last_released_cmp(void const *one, void const *two)
+{
+ fr_connection_t const *a = one;
+ fr_connection_t const *b = two;
+
+ if (b->last_released.tv_sec < a->last_released.tv_sec) return -1;
+ if (b->last_released.tv_sec > a->last_released.tv_sec) return +1;
+
+ if (b->last_released.tv_usec < a->last_released.tv_usec) return -1;
+ if (b->last_released.tv_usec > a->last_released.tv_usec) return +1;
+
+ return 0;
+}
+
+/** Removes a connection from the connection list
+ *
+ * @note Must be called with the mutex held.
+ *
+ * @param[in,out] pool to modify.
+ * @param[in] this Connection to delete.
+ */
+static void fr_connection_unlink(fr_connection_pool_t *pool, fr_connection_t *this)
+{
+ if (this->prev) {
+ rad_assert(pool->head != this);
+ this->prev->next = this->next;
+ } else {
+ rad_assert(pool->head == this);
+ pool->head = this->next;
+ }
+ if (this->next) {
+ rad_assert(pool->tail != this);
+ this->next->prev = this->prev;
+ } else {
+ rad_assert(pool->tail == this);
+ pool->tail = this->prev;
+ }
+
+ this->prev = this->next = NULL;
+}
+
+/** Adds a connection to the head of the connection list
+ *
+ * @note Must be called with the mutex held.
+ *
+ * @param[in,out] pool to modify.
+ * @param[in] this Connection to add.
+ */
+static void fr_connection_link_head(fr_connection_pool_t *pool, fr_connection_t *this)
+{
+ rad_assert(pool != NULL);
+ rad_assert(this != NULL);
+ rad_assert(pool->head != this);
+ rad_assert(pool->tail != this);
+
+ if (pool->head) {
+ pool->head->prev = this;
+ }
+
+ this->next = pool->head;
+ this->prev = NULL;
+ pool->head = this;
+ if (!pool->tail) {
+ rad_assert(this->next == NULL);
+ pool->tail = this;
+ } else {
+ rad_assert(this->next != NULL);
+ }
+}
+
+/** Send a connection pool trigger.
+ *
+ * @param[in] pool to send trigger for.
+ * @param[in] name_suffix trigger name suffix.
+ */
+static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *name_suffix)
+{
+ char name[64];
+ rad_assert(pool != NULL);
+ rad_assert(name_suffix != NULL);
+ snprintf(name, sizeof(name), "%s%s", pool->trigger_prefix, name_suffix);
+ exec_trigger(NULL, pool->cs, name, true);
+}
+
+/** Find a connection handle in the connection list
+ *
+ * Walks over the list of connections searching for a specified connection
+ * handle and returns the first connection that contains that pointer.
+ *
+ * @note Will lock mutex and only release mutex if connection handle
+ * is not found, so will usually return will mutex held.
+ * @note Must be called with the mutex free.
+ *
+ * @param[in] pool to search in.
+ * @param[in] conn handle to search for.
+ * @return
+ * - Connection containing the specified handle.
+ * - NULL if non if connection was found.
+ */
+static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn)
+{
+ fr_connection_t *this;
+
+ if (!pool || !conn) return NULL;
+
+ pthread_mutex_lock(&pool->mutex);
+
+ /*
+ * FIXME: This loop could be avoided if we passed a 'void
+ * **connection' instead. We could use "offsetof" in
+ * order to find top of the parent structure.
+ */
+ for (this = pool->head; this != NULL; this = this->next) {
+ if (this->connection == conn) {
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id;
+
+ pthread_id = pthread_self();
+ rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
+#endif
+
+ rad_assert(this->in_use == true);
+ return this;
+ }
+ }
+
+ pthread_mutex_unlock(&pool->mutex);
+ return NULL;
+}
+
+/** Spawns a new connection
+ *
+ * Spawns a new connection using the create callback, and returns it for
+ * adding to the connection list.
+ *
+ * @note Will call the 'open' trigger.
+ * @note Must be called with the mutex free.
+ *
+ * @param[in] pool to modify.
+ * @param[in] now Current time.
+ * @param[in] in_use whether the new connection should be "in_use" or not
+ * @return
+ * - New connection struct.
+ * - NULL on error.
+ */
+static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t now, bool in_use)
+{
+ uint64_t number;
+ uint32_t max_pending;
+ TALLOC_CTX *ctx;
+
+ fr_connection_t *this;
+ void *conn;
+
+ rad_assert(pool != NULL);
+
+ /*
+ * If we have NO connections, and we've previously failed
+ * opening connections, don't open multiple connections until
+ * we successfully open at least one.
+ */
+ if ((pool->stats.num == 0) && pool->pending && pool->stats.last_failed) return NULL;
+
+ pthread_mutex_lock(&pool->mutex);
+ rad_assert(pool->stats.num <= pool->max);
+
+ /*
+ * Don't spawn too many connections at the same time.
+ */
+ if ((pool->stats.num + pool->pending) >= pool->max) {
+ pthread_mutex_unlock(&pool->mutex);
+
+ ERROR("%s: Cannot open new connection, already at max", pool->log_prefix);
+ return NULL;
+ }
+
+ /*
+ * If the last attempt failed, wait a bit before
+ * retrying.
+ */
+ if (pool->stats.last_failed && ((pool->stats.last_failed + pool->retry_delay) > now)) {
+ bool complain = false;
+
+ if (pool->stats.last_throttled != now) {
+ complain = true;
+
+ pool->stats.last_throttled = now;
+ }
+
+ pthread_mutex_unlock(&pool->mutex);
+
+ if (!RATE_LIMIT_ENABLED || complain) {
+ ERROR("%s: Last connection attempt failed, waiting %d seconds before retrying",
+ pool->log_prefix, pool->retry_delay);
+ }
+
+ return NULL;
+ }
+
+ /*
+ * We limit the rate of new connections after a failed attempt.
+ */
+ if (pool->pending > pool->max_pending) {
+ pthread_mutex_unlock(&pool->mutex);
+ RATE_LIMIT(WARN("%s: Cannot open a new connection due to rate limit after failure",
+ pool->log_prefix));
+ return NULL;
+ }
+
+ pool->pending++;
+ number = pool->stats.opened++;
+
+ /*
+ * Unlock the mutex while we try to open a new
+ * connection. If there are issues with the back-end,
+ * opening a new connection may take a LONG time. In
+ * that case, we want the other connections to continue
+ * to be used.
+ */
+ pthread_mutex_unlock(&pool->mutex);
+
+ /*
+ * The true value for max_pending is the smaller of
+ * free connection slots, or pool->max_pending.
+ */
+ max_pending = (pool->max - pool->stats.num);
+ if (pool->max_pending < max_pending) max_pending = pool->max_pending;
+ INFO("%s: Opening additional connection (%" PRIu64 "), %u of %u pending slots used",
+ pool->log_prefix, number, pool->pending, max_pending);
+
+ /*
+ * Allocate a new top level ctx for the create callback
+ * to hang its memory off of.
+ */
+ ctx = talloc_init("fr_connection_ctx");
+ if (!ctx) return NULL;
+
+ /*
+ * This may take a long time, which prevents other
+ * threads from releasing connections. We don't care
+ * about other threads opening new connections, as we
+ * already have no free connections.
+ */
+ conn = pool->create(ctx, pool->opaque);
+ if (!conn) {
+ ERROR("%s: Opening connection failed (%" PRIu64 ")", pool->log_prefix, number);
+
+ pool->stats.last_failed = now;
+ pthread_mutex_lock(&pool->mutex);
+ pool->max_pending = 1;
+ pool->pending--;
+ pool->stats.failed++;
+ pthread_mutex_unlock(&pool->mutex);
+
+ talloc_free(ctx);
+
+ return NULL;
+ }
+
+ /*
+ * And lock the mutex again while we link the new
+ * connection back into the pool.
+ */
+ pthread_mutex_lock(&pool->mutex);
+
+ this = talloc_zero(pool, fr_connection_t);
+ if (!this) {
+ pthread_mutex_unlock(&pool->mutex);
+ talloc_free(ctx);
+
+ return NULL;
+ }
+ fr_link_talloc_ctx_free(this, ctx);
+
+ this->created = now;
+ this->connection = conn;
+ this->in_use = in_use;
+
+ this->number = number;
+ gettimeofday(&this->last_reserved, NULL);
+ this->last_released = this->last_reserved;
+
+ /*
+ * The connection pool is starting up. Insert the
+ * connection into the heap.
+ */
+ if (!in_use) fr_heap_insert(pool->heap, this);
+
+ fr_connection_link_head(pool, this);
+
+ /*
+ * Do NOT insert the connection into the heap. That's
+ * done when the connection is released.
+ */
+
+ pool->stats.num++;
+
+ rad_assert(pool->pending > 0);
+ pool->pending--;
+
+ /*
+ * We've successfully opened one more connection. Allow
+ * more connections to open in parallel.
+ */
+ if (pool->max_pending < pool->max) pool->max_pending++;
+
+ pool->stats.last_opened = time(NULL);
+ pool->delay_interval = pool->cleanup_interval;
+ pool->next_delay = pool->cleanup_interval;
+ pool->stats.last_failed = 0;
+
+ pthread_mutex_unlock(&pool->mutex);
+
+ fr_connection_exec_trigger(pool, "open");
+
+ return this;
+}
+
+/** Close an existing connection.
+ *
+ * Removes the connection from the list, calls the delete callback to close
+ * the connection, then frees memory allocated to the connection.
+ *
+ * @note Will call the 'close' trigger.
+ * @note Must be called with the mutex held.
+ *
+ * @param[in,out] pool to modify.
+ * @param[in] this Connection to delete.
+ * @param[in] reason to close the connection
+ * @param[in] msg optional message
+ */
+static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this,
+ char const *reason, char const *msg)
+{
+ if (!msg) {
+ INFO("%s: %s (%" PRIu64 ")", pool->log_prefix, reason, this->number);
+ } else {
+ INFO("%s: %s (%" PRIu64 ") - %s", pool->log_prefix, reason, this->number, msg);
+ }
+
+
+ /*
+ * If it's in use, release it.
+ */
+ if (this->in_use) {
+#ifdef PTHREAD_DEBUG
+ pthread_t pthread_id = pthread_self();
+ rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0);
+#endif
+
+ this->in_use = false;
+
+ rad_assert(pool->stats.active != 0);
+ pool->stats.active--;
+
+ } else {
+ /*
+ * Connection isn't used, remove it from the heap.
+ */
+ fr_heap_extract(pool->heap, this);
+ }
+
+ fr_connection_exec_trigger(pool, "close");
+
+ fr_connection_unlink(pool, this);
+
+ rad_assert(pool->stats.num > 0);
+ pool->stats.num--;
+ pool->stats.closed++;
+ pool->stats.last_closed = time(NULL);
+ talloc_free(this);
+}
+
+/** Check whether a connection needs to be removed from the pool
+ *
+ * Will verify that the connection is within idle_timeout, max_uses, and
+ * lifetime values. If it is not, the connection will be closed.
+ *
+ * @note Will only close connections not in use.
+ * @note Must be called with the mutex held.
+ *
+ * @param[in,out] pool to modify.
+ * @param[in,out] this Connection to manage.
+ * @param[in] now Current time.
+ * @param[in] get whether we want to get a connection
+ * @return
+ * - 0 if connection was closed.
+ * - 1 if connection handle was left open.
+ */
+static int fr_connection_manage(fr_connection_pool_t *pool,
+ fr_connection_t *this,
+ time_t now, bool get)
+{
+ rad_assert(pool != NULL);
+ rad_assert(this != NULL);
+ char const *reason = "Closing expired connection";
+ char const *msg = NULL;
+
+ /*
+ * Don't terminate in-use connections
+ */
+ if (this->in_use) return 1;
+
+ if ((pool->max_uses > 0) &&
+ (this->num_uses >= pool->max_uses)) {
+ msg = "Hit max_uses limit";
+
+ do_delete:
+ if (pool->stats.num <= pool->min) {
+ DEBUG("%s: You probably need to lower \"min\"", pool->log_prefix);
+ }
+ fr_connection_close_internal(pool, this, reason, msg);
+ return 0;
+ }
+
+ if ((pool->lifetime > 0) &&
+ ((this->created + pool->lifetime) < now)) {
+ msg = "Hit lifetime limit";
+ goto do_delete;
+ }
+
+ /*
+ * The connection WAS idle, but the caller is interested
+ * in getting a new one. Instead of closing the old one
+ * and opening a new one, we just return the old one.
+ */
+ if (get) return 1;
+
+ if ((pool->idle_timeout > 0) &&
+ ((this->last_released.tv_sec + pool->idle_timeout) < now)) {
+ msg = "Hit idle_timeout limit";
+ goto do_delete;
+ }
+
+ return 1;
+}
+
+
+/** Check whether any connections need to be removed from the pool
+ *
+ * Maintains the number of connections in the pool as per the configuration
+ * parameters for the connection pool.
+ *
+ * @note Will only run checks the first time it's called in a given second,
+ * to throttle connection spawning/closing.
+ * @note Will only close connections not in use.
+ * @note Must be called with the mutex held, will release mutex before
+ * returning.
+ *
+ * @param[in,out] pool to manage.
+ * @return 1
+ */
+static int fr_connection_pool_check(fr_connection_pool_t *pool)
+{
+ uint32_t num, spare;
+ time_t now = time(NULL);
+ fr_connection_t *this, *next;
+
+ if (pool->stats.last_checked == now) {
+ pthread_mutex_unlock(&pool->mutex);
+ return 1;
+ }
+
+ /*
+ * Get "real" number of connections, and count pending
+ * connections as spare.
+ */
+ num = pool->stats.num + pool->pending;
+ spare = pool->pending + (pool->stats.num - pool->stats.active);
+
+ /*
+ * The other end can close connections. If so, we'll
+ * have fewer than "min". When that happens, open more
+ * connections to enforce "min".
+ *
+ * The code for spawning connections enforces that
+ * num + pending <= max.
+ */
+ if (num < pool->min) {
+ INFO("Need %u more connections to reach min connections (%i)", pool->min - num, pool->min);
+ goto add_connection;
+ }
+
+ /*
+ * On the odd chance that we've opened too many
+ * connections, take care of that.
+ */
+ if (num > pool->max) {
+ /*
+ * Pending connections don't get closed as "spare".
+ */
+ if (pool->pending > 0) goto manage_connections;
+
+ /*
+ * Otherwise close one of the connections to
+ * bring us down to "max".
+ */
+ goto close_connection;
+ }
+
+ /*
+ * Now that we've enforced min/max connections, try to
+ * keep the "spare" connections at the correct number.
+ */
+
+ /*
+ * Nothing to do? Go check all of the connections for
+ * timeouts, etc.
+ */
+ if (spare == pool->spare) goto manage_connections;
+
+ /*
+ * Too many spare connections, delete some.
+ */
+ if (spare > pool->spare) {
+ fr_connection_t *found;
+
+ /*
+ * Pending connections don't get closed as "spare".
+ */
+ if (pool->pending > 0) goto manage_connections;
+
+ /*
+ * Don't close too many connections, even they
+ * are spare.
+ */
+ if (num <= pool->min) goto manage_connections;
+
+ /*
+ * Too many spares, go close one.
+ */
+
+ close_connection:
+ /*
+ * Don't close connections too often, in order to
+ * prevent flapping.
+ */
+ if (now < (pool->stats.last_opened + pool->delay_interval)) goto manage_connections;
+
+ /*
+ * Find a connection to close.
+ */
+ found = NULL;
+ for (this = pool->tail; this != NULL; this = this->prev) {
+ if (this->in_use) continue;
+
+ if (!found ||
+ timercmp(&this->last_reserved, &found->last_reserved, <)) {
+ found = this;
+ }
+ }
+
+ rad_assert(found != NULL);
+
+ fr_connection_close_internal(pool, found, "Closing connection", "Too many unused connections.");
+
+ /*
+ * Decrease the delay for the next time we clean
+ * up.
+ */
+ pool->next_delay >>= 1;
+ if (pool->next_delay == 0) pool->next_delay = 1;
+ pool->delay_interval += pool->next_delay;
+
+ goto manage_connections;
+ }
+
+ /*
+ * Too few connections, open some more.
+ */
+ if (spare < pool->spare) {
+ /*
+ * Don't open too many pending connections.
+ */
+ if (pool->pending >= pool->max_pending) goto manage_connections;
+
+ /*
+ * Don't open too many connections, even if we
+ * need more spares.
+ */
+ if (num >= pool->max) goto manage_connections;
+
+ /*
+ * Too few spares, go add one.
+ */
+
+ add_connection:
+ INFO("Need more connections to reach %i spares", pool->spare);
+
+ /*
+ * Only try to open spares if we're not already attempting to open
+ * a connection. Avoids spurious log messages.
+ */
+ pthread_mutex_unlock(&pool->mutex);
+ fr_connection_spawn(pool, now, false); /* ignore return code */
+ pthread_mutex_lock(&pool->mutex);
+ goto manage_connections;
+ }
+
+ /*
+ * Pass over all of the connections in the pool, limiting
+ * lifetime, idle time, max requests, etc.
+ */
+manage_connections:
+ for (this = pool->head; this != NULL; this = next) {
+ next = this->next;
+ fr_connection_manage(pool, this, now, false);
+ }
+
+ pool->stats.last_checked = now;
+ pthread_mutex_unlock(&pool->mutex);
+
+ return 1;
+}
+
+/** Get a connection from the connection pool
+ *
+ * @note Must be called with the mutex free.
+ *
+ * @param[in,out] pool to reserve the connection from.
+ * @param[in] spawn whether to spawn a new connection
+ * @return
+ * - A pointer to the connection handle.
+ * - NULL on error.
+ */
+static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn)
+{
+ time_t now;
+ fr_connection_t *this;
+
+ if (!pool) return NULL;
+
+ /*
+ * Allow CTRL-C to kill the server in debugging mode.
+ */
+ if (main_config.exiting) return NULL;
+
+#ifdef HAVE_PTHREAD_H
+ if (spawn) pthread_mutex_lock(&pool->mutex);
+#endif
+
+ now = time(NULL);
+
+ /*
+ * Grab the link with the lowest latency, and check it
+ * for limits. If "connection manage" says the link is
+ * no longer usable, go grab another one.
+ */
+ do {
+ this = fr_heap_peek(pool->heap);
+ if (!this) break;
+
+ fr_assert(!this->in_use);
+ } while (!fr_connection_manage(pool, this, now, true));
+
+ /*
+ * We have a working connection. Extract it from the
+ * heap and use it.
+ */
+ if (this) {
+ fr_heap_extract(pool->heap, this);
+ goto do_return;
+ }
+
+ /*
+ * We were asked to avoid spawning a new connection, by
+ * fr_connection_reconnect_internal(). So we just return
+ * here.
+ */
+ if (!spawn) return NULL;
+
+ if (pool->stats.num == pool->max) {
+ bool complain = false;
+
+ /*
+ * Rate-limit complaints.
+ */
+ if (pool->stats.last_at_max != now) {
+ complain = true;
+ pool->stats.last_at_max = now;
+ }
+
+ pthread_mutex_unlock(&pool->mutex);
+
+ if (!RATE_LIMIT_ENABLED || complain) {
+ ERROR("%s: No connections available and at max connection limit", pool->log_prefix);
+ }
+
+ return NULL;
+ }
+
+ pthread_mutex_unlock(&pool->mutex);
+
+ DEBUG("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix,
+ pool->stats.active, pool->stats.num);
+ this = fr_connection_spawn(pool, now, true); /* MY connection! */
+ if (!this) return NULL;
+
+ pthread_mutex_lock(&pool->mutex);
+
+do_return:
+ pool->stats.active++;
+ this->num_uses++;
+ gettimeofday(&this->last_reserved, NULL);
+ this->in_use = true;
+
+#ifdef PTHREAD_DEBUG
+ this->pthread_id = pthread_self();
+#endif
+
+#ifdef HAVE_PTHREAD_H
+ if (spawn) pthread_mutex_unlock(&pool->mutex);
+#endif
+
+ DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number);
+
+ return this->connection;
+}
+
+/** Reconnect a suspected inviable connection
+ *
+ * @note Must be called with the mutex held, will not release mutex.
+ *
+ * @see fr_connection_get
+ * @param[in,out] pool to reconnect the connection in.
+ * @param[in,out] conn to reconnect.
+ * @return new connection handle if successful else NULL.
+ */
+static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn)
+{
+ void *new_conn;
+ uint64_t conn_number;
+ TALLOC_CTX *ctx;
+
+ conn_number = conn->number;
+
+ /*
+ * Destroy any handles associated with the fr_connection_t
+ */
+ talloc_free_children(conn);
+
+ DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number);
+
+ /*
+ * Allocate a new top level ctx for the create callback
+ * to hang its memory off of.
+ */
+ ctx = talloc_init("fr_connection_ctx");
+ if (!ctx) return NULL;
+ fr_link_talloc_ctx_free(conn, ctx);
+
+ new_conn = pool->create(ctx, pool->opaque);
+ if (!new_conn) {
+ /*
+ * We can't create a new connection, so close the current one.
+ */
+ fr_connection_close_internal(pool, conn, "Closing connection", "Failed to reconnect");
+
+ /*
+ * Maybe there's a connection which is unused and
+ * available. If so, return it.
+ */
+ new_conn = fr_connection_get_internal(pool, false);
+ if (new_conn) return new_conn;
+
+ RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available",
+ pool->log_prefix, conn_number));
+
+ return NULL;
+ }
+
+ fr_connection_exec_trigger(pool, "close");
+ conn->connection = new_conn;
+
+ return new_conn;
+}
+
+/** Create a new connection pool
+ *
+ * Allocates structures used by the connection pool, initialises the various
+ * configuration options and counters, and sets the callback functions.
+ *
+ * Will also spawn the number of connections specified by the 'start'
+ * configuration options.
+ *
+ * @note Will call the 'start' trigger.
+ *
+ * @param[in] ctx Context to link pool's destruction to.
+ * @param[in] cs pool section.
+ * @param[in] opaque data pointer to pass to callbacks.
+ * @param[in] c Callback to create new connections.
+ * @param[in] a Callback to check the status of connections.
+ * @param[in] log_prefix prefix to prepend to all log messages.
+ * @param[in] trigger_prefix prefix to prepend to all trigger names.
+ * @return
+ * - New connection pool.
+ * - NULL on error.
+ */
+static fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx,
+ CONF_SECTION *cs,
+ void *opaque,
+ fr_connection_create_t c,
+ fr_connection_alive_t a,
+ char const *log_prefix,
+ char const *trigger_prefix)
+{
+ uint32_t i;
+ fr_connection_pool_t *pool;
+ fr_connection_t *this;
+ time_t now;
+
+ if (!cs || !opaque || !c) return NULL;
+
+ now = time(NULL);
+
+ /*
+ * Pool is allocated in the NULL context as
+ * threads are likely to allocate memory
+ * beneath the pool.
+ */
+ pool = talloc_zero(NULL, fr_connection_pool_t);
+ if (!pool) return NULL;
+
+ /*
+ * Ensure the pool is freed at the same time
+ * as its parent.
+ */
+ if (fr_link_talloc_ctx_free(ctx, pool) < 0) {
+ talloc_free(pool);
+
+ return NULL;
+ }
+
+ pool->cs = cs;
+ pool->opaque = opaque;
+ pool->create = c;
+ pool->alive = a;
+
+ pool->head = pool->tail = NULL;
+
+ /*
+ * We keep a heap of connections, sorted by the last time
+ * we STARTED using them. Newly opened connections
+ * aren't in the heap. They're only inserted in the list
+ * once they're released.
+ *
+ * We do "most recently started" instead of "most
+ * recently used", because MRU is done as most recently
+ * *released*. We want to order connections by
+ * responsiveness, and MRU prioritizes high latency
+ * connections.
+ *
+ * We want most recently *started*, which gives
+ * preference to low latency links, and pushes high
+ * latency links down in the priority heap.
+ *
+ * https://code.facebook.com/posts/1499322996995183/solving-the-mystery-of-link-imbalance-a-metastable-failure-state-at-scale/
+ */
+ if (!pool->spread) {
+ pool->heap = fr_heap_create(last_reserved_cmp, offsetof(fr_connection_t, heap));
+ /*
+ * For some types of connections we need to used a different
+ * algorithm, because load balancing benefits are secondary
+ * to maintaining a cache of open connections.
+ *
+ * With libcurl's multihandle, connections can only be reused
+ * if all handles that make up the multhandle are done processing
+ * their requests.
+ *
+ * We can't tell when that's happened using libcurl, and even
+ * if we could, blocking until all servers had responded
+ * would have huge cost.
+ *
+ * The solution is to order the heap so that the connection that
+ * was released longest ago is at the top.
+ *
+ * That way we maximise time between connection use.
+ */
+ } else {
+ pool->heap = fr_heap_create(last_released_cmp, offsetof(fr_connection_t, heap));
+ }
+ if (!pool->heap) {
+ talloc_free(pool);
+ return NULL;
+ }
+
+ pool->log_prefix = log_prefix ? talloc_typed_strdup(pool, log_prefix) : "core";
+ pool->trigger_prefix = trigger_prefix ? talloc_typed_strdup(pool, trigger_prefix) : "";
+
+#ifdef HAVE_PTHREAD_H
+ pthread_mutex_init(&pool->mutex, NULL);
+#endif
+
+ DEBUG("%s: Initialising connection pool", pool->log_prefix);
+
+ if (cf_section_parse(cs, pool, connection_config) < 0) goto error;
+
+ /*
+ * Some simple limits
+ */
+ if (pool->max == 0) {
+ cf_log_err_cs(cs, "Cannot set 'max' to zero");
+ goto error;
+ }
+ pool->max_pending = pool->max; /* can open all connections now */
+
+ if (pool->min > pool->max) {
+ cf_log_err_cs(cs, "Cannot set 'min' to more than 'max'");
+ goto error;
+ }
+
+ FR_INTEGER_BOUND_CHECK("max", pool->max, <=, 1024);
+ FR_INTEGER_BOUND_CHECK("start", pool->start, <=, pool->max);
+ FR_INTEGER_BOUND_CHECK("spare", pool->spare, <=, (pool->max - pool->min));
+
+ if (pool->lifetime > 0) {
+ FR_INTEGER_COND_CHECK("idle_timeout", pool->idle_timeout, (pool->idle_timeout <= pool->lifetime), 0);
+ }
+
+ if (pool->idle_timeout > 0) {
+ FR_INTEGER_BOUND_CHECK("cleanup_interval", pool->cleanup_interval, <=, pool->idle_timeout);
+ }
+
+ /*
+ * Don't open any connections. Instead, force the limits
+ * to only 1 connection.
+ *
+ */
+ if (check_config) {
+ pool->start = pool->min = pool->max = 1;
+ return pool;
+ }
+
+ /*
+ * Create all of the connections, unless the admin says
+ * not to.
+ */
+ for (i = 0; i < pool->start; i++) {
+ this = fr_connection_spawn(pool, now, false);
+ if (!this) {
+ error:
+ fr_connection_pool_free(pool);
+ return NULL;
+ }
+ }
+
+ fr_connection_exec_trigger(pool, "start");
+
+ return pool;
+}
+
+/** Initialise a module specific connection pool
+ *
+ * @see fr_connection_pool_init
+ *
+ * @param[in] module section.
+ * @param[in] opaque data pointer to pass to callbacks.
+ * @param[in] c Callback to create new connections.
+ * @param[in] a Callback to check the status of connections.
+ * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION.
+ * @return
+ * - New connection pool.
+ * - NULL on error.
+ */
+fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module,
+ void *opaque,
+ fr_connection_create_t c,
+ fr_connection_alive_t a,
+ char const *log_prefix)
+{
+ CONF_SECTION *cs, *mycs;
+ char buff[128];
+ char trigger_prefix[64];
+
+ fr_connection_pool_t *pool;
+ char const *cs_name1, *cs_name2;
+
+ int ret;
+
+#define CONNECTION_POOL_CF_KEY "connection_pool"
+#define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x)))
+
+ cs_name1 = cf_section_name1(module);
+ cs_name2 = cf_section_name2(module);
+ if (!cs_name2) cs_name2 = cs_name1;
+
+ snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1);
+
+ if (!log_prefix) {
+ snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2);
+ log_prefix = buff;
+ }
+
+ /*
+ * Get sibling's pool config section
+ */
+ ret = find_module_sibling_section(&cs, module, "pool");
+ switch (ret) {
+ case -1:
+ return NULL;
+
+ case 1:
+ DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs));
+ break;
+
+ case 0:
+ DEBUG4("%s: Using local pool section", log_prefix);
+ break;
+ }
+
+ /*
+ * Get our pool config section
+ */
+ mycs = cf_section_sub_find(module, "pool");
+ if (!mycs) {
+ DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix,
+ cf_section_name(module));
+
+ mycs = cf_section_alloc(module, "pool", NULL);
+ cf_section_add(module, mycs);
+ }
+
+ /*
+ * Sibling didn't have a pool config section
+ * Use our own local pool.
+ */
+ if (!cs) {
+ DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix,
+ parent_name(cs), parent_name(mycs));
+ cs = mycs;
+ }
+
+ /*
+ * If fr_connection_pool_init has already been called
+ * for this config section, reuse the previous instance.
+ *
+ * This allows modules to pass in the config sections
+ * they would like to use the connection pool from.
+ */
+ pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY);
+ if (!pool) {
+ DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs));
+ pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix);
+ if (!pool) return NULL;
+
+ DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
+ cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL);
+ return pool;
+ }
+ pool->ref++;
+
+ DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs));
+
+ /*
+ * We're reusing pool data add it to our local config
+ * section. This allows other modules to transitively
+ * re-use a pool through this module.
+ */
+ if (mycs != cs) {
+ DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"",
+ log_prefix, pool, parent_name(cs), parent_name(mycs));
+ cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL);
+ }
+
+ return pool;
+}
+
+/** Get the number of connections currently in the pool
+ *
+ * @param pool to count connections for.
+ * @return the number of connections in the pool
+ */
+int fr_connection_pool_get_num(fr_connection_pool_t *pool)
+{
+ return pool->stats.num;
+}
+
+/** Get the number of times an operation should be retried
+ *
+ * The lower of either the number of available connections or
+ * the configured max_retries.
+ *
+ * @param pool to get the retry count for.
+ * @return the number of times an operation can be retried.
+ */
+int fr_connection_pool_get_retries(fr_connection_pool_t *pool)
+{
+ return (pool->max_retries < pool->stats.num) ? pool->max_retries : pool->stats.num;
+}
+
+/** Get the number of connections currently in the pool
+ *
+ * @param module the module configuration which should contain the pool
+ * @return the stats, or NULL on "not found"
+ */
+fr_connection_pool_stats_t const *fr_connection_pool_stats(CONF_SECTION *module)
+{
+ fr_connection_pool_t *pool = NULL;
+ CONF_SECTION *cs;
+
+ cs = cf_section_sub_find(module, "pool");
+ if (!cs) {
+ CONF_PAIR *cp;
+ module_instance_t *mi;
+ char const *value;
+
+ /*
+ * This is the name of the module, not a
+ * reference. <sigh>.
+ */
+ cp = cf_pair_find(module, "pool");
+ if (!cp) return NULL;
+
+ value = cf_pair_value(cp);
+ if (!value) return NULL;
+
+ mi = module_find(cf_item_parent(cf_section_to_item(module)), value);
+ if (!mi) return NULL;
+
+ cs = cf_section_sub_find(mi->cs, "pool");
+ if (!cs) return NULL;
+ }
+
+ pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY);
+ if (!pool) return NULL;
+
+ return &pool->stats;
+}
+
+
+/** Delete a connection pool
+ *
+ * Closes, unlinks and frees all connections in the connection pool, then frees
+ * all memory used by the connection pool.
+ *
+ * @note Will call the 'stop' trigger.
+ * @note Must be called with the mutex free.
+ *
+ * @param[in,out] pool to delete.
+ */
+void fr_connection_pool_free(fr_connection_pool_t *pool)
+{
+ fr_connection_t *this;
+
+ if (!pool) return;
+
+ /*
+ * More modules hold a reference to this pool, don't free
+ * it yet.
+ */
+ if (pool->ref > 0) {
+ pool->ref--;
+ return;
+ }
+
+ DEBUG("%s: Removing connection pool", pool->log_prefix);
+
+ pthread_mutex_lock(&pool->mutex);
+
+ /*
+ * Don't loop over the list. Just keep removing the head
+ * until they're all gone.
+ */
+ while ((this = pool->head) != NULL) {
+ fr_connection_close_internal(pool, this, "Closing connection", "Shutting down connection pool");
+ }
+
+ fr_heap_delete(pool->heap);
+
+ fr_connection_exec_trigger(pool, "stop");
+
+ rad_assert(pool->head == NULL);
+ rad_assert(pool->tail == NULL);
+ rad_assert(pool->stats.num == 0);
+
+#ifdef HAVE_PTHREAD_H
+ pthread_mutex_destroy(&pool->mutex);
+#endif
+
+ talloc_free(pool);
+}
+
+/** Reserve a connection in the connection pool
+ *
+ * Will attempt to find an unused connection in the connection pool, if one is
+ * found, will mark it as in in use increment the number of active connections
+ * and return the connection handle.
+ *
+ * If no free connections are found will attempt to spawn a new one, conditional
+ * on a connection spawning not already being in progress, and not being at the
+ * 'max' connection limit.
+ *
+ * @note fr_connection_release must be called once the caller has finished
+ * using the connection.
+ *
+ * @see fr_connection_release
+ * @param[in,out] pool to reserve the connection from.
+ * @return
+ * - A pointer to the connection handle.
+ * - NULL on error.
+ */
+void *fr_connection_get(fr_connection_pool_t *pool)
+{
+ return fr_connection_get_internal(pool, true);
+}
+
+/** Release a connection
+ *
+ * Will mark a connection as unused and decrement the number of active
+ * connections.
+ *
+ * @see fr_connection_get
+ * @param[in,out] pool to release the connection in.
+ * @param[in,out] conn to release.
+ */
+void fr_connection_release(fr_connection_pool_t *pool, void *conn)
+{
+ fr_connection_t *this;
+
+ this = fr_connection_find(pool, conn);
+ if (!this) return;
+
+ this->in_use = false;
+
+ /*
+ * Record when the connection was last released
+ */
+ gettimeofday(&this->last_released, NULL);
+
+ /*
+ * Insert the connection in the heap.
+ *
+ * This will either be based on when we *started* using it
+ * (allowing fast links to be re-used, and slow links to be
+ * gradually expired), or when we released it (allowing
+ * the maximum amount of time between connection use).
+ */
+ fr_heap_insert(pool->heap, this);
+
+ rad_assert(pool->stats.active != 0);
+ pool->stats.active--;
+
+ DEBUG("%s: Released connection (%" PRIu64 ")", pool->log_prefix, this->number);
+
+ /*
+ * We mirror the "spawn on get" functionality by having
+ * "delete on release". If there are too many spare
+ * connections, go manage the pool && clean some up.
+ */
+ fr_connection_pool_check(pool);
+}
+
+/** Reconnect a suspected inviable connection
+ *
+ * This should be called by the module if it suspects that a connection is
+ * not viable (e.g. the server has closed it).
+ *
+ * Will attempt to create a new connection handle using the create callback,
+ * and if this is successful the new handle will be assigned to the existing
+ * pool connection.
+ *
+ * If this is not successful, the connection will be removed from the pool.
+ *
+ * When implementing a module that uses the connection pool API, it is advisable
+ * to pass a pointer to the pointer to the handle (void **conn)
+ * to all functions which may call reconnect. This is so that if a new handle
+ * is created and returned, the handle pointer can be updated up the callstack,
+ * and a function higher up the stack doesn't attempt to use a now invalid
+ * connection handle.
+ *
+ * @note Will free any talloced memory hung off the context of the connection,
+ * being reconnected.
+ *
+ * @warning After calling reconnect the caller *MUST NOT* attempt to use
+ * the old handle in any other operations, as its memory will have been
+ * freed.
+ *
+ * @see fr_connection_get
+ * @param[in,out] pool to reconnect the connection in.
+ * @param[in,out] conn to reconnect.
+ * @return new connection handle if successful else NULL.
+ */
+void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn)
+{
+ void *new_conn;
+ fr_connection_t *this;
+
+ if (!pool || !conn) return NULL;
+
+ /*
+ * Don't allow opening of new connections if we're trying
+ * to exit.
+ */
+ if (main_config.exiting) {
+ fr_connection_release(pool, conn);
+ return NULL;
+ }
+
+ /*
+ * If fr_connection_find is successful the pool is now locked
+ */
+ this = fr_connection_find(pool, conn);
+ if (!this) return NULL;
+
+ new_conn = fr_connection_reconnect_internal(pool, this);
+ pthread_mutex_unlock(&pool->mutex);
+
+ return new_conn;
+}
+
+/** Delete a connection from the connection pool.
+ *
+ * Resolves the connection handle to a connection, then (if found)
+ * closes, unlinks and frees that connection.
+ *
+ * @note Must be called with the mutex free.
+ *
+ * @param[in,out] pool Connection pool to modify.
+ * @param[in] conn to delete.
+ * @param[in] msg why the connection was closed.
+ * @return
+ * - 0 If the connection could not be found.
+ * - 1 if the connection was deleted.
+ */
+int fr_connection_close(fr_connection_pool_t *pool, void *conn, char const *msg)
+{
+ fr_connection_t *this;
+
+ this = fr_connection_find(pool, conn);
+ if (!this) return 0;
+
+ fr_connection_close_internal(pool, this, "Deleting connection", msg);
+ fr_connection_pool_check(pool);
+ return 1;
+}