diff options
Diffstat (limited to 'servers/lloadd/operation.c')
-rw-r--r-- | servers/lloadd/operation.c | 699 |
1 files changed, 699 insertions, 0 deletions
diff --git a/servers/lloadd/operation.c b/servers/lloadd/operation.c new file mode 100644 index 0000000..9074404 --- /dev/null +++ b/servers/lloadd/operation.c @@ -0,0 +1,699 @@ +/* $OpenLDAP$ */ +/* This work is part of OpenLDAP Software <http://www.openldap.org/>. + * + * Copyright 1998-2022 The OpenLDAP Foundation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted only as authorized by the OpenLDAP + * Public License. + * + * A copy of this license is available in the file LICENSE in the + * top-level directory of the distribution or, alternatively, at + * <http://www.OpenLDAP.org/license.html>. + */ + +#include "portable.h" + +#include "lutil.h" +#include "lload.h" + +ldap_pvt_thread_mutex_t lload_pin_mutex; +unsigned long lload_next_pin = 1; + +ber_tag_t +slap_req2res( ber_tag_t tag ) +{ + switch ( tag ) { + case LDAP_REQ_ADD: + case LDAP_REQ_BIND: + case LDAP_REQ_COMPARE: + case LDAP_REQ_EXTENDED: + case LDAP_REQ_MODIFY: + case LDAP_REQ_MODRDN: + tag++; + break; + + case LDAP_REQ_DELETE: + tag = LDAP_RES_DELETE; + break; + + case LDAP_REQ_ABANDON: + case LDAP_REQ_UNBIND: + tag = LBER_SEQUENCE; + break; + + case LDAP_REQ_SEARCH: + tag = LDAP_RES_SEARCH_RESULT; + break; + + default: + tag = LBER_SEQUENCE; + } + + return tag; +} + +const char * +lload_msgtype2str( ber_tag_t tag ) +{ + switch ( tag ) { + case LDAP_REQ_ABANDON: return "abandon request"; + case LDAP_REQ_ADD: return "add request"; + case LDAP_REQ_BIND: return "bind request"; + case LDAP_REQ_COMPARE: return "compare request"; + case LDAP_REQ_DELETE: return "delete request"; + case LDAP_REQ_EXTENDED: return "extended request"; + case LDAP_REQ_MODIFY: return "modify request"; + case LDAP_REQ_RENAME: return "rename request"; + case LDAP_REQ_SEARCH: return "search request"; + case LDAP_REQ_UNBIND: return "unbind request"; + + case LDAP_RES_ADD: return "add result"; + case LDAP_RES_BIND: return "bind result"; + case LDAP_RES_COMPARE: return "compare result"; + case LDAP_RES_DELETE: return "delete result"; + case LDAP_RES_EXTENDED: return "extended result"; + case LDAP_RES_INTERMEDIATE: return "intermediate response"; + case LDAP_RES_MODIFY: return "modify result"; + case LDAP_RES_RENAME: return "rename result"; + case LDAP_RES_SEARCH_ENTRY: return "search-entry response"; + case LDAP_RES_SEARCH_REFERENCE: return "search-reference response"; + case LDAP_RES_SEARCH_RESULT: return "search result"; + } + return "unknown message"; +} + +int +operation_client_cmp( const void *left, const void *right ) +{ + const LloadOperation *l = left, *r = right; + + assert( l->o_client_connid == r->o_client_connid ); + if ( l->o_client_msgid || r->o_client_msgid ) { + return ( l->o_client_msgid < r->o_client_msgid ) ? + -1 : + ( l->o_client_msgid > r->o_client_msgid ); + } else { + return ( l->o_pin_id < r->o_pin_id ) ? -1 : + ( l->o_pin_id > r->o_pin_id ); + } +} + +int +operation_upstream_cmp( const void *left, const void *right ) +{ + const LloadOperation *l = left, *r = right; + + assert( l->o_upstream_connid == r->o_upstream_connid ); + if ( l->o_upstream_msgid || r->o_upstream_msgid ) { + return ( l->o_upstream_msgid < r->o_upstream_msgid ) ? + -1 : + ( l->o_upstream_msgid > r->o_upstream_msgid ); + } else { + return ( l->o_pin_id < r->o_pin_id ) ? -1 : + ( l->o_pin_id > r->o_pin_id ); + } +} + +/* + * Entered holding c_mutex for now. + */ +LloadOperation * +operation_init( LloadConnection *c, BerElement *ber ) +{ + LloadOperation *op; + ber_tag_t tag; + ber_len_t len; + int rc; + + if ( !IS_ALIVE( c, c_live ) ) { + return NULL; + } + + op = ch_calloc( 1, sizeof(LloadOperation) ); + op->o_client = c; + op->o_client_connid = c->c_connid; + op->o_ber = ber; + op->o_start = slap_get_time(); + + ldap_pvt_thread_mutex_init( &op->o_link_mutex ); + + op->o_refcnt = 1; + + tag = ber_get_int( ber, &op->o_client_msgid ); + if ( tag != LDAP_TAG_MSGID ) { + goto fail; + } + + if ( !op->o_client_msgid ) { + goto fail; + } + + CONNECTION_ASSERT_LOCKED(c); + rc = ldap_tavl_insert( &c->c_ops, op, operation_client_cmp, ldap_avl_dup_error ); + if ( rc ) { + Debug( LDAP_DEBUG_PACKETS, "operation_init: " + "several operations with same msgid=%d in-flight " + "from client connid=%lu\n", + op->o_client_msgid, op->o_client_connid ); + goto fail; + } + + tag = op->o_tag = ber_skip_element( ber, &op->o_request ); + switch ( tag ) { + case LBER_ERROR: + rc = -1; + break; + } + if ( rc ) { + ldap_tavl_delete( &c->c_ops, op, operation_client_cmp ); + goto fail; + } + + tag = ber_peek_tag( ber, &len ); + if ( tag == LDAP_TAG_CONTROLS ) { + ber_skip_element( ber, &op->o_ctrls ); + } + + switch ( op->o_tag ) { + case LDAP_REQ_BIND: + lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received++; + break; + default: + lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++; + break; + } + + Debug( LDAP_DEBUG_STATS, "operation_init: " + "received a new operation, %s with msgid=%d for client " + "connid=%lu\n", + lload_msgtype2str( op->o_tag ), op->o_client_msgid, + op->o_client_connid ); + + c->c_n_ops_executing++; + return op; + +fail: + ch_free( op ); + return NULL; +} + +void +operation_destroy( LloadOperation *op ) +{ + Debug( LDAP_DEBUG_TRACE, "operation_destroy: " + "op=%p destroyed operation from client connid=%lu, " + "client msgid=%d\n", + op, op->o_client_connid, op->o_client_msgid ); + + assert( op->o_refcnt == 0 ); + assert( op->o_client == NULL ); + assert( op->o_upstream == NULL ); + + ber_free( op->o_ber, 1 ); + ldap_pvt_thread_mutex_destroy( &op->o_link_mutex ); + ch_free( op ); +} + +int +operation_unlink( LloadOperation *op ) +{ + LloadConnection *client, *upstream; + uintptr_t prev_refcnt; + int result = 0; + + if ( !( prev_refcnt = try_release_ref( + &op->o_refcnt, op, (dispose_cb *)operation_destroy ) ) ) { + return result; + } + + assert( prev_refcnt == 1 ); + + Debug( LDAP_DEBUG_TRACE, "operation_unlink: " + "unlinking operation between client connid=%lu and upstream " + "connid=%lu " + "client msgid=%d\n", + op->o_client_connid, op->o_upstream_connid, op->o_client_msgid ); + + checked_lock( &op->o_link_mutex ); + client = op->o_client; + upstream = op->o_upstream; + + op->o_client = NULL; + op->o_upstream = NULL; + checked_unlock( &op->o_link_mutex ); + + assert( client || upstream ); + + if ( client ) { + result |= operation_unlink_client( op, client ); + operation_update_global_rejected( op ); + } + + if ( upstream ) { + result |= operation_unlink_upstream( op, upstream ); + } + + return result; +} + +int +operation_unlink_client( LloadOperation *op, LloadConnection *client ) +{ + LloadOperation *removed; + int result = 0; + + Debug( LDAP_DEBUG_TRACE, "operation_unlink_client: " + "unlinking operation op=%p msgid=%d client connid=%lu\n", + op, op->o_client_msgid, op->o_client_connid ); + + CONNECTION_LOCK(client); + if ( (removed = ldap_tavl_delete( + &client->c_ops, op, operation_client_cmp )) ) { + result = LLOAD_OP_DETACHING_CLIENT; + + assert( op == removed ); + client->c_n_ops_executing--; + + if ( op->o_tag == LDAP_REQ_BIND && + client->c_state == LLOAD_C_BINDING ) { + client->c_state = LLOAD_C_READY; + if ( !BER_BVISNULL( &client->c_auth ) ) { + ber_memfree( client->c_auth.bv_val ); + BER_BVZERO( &client->c_auth ); + } + if ( !BER_BVISNULL( &client->c_sasl_bind_mech ) ) { + ber_memfree( client->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &client->c_sasl_bind_mech ); + } + if ( op->o_pin_id ) { + client->c_pin_id = 0; + } + } + } + if ( client->c_state == LLOAD_C_CLOSING && !client->c_ops ) { + CONNECTION_DESTROY(client); + } else { + CONNECTION_UNLOCK(client); + } + + return result; +} + +int +operation_unlink_upstream( LloadOperation *op, LloadConnection *upstream ) +{ + LloadOperation *removed; + LloadBackend *b = NULL; + int result = 0; + + Debug( LDAP_DEBUG_TRACE, "operation_unlink_upstream: " + "unlinking operation op=%p msgid=%d upstream connid=%lu\n", + op, op->o_upstream_msgid, op->o_upstream_connid ); + + CONNECTION_LOCK(upstream); + if ( (removed = ldap_tavl_delete( + &upstream->c_ops, op, operation_upstream_cmp )) ) { + result |= LLOAD_OP_DETACHING_UPSTREAM; + + assert( op == removed ); + upstream->c_n_ops_executing--; + + if ( upstream->c_state == LLOAD_C_BINDING ) { + assert( op->o_tag == LDAP_REQ_BIND && upstream->c_ops == NULL ); + upstream->c_state = LLOAD_C_READY; + if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) { + ber_memfree( upstream->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &upstream->c_sasl_bind_mech ); + } + } + operation_update_conn_counters( op, upstream ); + b = upstream->c_backend; + } + if ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) { + CONNECTION_DESTROY(upstream); + } else { + CONNECTION_UNLOCK(upstream); + } + + if ( b ) { + checked_lock( &b->b_mutex ); + b->b_n_ops_executing--; + operation_update_backend_counters( op, b ); + checked_unlock( &b->b_mutex ); + } + + return result; +} + +int +operation_send_abandon( LloadOperation *op, LloadConnection *upstream ) +{ + BerElement *ber; + int rc = -1; + + if ( !IS_ALIVE( upstream, c_live ) ) { + return rc; + } + + checked_lock( &upstream->c_io_mutex ); + ber = upstream->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "operation_send_abandon: " + "ber_alloc failed\n" ); + goto done; + } + upstream->c_pendingber = ber; + + Debug( LDAP_DEBUG_TRACE, "operation_send_abandon: " + "abandoning %s msgid=%d on connid=%lu\n", + lload_msgtype2str( op->o_tag ), op->o_upstream_msgid, + op->o_upstream_connid ); + + if ( op->o_tag == LDAP_REQ_BIND ) { + rc = ber_printf( ber, "t{tit{ist{s}}}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, upstream->c_next_msgid++, + LDAP_REQ_BIND, LDAP_VERSION3, "", LDAP_AUTH_SASL, "" ); + } else { + rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, upstream->c_next_msgid++, + LDAP_REQ_ABANDON, op->o_upstream_msgid ); + } + + if ( rc < 0 ) { + ber_free( ber, 1 ); + upstream->c_pendingber = NULL; + goto done; + } + rc = LDAP_SUCCESS; + +done: + checked_unlock( &upstream->c_io_mutex ); + return rc; +} + +/* + * Will remove the operation from its upstream and if it was still there, + * sends an abandon request. + * + * Being called from client_reset or request_abandon, the following hold: + * - noone else is processing the read part of the client connection (no new + * operations come in there - relevant for the c_state checks) + * - op->o_client_refcnt > op->o_client_live (and it follows that op->o_client != NULL) + */ +void +operation_abandon( LloadOperation *op ) +{ + LloadConnection *c; + + checked_lock( &op->o_link_mutex ); + c = op->o_upstream; + checked_unlock( &op->o_link_mutex ); + if ( !c || !IS_ALIVE( c, c_live ) ) { + goto done; + } + + /* for now consider all abandoned operations completed, + * perhaps add a separate counter later */ + op->o_res = LLOAD_OP_COMPLETED; + if ( !operation_unlink_upstream( op, c ) ) { + /* The operation has already been abandoned or finished */ + Debug( LDAP_DEBUG_TRACE, "operation_abandon: " + "%s from connid=%lu msgid=%d not present in connid=%lu any " + "more\n", + lload_msgtype2str( op->o_tag ), op->o_client_connid, + op->o_client_msgid, op->o_upstream_connid ); + goto done; + } + + if ( operation_send_abandon( op, c ) == LDAP_SUCCESS ) { + connection_write_cb( -1, 0, c ); + } + +done: + operation_unlink( op ); +} + +void +operation_send_reject( + LloadOperation *op, + int result, + const char *msg, + int send_anyway ) +{ + LloadConnection *c; + BerElement *ber; + int found; + + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " + "rejecting %s from client connid=%lu with message: \"%s\"\n", + lload_msgtype2str( op->o_tag ), op->o_client_connid, msg ); + + checked_lock( &op->o_link_mutex ); + c = op->o_client; + checked_unlock( &op->o_link_mutex ); + if ( !c || !IS_ALIVE( c, c_live ) ) { + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " + "not sending msgid=%d, client connid=%lu is dead\n", + op->o_client_msgid, op->o_client_connid ); + + goto done; + } + + found = operation_unlink_client( op, c ); + if ( !found && !send_anyway ) { + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " + "msgid=%d not scheduled for client connid=%lu anymore, " + "not sending\n", + op->o_client_msgid, c->c_connid ); + goto done; + } + + if ( op->o_client_msgid == 0 ) { + assert( op->o_saved_msgid == 0 && op->o_pin_id ); + Debug( LDAP_DEBUG_TRACE, "operation_send_reject: " + "operation pin=%lu is just a pin, not sending\n", + op->o_pin_id ); + goto done; + } + + checked_lock( &c->c_io_mutex ); + ber = c->c_pendingber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + checked_unlock( &c->c_io_mutex ); + Debug( LDAP_DEBUG_ANY, "operation_send_reject: " + "ber_alloc failed, closing connid=%lu\n", + c->c_connid ); + CONNECTION_LOCK_DESTROY(c); + goto done; + } + c->c_pendingber = ber; + + ber_printf( ber, "t{tit{ess}}", LDAP_TAG_MESSAGE, + LDAP_TAG_MSGID, op->o_client_msgid, + slap_req2res( op->o_tag ), result, "", msg ); + + checked_unlock( &c->c_io_mutex ); + + connection_write_cb( -1, 0, c ); + +done: + operation_unlink( op ); +} + +/* + * Upstream is shutting down, signal the client if necessary, but we have to + * call operation_destroy_from_upstream ourselves to detach upstream from the + * op. + * + * Only called from upstream_destroy. + */ +void +operation_lost_upstream( LloadOperation *op ) +{ + operation_send_reject( op, LDAP_OTHER, + "connection to the remote server has been severed", 0 ); +} + +int +connection_timeout( LloadConnection *upstream, void *arg ) +{ + LloadOperation *op; + TAvlnode *ops = NULL, *node, *next; + LloadBackend *b = upstream->c_backend; + time_t threshold = *(time_t *)arg; + int rc, nops = 0; + + CONNECTION_LOCK(upstream); + for ( node = ldap_tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node && + ((LloadOperation *)node->avl_data)->o_start < + threshold; /* shortcut */ + node = next ) { + LloadOperation *found_op; + + next = ldap_tavl_next( node, TAVL_DIR_RIGHT ); + op = node->avl_data; + + /* Have we received another response since? */ + if ( op->o_last_response && op->o_last_response >= threshold ) { + continue; + } + + op->o_res = LLOAD_OP_FAILED; + found_op = ldap_tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ); + assert( op == found_op ); + + if ( upstream->c_state == LLOAD_C_BINDING ) { + assert( op->o_tag == LDAP_REQ_BIND && upstream->c_ops == NULL ); + upstream->c_state = LLOAD_C_READY; + if ( !BER_BVISNULL( &upstream->c_sasl_bind_mech ) ) { + ber_memfree( upstream->c_sasl_bind_mech.bv_val ); + BER_BVZERO( &upstream->c_sasl_bind_mech ); + } + } + + rc = ldap_tavl_insert( &ops, op, operation_upstream_cmp, ldap_avl_dup_error ); + assert( rc == LDAP_SUCCESS ); + + Debug( LDAP_DEBUG_STATS2, "connection_timeout: " + "timing out %s from connid=%lu msgid=%d sent to connid=%lu as " + "msgid=%d\n", + lload_msgtype2str( op->o_tag ), op->o_client_connid, + op->o_client_msgid, op->o_upstream_connid, + op->o_upstream_msgid ); + nops++; + } + + if ( nops == 0 ) { + CONNECTION_UNLOCK(upstream); + return LDAP_SUCCESS; + } + upstream->c_n_ops_executing -= nops; + upstream->c_counters.lc_ops_failed += nops; + Debug( LDAP_DEBUG_STATS, "connection_timeout: " + "timing out %d operations for connid=%lu\n", + nops, upstream->c_connid ); + CONNECTION_UNLOCK(upstream); + + checked_lock( &b->b_mutex ); + b->b_n_ops_executing -= nops; + checked_unlock( &b->b_mutex ); + + for ( node = ldap_tavl_end( ops, TAVL_DIR_LEFT ); node; + node = ldap_tavl_next( node, TAVL_DIR_RIGHT ) ) { + op = node->avl_data; + + operation_send_reject( op, + op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED : + LDAP_ADMINLIMIT_EXCEEDED, + "upstream did not respond in time", 0 ); + + if ( upstream->c_type != LLOAD_C_BIND && rc == LDAP_SUCCESS ) { + rc = operation_send_abandon( op, upstream ); + } + operation_unlink( op ); + } + + if ( rc == LDAP_SUCCESS ) { + connection_write_cb( -1, 0, upstream ); + } + + CONNECTION_LOCK(upstream); + /* ITS#9799: If a Bind timed out, connection is in an unknown state */ + if ( upstream->c_type == LLOAD_C_BIND || rc != LDAP_SUCCESS || + ( upstream->c_state == LLOAD_C_CLOSING && !upstream->c_ops ) ) { + CONNECTION_DESTROY(upstream); + } else { + CONNECTION_UNLOCK(upstream); + } + + /* just dispose of the AVL, most operations should already be gone */ + ldap_tavl_free( ops, NULL ); + return LDAP_SUCCESS; +} + +void +operations_timeout( evutil_socket_t s, short what, void *arg ) +{ + struct event *self = arg; + LloadBackend *b; + time_t threshold; + + Debug( LDAP_DEBUG_TRACE, "operations_timeout: " + "running timeout task\n" ); + if ( !lload_timeout_api ) goto done; + + threshold = slap_get_time() - lload_timeout_api->tv_sec; + + LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) { + epoch_t epoch; + + checked_lock( &b->b_mutex ); + if ( b->b_n_ops_executing == 0 ) { + checked_unlock( &b->b_mutex ); + continue; + } + + epoch = epoch_join(); + + Debug( LDAP_DEBUG_TRACE, "operations_timeout: " + "timing out binds for backend uri=%s\n", + b->b_uri.bv_val ); + connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn, + connection_timeout, &threshold ); + + Debug( LDAP_DEBUG_TRACE, "operations_timeout: " + "timing out other operations for backend uri=%s\n", + b->b_uri.bv_val ); + connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn, + connection_timeout, &threshold ); + + epoch_leave( epoch ); + checked_unlock( &b->b_mutex ); + } +done: + Debug( LDAP_DEBUG_TRACE, "operations_timeout: " + "timeout task finished\n" ); + evtimer_add( self, lload_timeout_api ); +} + +void +operation_update_global_rejected( LloadOperation *op ) +{ + if ( op->o_res == LLOAD_OP_REJECTED ) { + assert( op->o_upstream_connid == 0 ); + switch ( op->o_tag ) { + case LDAP_REQ_BIND: + lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_rejected++; + break; + default: + lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_rejected++; + break; + } + } +} + +void +operation_update_conn_counters( LloadOperation *op, LloadConnection *upstream ) +{ + if ( op->o_res == LLOAD_OP_COMPLETED ) { + upstream->c_counters.lc_ops_completed++; + } else { + upstream->c_counters.lc_ops_failed++; + } +} + +void +operation_update_backend_counters( LloadOperation *op, LloadBackend *b ) +{ + int stat_type = op->o_tag == LDAP_REQ_BIND ? LLOAD_STATS_OPS_BIND : + LLOAD_STATS_OPS_OTHER; + + assert( b != NULL ); + if ( op->o_res == LLOAD_OP_COMPLETED ) { + b->b_counters[stat_type].lc_ops_completed++; + } else { + b->b_counters[stat_type].lc_ops_failed++; + } +} |