/* lload.h - load balancer include file */ /* $OpenLDAP$ */ /* This work is part of OpenLDAP Software . * * Copyright 1998-2022 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted only as authorized by the OpenLDAP * Public License. * * A copy of this license is available in the file LICENSE in the * top-level directory of the distribution or, alternatively, at * . */ /* Portions Copyright (c) 1995 Regents of the University of Michigan. * All rights reserved. * * Redistribution and use in source and binary forms are permitted * provided that this notice is preserved and that due credit is given * to the University of Michigan at Ann Arbor. The name of the University * may not be used to endorse or promote products derived from this * software without specific prior written permission. This software * is provided ``as is'' without express or implied warranty. */ #ifndef _LLOAD_H_ #define _LLOAD_H_ #include "ldap_defaults.h" #include #include #include #include #include #include #include #include #include #include "ldap_avl.h" #include "../servers/slapd/slap.h" #include "../slapd/back-monitor/back-monitor.h" #ifndef ldap_debug #define ldap_debug slap_debug #endif #include "ldap_log.h" #include #include #include "lber_pvt.h" #include "ldap_pvt.h" #include "ldap_pvt_thread.h" #include "ldap_queue.h" #include #ifdef HAVE_CYRUS_SASL #ifdef HAVE_SASL_SASL_H #include #else #include #endif #endif /* HAVE_CYRUS_SASL */ LDAP_BEGIN_DECL #ifdef SERVICE_NAME #undef SERVICE_NAME #endif #define SERVICE_NAME OPENLDAP_PACKAGE "-lloadd" #define LLOAD_SB_MAX_INCOMING_CLIENT ( ( 1 << 24 ) - 1 ) #define LLOAD_SB_MAX_INCOMING_UPSTREAM ( ( 1 << 24 ) - 1 ) #define LLOAD_CONN_MAX_PDUS_PER_CYCLE_DEFAULT 10 #define BER_BV_OPTIONAL( bv ) ( BER_BVISNULL( bv ) ? NULL : ( bv ) ) #include #define checked_lock( mutex ) \ if ( ldap_pvt_thread_mutex_lock( mutex ) != 0 ) assert(0) #define checked_unlock( mutex ) \ if ( ldap_pvt_thread_mutex_unlock( mutex ) != 0 ) assert(0) #ifdef LDAP_THREAD_DEBUG #define assert_locked( mutex ) \ if ( ldap_pvt_thread_mutex_trylock( mutex ) == 0 ) assert(0) #else #define assert_locked( mutex ) ( (void)0 ) #endif typedef struct LloadTier LloadTier; typedef struct LloadBackend LloadBackend; typedef struct LloadPendingConnection LloadPendingConnection; typedef struct LloadConnection LloadConnection; typedef struct LloadOperation LloadOperation; typedef struct LloadChange LloadChange; /* end of forward declarations */ typedef LDAP_STAILQ_HEAD(TierSt, LloadTier) lload_t_head; typedef LDAP_CIRCLEQ_HEAD(BeSt, LloadBackend) lload_b_head; typedef LDAP_CIRCLEQ_HEAD(ConnSt, LloadConnection) lload_c_head; LDAP_SLAPD_V (lload_t_head) tiers; LDAP_SLAPD_V (lload_c_head) clients; LDAP_SLAPD_V (struct slap_bindconf) bindconf; LDAP_SLAPD_V (struct berval) lloadd_identity; /* Used to coordinate server (un)pause, shutdown */ LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_wait_mutex; LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_pause_cond; LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_wait_cond; typedef int lload_cf_aux_table_parse_x( struct berval *val, void *bc, slap_cf_aux_table *tab0, const char *tabmsg, int unparse ); typedef struct LloadListener LloadListener; enum lc_type { LLOAD_CHANGE_UNDEFINED = 0, LLOAD_CHANGE_MODIFY, LLOAD_CHANGE_ADD, LLOAD_CHANGE_DEL, }; enum lc_object { LLOAD_UNDEFINED = 0, LLOAD_DAEMON, /* LLOAD_BINDCONF, */ LLOAD_TIER, LLOAD_BACKEND, }; enum lcf_daemon { LLOAD_DAEMON_MOD_THREADS = 1 << 0, LLOAD_DAEMON_MOD_FEATURES = 1 << 1, LLOAD_DAEMON_MOD_TLS = 1 << 2, LLOAD_DAEMON_MOD_LISTENER_ADD = 1 << 3, LLOAD_DAEMON_MOD_LISTENER_REPLACE = 1 << 4, LLOAD_DAEMON_MOD_BINDCONF = 1 << 5, }; enum lcf_tier { LLOAD_TIER_MOD_TYPE = 1 << 0, }; enum lcf_backend { LLOAD_BACKEND_MOD_OTHER = 1 << 0, LLOAD_BACKEND_MOD_CONNS = 1 << 1, }; struct LloadChange { enum lc_type type; enum lc_object object; union { int generic; enum lcf_daemon daemon; enum lcf_tier tier; enum lcf_backend backend; } flags; void *target; }; typedef enum { #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS LLOAD_FEATURE_VC = 1 << 0, #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ LLOAD_FEATURE_PROXYAUTHZ = 1 << 1, LLOAD_FEATURE_PAUSE = 1 << 2, } lload_features_t; #define LLOAD_FEATURE_SUPPORTED_MASK ( \ LLOAD_FEATURE_PROXYAUTHZ | \ 0 ) #ifdef BALANCER_MODULE #define LLOAD_TLS_CTX ( lload_use_slap_tls_ctx ? slap_tls_ctx : lload_tls_ctx ) #else #define LLOAD_TLS_CTX ( lload_tls_ctx ) #endif enum lload_tls_type { LLOAD_CLEARTEXT = 0, LLOAD_LDAPS, LLOAD_STARTTLS_OPTIONAL, LLOAD_STARTTLS, LLOAD_TLS_ESTABLISHED, }; struct LloadPendingConnection { LloadBackend *backend; struct event *event; ber_socket_t fd; LDAP_LIST_ENTRY(LloadPendingConnection) next; }; typedef struct lload_counters_t { ldap_pvt_mp_t lc_ops_completed; ldap_pvt_mp_t lc_ops_received; ldap_pvt_mp_t lc_ops_forwarded; ldap_pvt_mp_t lc_ops_rejected; ldap_pvt_mp_t lc_ops_failed; } lload_counters_t; enum { LLOAD_STATS_OPS_BIND = 0, LLOAD_STATS_OPS_OTHER, LLOAD_STATS_OPS_LAST }; typedef struct lload_global_stats_t { ldap_pvt_mp_t global_incoming; ldap_pvt_mp_t global_outgoing; lload_counters_t counters[LLOAD_STATS_OPS_LAST]; } lload_global_stats_t; typedef LloadTier *(LloadTierInit)( void ); typedef int (LloadTierConfigCb)( LloadTier *tier, char *arg ); typedef int (LloadTierBackendConfigCb)( LloadTier *tier, LloadBackend *b, char *arg ); typedef int (LloadTierCb)( LloadTier *tier ); typedef int (LloadTierResetCb)( LloadTier *tier, int shutdown ); typedef int (LloadTierBackendCb)( LloadTier *tier, LloadBackend *b ); typedef void (LloadTierChange)( LloadTier *tier, LloadChange *change ); typedef int (LloadTierSelect)( LloadTier *tier, LloadOperation *op, LloadConnection **cp, int *res, char **message ); struct lload_tier_type { char *tier_name; struct berval tier_oc, tier_backend_oc; LloadTierInit *tier_init; LloadTierConfigCb *tier_config; LloadTierBackendConfigCb *tier_backend_config; LloadTierCb *tier_startup; LloadTierCb *tier_update; LloadTierResetCb *tier_reset; LloadTierCb *tier_destroy; LloadTierBackendCb *tier_add_backend; LloadTierBackendCb *tier_remove_backend; LloadTierChange *tier_change; LloadTierSelect *tier_select; }; struct LloadTier { struct lload_tier_type t_type; ldap_pvt_thread_mutex_t t_mutex; lload_b_head t_backends; int t_nbackends; enum { LLOAD_TIER_EXCLUSIVE = 1 << 0, /* Reject if busy */ } t_flags; struct berval t_name; #ifdef BALANCER_MODULE monitor_subsys_t *t_monitor; #endif /* BALANCER_MODULE */ void *t_private; LDAP_STAILQ_ENTRY(LloadTier) t_next; }; /* Can hold mutex when locking a linked connection */ struct LloadBackend { ldap_pvt_thread_mutex_t b_mutex; struct berval b_name, b_uri; int b_proto, b_port; enum lload_tls_type b_tls, b_tls_conf; char *b_host; int b_retry_timeout, b_failed; struct event *b_retry_event; struct timeval b_retry_tv; int b_numconns, b_numbindconns; int b_bindavail, b_active, b_opening; lload_c_head b_conns, b_bindconns, b_preparing; LDAP_LIST_HEAD(ConnectingSt, LloadPendingConnection) b_connecting; LloadConnection *b_last_conn, *b_last_bindconn; long b_max_pending, b_max_conn_pending; long b_n_ops_executing; lload_counters_t b_counters[LLOAD_STATS_OPS_LAST]; LloadTier *b_tier; time_t b_last_update; uintptr_t b_fitness; int b_weight; uintptr_t b_operation_count; uintptr_t b_operation_time; #ifdef BALANCER_MODULE monitor_subsys_t *b_monitor; #endif /* BALANCER_MODULE */ struct evdns_getaddrinfo_request *b_dns_req; void *b_cookie; LDAP_CIRCLEQ_ENTRY(LloadBackend) b_next; }; typedef int (*LloadOperationHandler)( LloadConnection *client, LloadOperation *op, BerElement *ber ); typedef int (*RequestHandler)( LloadConnection *c, LloadOperation *op ); typedef struct lload_exop_handlers_t { struct berval oid; RequestHandler func; } ExopHandler; typedef int (*CONNECTION_PDU_CB)( LloadConnection *c ); typedef void (*CONNECTION_DESTROY_CB)( LloadConnection *c ); /* connection state (protected by c_mutex) */ enum sc_state { LLOAD_C_INVALID = 0, /* MUST BE ZERO (0) */ LLOAD_C_READY, /* ready */ LLOAD_C_CLOSING, /* closing */ LLOAD_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */ LLOAD_C_BINDING, /* binding */ LLOAD_C_DYING, /* part-processed dead waiting to be freed, someone * might still be observing it */ }; enum sc_type { LLOAD_C_OPEN = 0, /* regular connection */ LLOAD_C_PREPARING, /* upstream connection not assigned yet */ LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */ LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */ }; enum sc_io_state { LLOAD_C_OPERATIONAL = 0, /* all is good */ LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or * running, do not re-enable c_read_event */ LLOAD_C_READ_PAUSE = 1 << 1, /* We want to pause reading until the client * has sufficiently caught up with what we * sent */ }; /* Tracking whether an operation might cause a client to restrict which * upstreams are eligible */ enum op_restriction { LLOAD_OP_NOT_RESTRICTED, /* no restrictions in place */ LLOAD_OP_RESTRICTED_WRITE, /* client is restricted to a certain backend with * a timeout attached */ LLOAD_OP_RESTRICTED_BACKEND, /* client is restricted to a certain backend, * without a timeout */ LLOAD_OP_RESTRICTED_UPSTREAM, /* client is restricted to a certain upstream */ LLOAD_OP_RESTRICTED_ISOLATE, /* TODO: client is restricted to a certain * upstream and removes the upstream from the * pool */ LLOAD_OP_RESTRICTED_REJECT, /* operation should not be forwarded to any * backend, either it is processed internally * or rejected */ }; /* * represents a connection from an ldap client/to ldap server */ struct LloadConnection { enum sc_state c_state; /* connection state */ enum sc_type c_type; enum sc_io_state c_io_state; ber_socket_t c_fd; /* * LloadConnection reference counting: * - connection has a reference counter in c_refcnt * - also a liveness/validity token is added to c_refcnt during * lload_connection_init, its existence is tracked in c_live and is usually the * only one that prevents it from being destroyed * - anyone who needs to be able to relock the connection after unlocking it has * to use acquire_ref(), they need to make sure a matching * RELEASE_REF( c, c_refcnt, c->c_destroy ); is run eventually * - when a connection is considered dead, use CONNECTION_DESTROY on a locked * connection, it will be made unreachable from normal places and either * scheduled for reclamation when safe to do so or if anyone still holds a * reference, it just gets unlocked and reclaimed after the last ref is * released * - CONNECTION_LOCK_DESTROY is a shorthand for locking and CONNECTION_DESTROY */ ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */ uintptr_t c_refcnt, c_live; CONNECTION_DESTROY_CB c_unlink; CONNECTION_DESTROY_CB c_destroy; CONNECTION_PDU_CB c_pdu_cb; #define CONNECTION_ASSERT_LOCKED(c) assert_locked( &(c)->c_mutex ) #define CONNECTION_LOCK(c) \ do { \ checked_lock( &(c)->c_mutex ); \ } while (0) #define CONNECTION_UNLOCK(c) \ do { \ checked_unlock( &(c)->c_mutex ); \ } while (0) #define CONNECTION_UNLINK_(c) \ do { \ if ( __atomic_exchange_n( &(c)->c_live, 0, __ATOMIC_ACQ_REL ) ) { \ (c)->c_unlink( (c) ); \ RELEASE_REF( (c), c_refcnt, c->c_destroy ); \ } \ } while (0) #define CONNECTION_DESTROY(c) \ do { \ CONNECTION_UNLINK_(c); \ CONNECTION_UNLOCK(c); \ } while (0) #define CONNECTION_LOCK_DESTROY(c) \ do { \ CONNECTION_LOCK(c); \ CONNECTION_DESTROY(c); \ } while (0); Sockbuf *c_sb; /* ber connection stuff */ /* set by connection_init */ unsigned long c_connid; /* unique id of this connection */ struct berval c_peer_name; /* peer name (trans=addr:port) */ time_t c_starttime; /* when the connection was opened */ time_t c_activitytime; /* when the connection was last used */ ber_int_t c_next_msgid; /* msgid of the next message */ /* must not be used while holding either mutex */ struct event *c_read_event, *c_write_event; struct timeval *c_read_timeout; /* can only be changed by binding thread */ struct berval c_sasl_bind_mech; /* mech in progress */ struct berval c_auth; /* authcDN (possibly in progress) */ unsigned long c_pin_id; #ifdef HAVE_CYRUS_SASL sasl_conn_t *c_sasl_authctx; void *c_sasl_defaults; #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */ sasl_channel_binding_t *c_sasl_cbinding; /* Else cyrus-sasl would happily * leak it on sasl_dispose */ #endif /* SASL_CHANNEL_BINDING */ #endif /* HAVE_CYRUS_SASL */ #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS struct berval c_vc_cookie; #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ /* Can be held while acquiring c_mutex to inject things into c_ops or * destroy the connection */ ldap_pvt_thread_mutex_t c_io_mutex; /* only one pdu written at a time */ BerElement *c_currentber; /* ber we're attempting to read */ BerElement *c_pendingber; /* ber we're attempting to write */ TAvlnode *c_ops; /* Operations pending on the connection */ #ifdef HAVE_TLS enum lload_tls_type c_is_tls; /* true if this LDAP over raw TLS */ #endif long c_n_ops_executing; /* num of ops currently executing */ long c_n_ops_completed; /* num of ops completed */ lload_counters_t c_counters; /* per connection operation counters */ enum op_restriction c_restricted; uintptr_t c_restricted_inflight; time_t c_restricted_at; LloadBackend *c_backend; LloadConnection *c_linked_upstream; TAvlnode *c_linked; #ifdef BALANCER_MODULE struct berval c_monitor_dn; #endif /* BALANCER_MODULE */ /* * Protected by the CIRCLEQ mutex: * - Client: clients_mutex * - Upstream: b->b_mutex */ LDAP_CIRCLEQ_ENTRY(LloadConnection) c_next; }; enum op_state { LLOAD_OP_NOT_FREEING = 0, LLOAD_OP_DETACHING_CLIENT = 1 << 1, LLOAD_OP_DETACHING_UPSTREAM = 1 << 0, }; #define LLOAD_OP_DETACHING_MASK \ ( LLOAD_OP_DETACHING_UPSTREAM | LLOAD_OP_DETACHING_CLIENT ) /* operation result for monitoring purposes */ enum op_result { LLOAD_OP_REJECTED, /* operation was not forwarded */ LLOAD_OP_COMPLETED, /* operation sent and response received */ LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */ }; /* * Operation reference tracking: * - o_refcnt is set to 1, never incremented * - OPERATION_UNLINK sets it to 0 and on transition from 1 clears both * connection links (o_client, o_upstream) */ struct LloadOperation { uintptr_t o_refcnt; #define OPERATION_UNLINK(op) \ try_release_ref( &(op)->o_refcnt, (op), \ (dispose_cb *)operation_unlink, \ (dispose_cb *)operation_destroy ) LloadConnection *o_client; unsigned long o_client_connid; ber_int_t o_client_msgid; ber_int_t o_saved_msgid; enum op_restriction o_restricted; LloadConnection *o_upstream; unsigned long o_upstream_connid; ber_int_t o_upstream_msgid; struct timeval o_last_response; /* Protects o_client, o_upstream links */ ldap_pvt_thread_mutex_t o_link_mutex; ber_tag_t o_tag; struct timeval o_start; unsigned long o_pin_id; enum op_result o_res; BerElement *o_ber; BerValue o_request, o_ctrls; }; struct restriction_entry { struct berval oid; enum op_restriction action; }; /* * listener; need to access it from monitor backend */ struct LloadListener { struct berval sl_url; struct berval sl_name; mode_t sl_perms; #ifdef HAVE_TLS int sl_is_tls; #endif int sl_is_proxied; struct event_base *base; struct evconnlistener *listener; int sl_mute; /* Listener is temporarily disabled due to emfile */ int sl_busy; /* Listener is busy (accept thread activated) */ ber_socket_t sl_sd; Sockaddr sl_sa; #define sl_addr sl_sa.sa_in_addr #define LDAP_TCP_BUFFER #ifdef LDAP_TCP_BUFFER int sl_tcp_rmem; /* custom TCP read buffer size */ int sl_tcp_wmem; /* custom TCP write buffer size */ #endif }; typedef int (*CONNCB)( LloadConnection *c, void *arg ); /* config requires a bi_private with configuration data - dummy for now */ struct lload_conf_info { int dummy; }; LDAP_END_DECL #include "proto-lload.h" #endif /* _LLOAD_H_ */