diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:35:32 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 16:35:32 +0000 |
commit | 5ea77a75dd2d2158401331879f3c8f47940a732c (patch) | |
tree | d89dc06e9f4850a900f161e25f84e922c4f86cc8 /servers/lloadd/connection.c | |
parent | Initial commit. (diff) | |
download | openldap-5ea77a75dd2d2158401331879f3c8f47940a732c.tar.xz openldap-5ea77a75dd2d2158401331879f3c8f47940a732c.zip |
Adding upstream version 2.5.13+dfsg.upstream/2.5.13+dfsgupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | servers/lloadd/connection.c | 620 |
1 files changed, 620 insertions, 0 deletions
diff --git a/servers/lloadd/connection.c b/servers/lloadd/connection.c new file mode 100644 index 0000000..967c6c5 --- /dev/null +++ b/servers/lloadd/connection.c @@ -0,0 +1,620 @@ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software <http://www.openldap.org/>. + * + * Copyright 1998-2022 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. + */ +/* Portions Copyright (c) 1995 Regents of the University of Michigan. + * All rights reserved. + * + * Redistribution and use in source and binary forms are permitted + * provided that this notice is preserved and that due credit is given + * to the University of Michigan at Ann Arbor. The name of the University + * may not be used to endorse or promote products derived from this + * software without specific prior written permission. This software + * is provided ``as is'' without express or implied warranty. + */ + +#include "portable.h" + +#include <stdio.h> +#ifdef HAVE_LIMITS_H +#include <limits.h> +#endif + +#include <ac/socket.h> +#include <ac/errno.h> +#include <ac/string.h> +#include <ac/time.h> +#include <ac/unistd.h> + +#include "lload.h" + +#include "lutil.h" +#include "lutil_ldap.h" + +static unsigned long conn_nextid = 0; + +static void +lload_connection_assign_nextid( LloadConnection *conn ) +{ + conn->c_connid = __atomic_fetch_add( &conn_nextid, 1, __ATOMIC_RELAXED ); +} + +/* + * We start off with the connection muted and c_currentber holding the pdu we + * received. + * + * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait + * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to + * maintain fairness and not hog the worker thread forever. + * + * If we've run out of pdus immediately available from the stream or hit the + * budget, we unmute the connection. + * + * c->c_pdu_cb might return an 'error' and not free the connection. That can + * happen when changing the state or when client is blocked on writing and + * already has a pdu pending on the same operation, it's their job to make sure + * we're woken up again. + */ +void * +handle_pdus( void *ctx, void *arg ) +{ + LloadConnection *c = arg; + int pdus_handled = 0; + epoch_t epoch; + + /* A reference was passed on to us */ + assert( IS_ALIVE( c, c_refcnt ) ); + + epoch = epoch_join(); + for ( ;; ) { + BerElement *ber; + ber_tag_t tag; + ber_len_t len; + + if ( c->c_pdu_cb( c ) ) { + /* Error/reset, get rid ouf our reference and bail */ + goto done; + } + + if ( !IS_ALIVE( c, c_live ) ) { + break; + } + + if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) { + /* Do not read now, re-enable read event instead */ + break; + } + + ber = c->c_currentber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "handle_pdus: " + "connid=%lu, ber_alloc failed\n", + c->c_connid ); + CONNECTION_LOCK_DESTROY(c); + goto done; + } + c->c_currentber = ber; + + checked_lock( &c->c_io_mutex ); + if ( (lload_features & LLOAD_FEATURE_PAUSE) && + (c->c_io_state & LLOAD_C_READ_PAUSE) ) { + goto pause; + } + tag = ber_get_next( c->c_sb, &len, ber ); + checked_unlock( &c->c_io_mutex ); + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + if ( err || tag == LBER_ERROR ) { + char ebuf[128]; + Debug( LDAP_DEBUG_ANY, "handle_pdus: " + "ber_get_next on fd=%d failed errno=%d (%s)\n", + c->c_fd, err, + sock_errstr( err, ebuf, sizeof(ebuf) ) ); + } else { + Debug( LDAP_DEBUG_STATS, "handle_pdus: " + "ber_get_next on fd=%d connid=%lu received " + "a strange PDU tag=%lx\n", + c->c_fd, c->c_connid, tag ); + } + + c->c_currentber = NULL; + ber_free( ber, 1 ); + CONNECTION_LOCK_DESTROY(c); + goto done; + } + break; + } + + assert( IS_ALIVE( c, c_refcnt ) ); + epoch_leave( epoch ); + epoch = epoch_join(); + assert( IS_ALIVE( c, c_refcnt ) ); + } + + checked_lock( &c->c_io_mutex ); + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "handle_pdus: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } +pause: + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + +done: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + epoch_leave( epoch ); + return NULL; +} + +/* + * Initial read on the connection, if we get an LDAP PDU, submit the + * processing of this and successive ones to the work queue. + * + * If we can't submit it to the queue (overload), process this one and return + * to the event loop immediately after. + */ +void +connection_read_cb( evutil_socket_t s, short what, void *arg ) +{ + LloadConnection *c = arg; + BerElement *ber; + ber_tag_t tag; + ber_len_t len; + epoch_t epoch; + int pause; + + if ( !IS_ALIVE( c, c_live ) ) { + event_del( c->c_read_event ); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "suspended read event on a dead connid=%lu\n", + c->c_connid ); + return; + } + + if ( what & EV_TIMEOUT ) { + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "connid=%lu, timeout reached, destroying\n", + c->c_connid ); + /* Make sure the connection stays around for us to unlock it */ + epoch = epoch_join(); + CONNECTION_LOCK_DESTROY(c); + epoch_leave( epoch ); + return; + } + + if ( !acquire_ref( &c->c_refcnt ) ) { + return; + } + epoch = epoch_join(); + + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "connection connid=%lu ready to read\n", + c->c_connid ); + + ber = c->c_currentber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "connection_read_cb: " + "connid=%lu, ber_alloc failed\n", + c->c_connid ); + goto out; + } + c->c_currentber = ber; + + checked_lock( &c->c_io_mutex ); + assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ); + tag = ber_get_next( c->c_sb, &len, ber ); + pause = c->c_io_state & LLOAD_C_READ_PAUSE; + checked_unlock( &c->c_io_mutex ); + + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + if ( err || tag == LBER_ERROR ) { + char ebuf[128]; + Debug( LDAP_DEBUG_STATS, "connection_read_cb: " + "ber_get_next on fd=%d failed errno=%d (%s)\n", + c->c_fd, err, + sock_errstr( err, ebuf, sizeof(ebuf) ) ); + } else { + Debug( LDAP_DEBUG_STATS, "connection_read_cb: " + "ber_get_next on fd=%d connid=%lu received " + "a strange PDU tag=%lx\n", + c->c_fd, c->c_connid, tag ); + } + + c->c_currentber = NULL; + ber_free( ber, 1 ); + + event_del( c->c_read_event ); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "suspended read event on dying connid=%lu\n", + c->c_connid ); + CONNECTION_LOCK_DESTROY(c); + goto out; + } + if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) { + event_add( c->c_read_event, c->c_read_timeout ); + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "re-enabled read event on connid=%lu\n", + c->c_connid ); + } + goto out; + } + + checked_lock( &c->c_io_mutex ); + c->c_io_state |= LLOAD_C_READ_HANDOVER; + checked_unlock( &c->c_io_mutex ); + event_del( c->c_read_event ); + + if ( !lload_conn_max_pdus_per_cycle || + ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) { + /* If we're overloaded or configured as such, process one and resume in + * the next cycle. */ + int rc = c->c_pdu_cb( c ); + + checked_lock( &c->c_io_mutex ); + c->c_io_state &= ~LLOAD_C_READ_HANDOVER; + if ( rc == LDAP_SUCCESS && + ( !(lload_features & LLOAD_FEATURE_PAUSE) || + !(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + checked_unlock( &c->c_io_mutex ); + goto out; + } + + Debug( LDAP_DEBUG_CONNS, "connection_read_cb: " + "suspended read event on connid=%lu\n", + c->c_connid ); + + /* + * We have scheduled a call to handle_pdus to take care of handling this + * and further requests, its reference is now owned by that task. + */ + epoch_leave( epoch ); + return; + +out: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + epoch_leave( epoch ); +} + +void +connection_write_cb( evutil_socket_t s, short what, void *arg ) +{ + LloadConnection *c = arg; + epoch_t epoch; + + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "considering writing to%s connid=%lu what=%hd\n", + c->c_live ? " live" : " dead", c->c_connid, what ); + if ( !IS_ALIVE( c, c_live ) ) { + return; + } + + if ( what & EV_TIMEOUT ) { + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "connid=%lu, timeout reached, destroying\n", + c->c_connid ); + /* Make sure the connection stays around for us to unlock it */ + epoch = epoch_join(); + CONNECTION_LOCK_DESTROY(c); + epoch_leave( epoch ); + return; + } + + /* Before we acquire any locks */ + event_del( c->c_write_event ); + + if ( !acquire_ref( &c->c_refcnt ) ) { + return; + } + + /* If what == 0, we have a caller as opposed to being a callback */ + if ( what ) { + epoch = epoch_join(); + } + + checked_lock( &c->c_io_mutex ); + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "have something to write to connection connid=%lu\n", + c->c_connid ); + + /* We might have been beaten to flushing the data by another thread */ + if ( c->c_pendingber && ber_flush( c->c_sb, c->c_pendingber, 1 ) ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + char ebuf[128]; + checked_unlock( &c->c_io_mutex ); + Debug( LDAP_DEBUG_ANY, "connection_write_cb: " + "ber_flush on fd=%d failed errno=%d (%s)\n", + c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); + CONNECTION_LOCK_DESTROY(c); + goto done; + } + + if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) { + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "connection connid=%lu blocked on writing, marking " + "paused\n", + c->c_connid ); + } + c->c_io_state |= LLOAD_C_READ_PAUSE; + + /* TODO: Do not reset write timeout unless we wrote something */ + event_add( c->c_write_event, lload_write_timeout ); + } else { + c->c_pendingber = NULL; + if ( c->c_io_state & LLOAD_C_READ_PAUSE ) { + c->c_io_state ^= LLOAD_C_READ_PAUSE; + Debug( LDAP_DEBUG_CONNS, "connection_write_cb: " + "Unpausing connection connid=%lu\n", + c->c_connid ); + if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) { + event_add( c->c_read_event, c->c_read_timeout ); + } + } + } + checked_unlock( &c->c_io_mutex ); + +done: + RELEASE_REF( c, c_refcnt, c->c_destroy ); + if ( what ) { + epoch_leave( epoch ); + } +} + +void +connection_destroy( LloadConnection *c ) +{ + assert( c ); + Debug( LDAP_DEBUG_CONNS, "connection_destroy: " + "destroying connection connid=%lu\n", + c->c_connid ); + + CONNECTION_ASSERT_LOCKED(c); + assert( c->c_live == 0 ); + assert( c->c_refcnt == 0 ); + assert( c->c_state == LLOAD_C_INVALID ); + + ber_sockbuf_free( c->c_sb ); + + if ( c->c_currentber ) { + ber_free( c->c_currentber, 1 ); + c->c_currentber = NULL; + } + if ( c->c_pendingber ) { + ber_free( c->c_pendingber, 1 ); + c->c_pendingber = NULL; + } + + if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { + ber_memfree( c->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &c->c_sasl_bind_mech ); + } +#ifdef HAVE_CYRUS_SASL + if ( c->c_sasl_defaults ) { + lutil_sasl_freedefs( c->c_sasl_defaults ); + c->c_sasl_defaults = NULL; + } + if ( c->c_sasl_authctx ) { +#ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */ + if ( c->c_sasl_cbinding ) { + ch_free( c->c_sasl_cbinding ); + } +#endif + sasl_dispose( &c->c_sasl_authctx ); + } +#endif /* HAVE_CYRUS_SASL */ + + CONNECTION_UNLOCK(c); + + ldap_pvt_thread_mutex_destroy( &c->c_io_mutex ); + ldap_pvt_thread_mutex_destroy( &c->c_mutex ); + + ch_free( c ); + + listeners_reactivate(); +} + +/* + * Called holding mutex, will walk cq calling cb on all connections whose + * c_connid <= cq_last->c_connid that still exist at the time we get to them. + */ +void +connections_walk_last( + ldap_pvt_thread_mutex_t *cq_mutex, + lload_c_head *cq, + LloadConnection *cq_last, + CONNCB cb, + void *arg ) +{ + LloadConnection *c = cq_last; + uintptr_t last_connid; + + if ( LDAP_CIRCLEQ_EMPTY( cq ) ) { + return; + } + assert_locked( cq_mutex ); + + last_connid = c->c_connid; + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + + while ( !acquire_ref( &c->c_refcnt ) ) { + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + if ( c->c_connid >= last_connid ) { + assert_locked( cq_mutex ); + return; + } + } + + /* + * Notes: + * - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid + * order + * - the connection with the highest c_connid is passed in cq_last + * - we can only use cq when we hold cq_mutex + * - connections might be added to or removed from cq while we're busy + * processing connections + * - we need a way to detect we've finished looping around cq for some + * definition of looping around + */ + do { + int rc; + + checked_unlock( cq_mutex ); + + rc = cb( c, arg ); + RELEASE_REF( c, c_refcnt, c->c_destroy ); + + checked_lock( cq_mutex ); + if ( rc || LDAP_CIRCLEQ_EMPTY( cq ) ) { + break; + } + + do { + LloadConnection *old = c; + c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next ); + if ( c->c_connid <= old->c_connid || c->c_connid > last_connid ) { + assert_locked( cq_mutex ); + return; + } + } while ( !acquire_ref( &c->c_refcnt ) ); + } while ( c->c_connid <= last_connid ); + assert_locked( cq_mutex ); +} + +void +connections_walk( + ldap_pvt_thread_mutex_t *cq_mutex, + lload_c_head *cq, + CONNCB cb, + void *arg ) +{ + LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq ); + return connections_walk_last( cq_mutex, cq, cq_last, cb, arg ); +} + +int +lload_connection_close( LloadConnection *c, void *arg ) +{ + int gentle = *(int *)arg; + LloadOperation *op; + + Debug( LDAP_DEBUG_CONNS, "lload_connection_close: " + "marking connection connid=%lu closing\n", + c->c_connid ); + + /* We were approached from the connection list */ + assert( IS_ALIVE( c, c_refcnt ) ); + + CONNECTION_LOCK(c); + if ( !gentle || !c->c_ops ) { + CONNECTION_DESTROY(c); + return LDAP_SUCCESS; + } + + /* The first thing we do is make sure we don't get new Operations in */ + c->c_state = LLOAD_C_CLOSING; + + do { + TAvlnode *node = ldap_tavl_end( c->c_ops, TAVL_DIR_LEFT ); + op = node->avl_data; + + /* Close operations that would need client action to resolve, + * only SASL binds in progress do that right now */ + if ( op->o_client_msgid || op->o_upstream_msgid ) { + break; + } + + CONNECTION_UNLOCK(c); + operation_unlink( op ); + CONNECTION_LOCK(c); + } while ( c->c_ops ); + + CONNECTION_UNLOCK(c); + return LDAP_SUCCESS; +} + +LloadConnection * +lload_connection_init( ber_socket_t s, const char *peername, int flags ) +{ + LloadConnection *c; + + assert( peername != NULL ); + + if ( s == AC_SOCKET_INVALID ) { + Debug( LDAP_DEBUG_ANY, "lload_connection_init: " + "init of socket fd=%ld invalid\n", + (long)s ); + return NULL; + } + + assert( s >= 0 ); + + c = ch_calloc( 1, sizeof(LloadConnection) ); + + c->c_fd = s; + c->c_sb = ber_sockbuf_alloc(); + ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s ); + +#ifdef LDAP_PF_LOCAL + if ( flags & CONN_IS_IPC ) { +#ifdef LDAP_DEBUG + ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug, + LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" ); +#endif + ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd, + LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); + } else +#endif /* LDAP_PF_LOCAL */ + { +#ifdef LDAP_DEBUG + ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug, + LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" ); +#endif + ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp, + LBER_SBIOD_LEVEL_PROVIDER, (void *)&s ); + } + +#ifdef LDAP_DEBUG + ber_sockbuf_add_io( + c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" ); +#endif + + c->c_next_msgid = 1; + c->c_refcnt = c->c_live = 1; + c->c_destroy = connection_destroy; + + LDAP_CIRCLEQ_ENTRY_INIT( c, c_next ); + + ldap_pvt_thread_mutex_init( &c->c_mutex ); + ldap_pvt_thread_mutex_init( &c->c_io_mutex ); + + lload_connection_assign_nextid( c ); + + Debug( LDAP_DEBUG_CONNS, "lload_connection_init: " + "connection connid=%lu allocated for socket fd=%d peername=%s\n", + c->c_connid, s, peername ); + + c->c_state = LLOAD_C_ACTIVE; + + return c; +} |