summaryrefslogtreecommitdiffstats
path: root/src/hooks/dhcp/high_availability/ha_service.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/hooks/dhcp/high_availability/ha_service.cc')
-rw-r--r--src/hooks/dhcp/high_availability/ha_service.cc3146
1 files changed, 3146 insertions, 0 deletions
diff --git a/src/hooks/dhcp/high_availability/ha_service.cc b/src/hooks/dhcp/high_availability/ha_service.cc
new file mode 100644
index 0000000..f844b33
--- /dev/null
+++ b/src/hooks/dhcp/high_availability/ha_service.cc
@@ -0,0 +1,3146 @@
+// Copyright (C) 2018-2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <command_creator.h>
+#include <ha_log.h>
+#include <ha_service.h>
+#include <ha_service_states.h>
+#include <cc/command_interpreter.h>
+#include <cc/data.h>
+#include <config/cmd_response_creator.h>
+#include <config/timeouts.h>
+#include <dhcp/iface_mgr.h>
+#include <dhcpsrv/cfgmgr.h>
+#include <dhcpsrv/lease_mgr.h>
+#include <dhcpsrv/lease_mgr_factory.h>
+#include <exceptions/exceptions.h>
+#include <http/date_time.h>
+#include <http/response_json.h>
+#include <http/post_request_json.h>
+#include <util/multi_threading_mgr.h>
+#include <util/stopwatch.h>
+#include <boost/pointer_cast.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/weak_ptr.hpp>
+#include <functional>
+#include <sstream>
+
+using namespace isc::asiolink;
+using namespace isc::config;
+using namespace isc::data;
+using namespace isc::dhcp;
+using namespace isc::hooks;
+using namespace isc::http;
+using namespace isc::log;
+using namespace isc::util;
+namespace ph = std::placeholders;
+
+namespace {
+
+/// @brief Exception thrown when command sent to the partner is unsupported.
+class CommandUnsupportedError : public CtrlChannelError {
+public:
+ CommandUnsupportedError(const char* file, size_t line, const char* what) :
+ CtrlChannelError(file, line, what) {}
+};
+
+}
+
+namespace isc {
+namespace ha {
+
+const int HAService::HA_HEARTBEAT_COMPLETE_EVT;
+const int HAService::HA_LEASE_UPDATES_COMPLETE_EVT;
+const int HAService::HA_SYNCING_FAILED_EVT;
+const int HAService::HA_SYNCING_SUCCEEDED_EVT;
+const int HAService::HA_MAINTENANCE_NOTIFY_EVT;
+const int HAService::HA_MAINTENANCE_START_EVT;
+const int HAService::HA_MAINTENANCE_CANCEL_EVT;
+const int HAService::HA_CONTROL_RESULT_MAINTENANCE_NOT_ALLOWED;
+const int HAService::HA_SYNCED_PARTNER_UNAVAILABLE_EVT;
+
+HAService::HAService(const IOServicePtr& io_service, const NetworkStatePtr& network_state,
+ const HAConfigPtr& config, const HAServerType& server_type)
+ : io_service_(io_service), network_state_(network_state), config_(config),
+ server_type_(server_type), client_(), listener_(), communication_state_(),
+ query_filter_(config), mutex_(), pending_requests_(),
+ lease_update_backlog_(config->getDelayedUpdatesLimit()),
+ sync_complete_notified_(false) {
+
+ if (server_type == HAServerType::DHCPv4) {
+ communication_state_.reset(new CommunicationState4(io_service_, config));
+
+ } else {
+ communication_state_.reset(new CommunicationState6(io_service_, config));
+ }
+
+ network_state_->reset(NetworkState::Origin::HA_COMMAND);
+
+ startModel(HA_WAITING_ST);
+
+ // Create the client and(or) listener as appropriate.
+ if (!config_->getEnableMultiThreading()) {
+ // Not configured for multi-threading, start a client in ST mode.
+ client_.reset(new HttpClient(*io_service_, 0));
+ } else {
+ // Create an MT-mode client.
+ client_.reset(new HttpClient(*io_service_,
+ config_->getHttpClientThreads(), true));
+
+ // If we're configured to use our own listener create and start it.
+ if (config_->getHttpDedicatedListener()) {
+ // Get the server address and port from this server's URL.
+ auto my_url = config_->getThisServerConfig()->getUrl();
+ IOAddress server_address(IOAddress::IPV4_ZERO_ADDRESS());
+ try {
+ // Since we do not currently support hostname resolution,
+ // we need to make sure we have an IP address here.
+ server_address = IOAddress(my_url.getStrippedHostname());
+ } catch (const std::exception& ex) {
+ isc_throw(Unexpected, "server Url:" << my_url.getStrippedHostname()
+ << " is not a valid IP address");
+ }
+
+ // Fetch how many threads the listener will use.
+ uint32_t listener_threads = config_->getHttpListenerThreads();
+
+ // Fetch the TLS context.
+ auto tls_context = config_->getThisServerConfig()->getTlsContext();
+
+ // Instantiate the listener.
+ listener_.reset(new CmdHttpListener(server_address, my_url.getPort(),
+ listener_threads, tls_context));
+ // Set the command filter when enabled.
+ if (config_->getRestrictCommands()) {
+ if (server_type == HAServerType::DHCPv4) {
+ CmdResponseCreator::command_accept_list_ =
+ CommandCreator::ha_commands4_;
+ } else {
+ CmdResponseCreator::command_accept_list_ =
+ CommandCreator::ha_commands6_;
+ }
+ }
+ }
+ }
+
+ LOG_INFO(ha_logger, HA_SERVICE_STARTED)
+ .arg(HAConfig::HAModeToString(config->getHAMode()))
+ .arg(HAConfig::PeerConfig::roleToString(config->getThisServerConfig()->getRole()));
+}
+
+HAService::~HAService() {
+ // Stop client and/or listener.
+ stopClientAndListener();
+
+ network_state_->reset(NetworkState::Origin::HA_COMMAND);
+}
+
+void
+HAService::defineEvents() {
+ StateModel::defineEvents();
+
+ defineEvent(HA_HEARTBEAT_COMPLETE_EVT, "HA_HEARTBEAT_COMPLETE_EVT");
+ defineEvent(HA_LEASE_UPDATES_COMPLETE_EVT, "HA_LEASE_UPDATES_COMPLETE_EVT");
+ defineEvent(HA_SYNCING_FAILED_EVT, "HA_SYNCING_FAILED_EVT");
+ defineEvent(HA_SYNCING_SUCCEEDED_EVT, "HA_SYNCING_SUCCEEDED_EVT");
+ defineEvent(HA_MAINTENANCE_NOTIFY_EVT, "HA_MAINTENANCE_NOTIFY_EVT");
+ defineEvent(HA_MAINTENANCE_START_EVT, "HA_MAINTENANCE_START_EVT");
+ defineEvent(HA_MAINTENANCE_CANCEL_EVT, "HA_MAINTENANCE_CANCEL_EVT");
+ defineEvent(HA_SYNCED_PARTNER_UNAVAILABLE_EVT, "HA_SYNCED_PARTNER_UNAVAILABLE_EVT");
+}
+
+void
+HAService::verifyEvents() {
+ StateModel::verifyEvents();
+
+ getEvent(HA_HEARTBEAT_COMPLETE_EVT);
+ getEvent(HA_LEASE_UPDATES_COMPLETE_EVT);
+ getEvent(HA_SYNCING_FAILED_EVT);
+ getEvent(HA_SYNCING_SUCCEEDED_EVT);
+ getEvent(HA_MAINTENANCE_NOTIFY_EVT);
+ getEvent(HA_MAINTENANCE_START_EVT);
+ getEvent(HA_MAINTENANCE_CANCEL_EVT);
+ getEvent(HA_SYNCED_PARTNER_UNAVAILABLE_EVT);
+}
+
+void
+HAService::defineStates() {
+ StateModel::defineStates();
+
+ defineState(HA_BACKUP_ST, stateToString(HA_BACKUP_ST),
+ std::bind(&HAService::backupStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_BACKUP_ST)->getPausing());
+
+ defineState(HA_COMMUNICATION_RECOVERY_ST, stateToString(HA_COMMUNICATION_RECOVERY_ST),
+ std::bind(&HAService::communicationRecoveryHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_COMMUNICATION_RECOVERY_ST)->getPausing());
+
+ defineState(HA_HOT_STANDBY_ST, stateToString(HA_HOT_STANDBY_ST),
+ std::bind(&HAService::normalStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_HOT_STANDBY_ST)->getPausing());
+
+ defineState(HA_LOAD_BALANCING_ST, stateToString(HA_LOAD_BALANCING_ST),
+ std::bind(&HAService::normalStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_LOAD_BALANCING_ST)->getPausing());
+
+ defineState(HA_IN_MAINTENANCE_ST, stateToString(HA_IN_MAINTENANCE_ST),
+ std::bind(&HAService::inMaintenanceStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_IN_MAINTENANCE_ST)->getPausing());
+
+ defineState(HA_PARTNER_DOWN_ST, stateToString(HA_PARTNER_DOWN_ST),
+ std::bind(&HAService::partnerDownStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_PARTNER_DOWN_ST)->getPausing());
+
+ defineState(HA_PARTNER_IN_MAINTENANCE_ST, stateToString(HA_PARTNER_IN_MAINTENANCE_ST),
+ std::bind(&HAService::partnerInMaintenanceStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_PARTNER_IN_MAINTENANCE_ST)->getPausing());
+
+ defineState(HA_PASSIVE_BACKUP_ST, stateToString(HA_PASSIVE_BACKUP_ST),
+ std::bind(&HAService::passiveBackupStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_PASSIVE_BACKUP_ST)->getPausing());
+
+ defineState(HA_READY_ST, stateToString(HA_READY_ST),
+ std::bind(&HAService::readyStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_READY_ST)->getPausing());
+
+ defineState(HA_SYNCING_ST, stateToString(HA_SYNCING_ST),
+ std::bind(&HAService::syncingStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_SYNCING_ST)->getPausing());
+
+ defineState(HA_TERMINATED_ST, stateToString(HA_TERMINATED_ST),
+ std::bind(&HAService::terminatedStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_TERMINATED_ST)->getPausing());
+
+ defineState(HA_WAITING_ST, stateToString(HA_WAITING_ST),
+ std::bind(&HAService::waitingStateHandler, this),
+ config_->getStateMachineConfig()->getStateConfig(HA_WAITING_ST)->getPausing());
+}
+
+void
+HAService::backupStateHandler() {
+ if (doOnEntry()) {
+ query_filter_.serveNoScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ // There is nothing to do in that state. This server simply receives
+ // lease updates from the partners.
+ postNextEvent(NOP_EVT);
+}
+
+void
+HAService::communicationRecoveryHandler() {
+ if (doOnEntry()) {
+ query_filter_.serveDefaultScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ scheduleHeartbeat();
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ } else if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+
+ } else if (isPartnerStateInvalid()) {
+ verboseTransition(HA_WAITING_ST);
+
+ } else {
+
+ // Transitions based on the partner's state.
+ switch (communication_state_->getPartnerState()) {
+ case HA_IN_MAINTENANCE_ST:
+ verboseTransition(HA_PARTNER_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_PARTNER_DOWN_ST:
+ verboseTransition(HA_WAITING_ST);
+ break;
+
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ verboseTransition(HA_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_TERMINATED_ST:
+ verboseTransition(HA_TERMINATED_ST);
+ break;
+
+ case HA_UNAVAILABLE_ST:
+ if (shouldPartnerDown()) {
+ verboseTransition(HA_PARTNER_DOWN_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ case HA_WAITING_ST:
+ case HA_SYNCING_ST:
+ case HA_READY_ST:
+ // The partner seems to be waking up, perhaps after communication-recovery.
+ // If our backlog queue is overflown we need to synchronize our lease database.
+ // There is no need to send ha-reset to the partner because the partner is
+ // already synchronizing its lease database.
+ if (!communication_state_->isCommunicationInterrupted() &&
+ lease_update_backlog_.wasOverflown()) {
+ verboseTransition(HA_WAITING_ST);
+ } else {
+ // Backlog was not overflown, so there is no need to synchronize our
+ // lease database. Let's wait until our partner completes synchronization
+ // and transitions to the load-balancing state.
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ default:
+ // If the communication is still interrupted, let's continue sitting
+ // in this state until it is resumed or until the transition to the
+ // partner-down state, depending on what happens first.
+ if (communication_state_->isCommunicationInterrupted()) {
+ postNextEvent(NOP_EVT);
+ break;
+ }
+
+ // The communication has been resumed. The partner server must be in a state
+ // in which it can receive outstanding lease updates we collected. The number of
+ // outstanding lease updates must not exceed the configured limit. Finally, the
+ // lease updates must be successfully sent. If that all works, we will transition
+ // to the normal operation.
+ if ((communication_state_->getPartnerState() == getNormalState()) ||
+ (communication_state_->getPartnerState() == HA_COMMUNICATION_RECOVERY_ST)) {
+ if (lease_update_backlog_.wasOverflown() || !sendLeaseUpdatesFromBacklog()) {
+ // If our lease backlog was overflown or we were unable to send lease
+ // updates to the partner we should notify the partner that it should
+ // synchronize the lease database. We do it by sending ha-reset command.
+ if (sendHAReset()) {
+ verboseTransition(HA_WAITING_ST);
+ }
+ break;
+ }
+ // The backlog was not overflown and we successfully sent our lease updates.
+ // We can now transition to the normal operation state. If the partner
+ // fails to send his outstanding lease updates to us it should send the
+ // ha-reset command to us.
+ verboseTransition(getNormalState());
+ break;
+ }
+
+ // The partner appears to be in unexpected state, we have exceeded the number
+ // of lease updates in a backlog or an attempt to send lease updates failed.
+ // In all these cases we follow plan B and transition to the waiting state.
+ // The server will then attempt to synchronize the entire lease database.
+ verboseTransition(HA_WAITING_ST);
+ }
+ }
+
+ // When exiting this state we must ensure that lease updates backlog is cleared.
+ if (doOnExit()) {
+ lease_update_backlog_.clear();
+ }
+}
+
+void
+HAService::normalStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveDefaultScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ scheduleHeartbeat();
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ // Check if the partner state is valid per current configuration. If it is
+ // in an invalid state let's transition to the waiting state and stay there
+ // until the configuration is corrected.
+ if (isPartnerStateInvalid()) {
+ verboseTransition(HA_WAITING_ST);
+ return;
+ }
+
+ switch (communication_state_->getPartnerState()) {
+ case HA_IN_MAINTENANCE_ST:
+ verboseTransition(HA_PARTNER_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_PARTNER_DOWN_ST:
+ verboseTransition(HA_WAITING_ST);
+ break;
+
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ verboseTransition(HA_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_TERMINATED_ST:
+ verboseTransition(HA_TERMINATED_ST);
+ break;
+
+ case HA_UNAVAILABLE_ST:
+ if (shouldPartnerDown()) {
+ verboseTransition(HA_PARTNER_DOWN_ST);
+
+ } else if (config_->amAllowingCommRecovery()) {
+ verboseTransition(HA_COMMUNICATION_RECOVERY_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ default:
+ postNextEvent(NOP_EVT);
+ }
+
+ if (doOnExit()) {
+ // Do nothing here but doOnExit() call clears the "on exit" flag
+ // when transitioning to the communication-recovery state. In that
+ // state we need this flag to be cleared.
+ }
+}
+
+void
+HAService::inMaintenanceStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ // In this state the server remains silent and waits for being
+ // shutdown.
+ query_filter_.serveNoScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+
+ LOG_INFO(ha_logger, HA_MAINTENANCE_SHUTDOWN_SAFE);
+ }
+
+ scheduleHeartbeat();
+
+ // We don't transition out of this state unless explicitly mandated
+ // by the administrator via a dedicated command which cancels
+ // the maintenance.
+ postNextEvent(NOP_EVT);
+}
+
+void
+HAService::partnerDownStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+
+ bool maintenance = (getLastEvent() == HA_MAINTENANCE_START_EVT);
+
+ // It may be administratively disabled to handle partner's scope
+ // in case of failure. If this is the case we'll just handle our
+ // default scope (or no scope at all). The user will need to
+ // manually enable this server to handle partner's scope.
+ // If we're in the maintenance mode we serve all scopes because
+ // it is not a failover situation.
+ if (maintenance || config_->getThisServerConfig()->isAutoFailover()) {
+ query_filter_.serveFailoverScopes();
+ } else {
+ query_filter_.serveDefaultScopes();
+ }
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+
+ if (maintenance) {
+ // If we ended up in the partner-down state as a result of
+ // receiving the ha-maintenance-start command let's log it.
+ LOG_INFO(ha_logger, HA_MAINTENANCE_STARTED_IN_PARTNER_DOWN);
+ }
+
+ } else if (getLastEvent() == HA_SYNCED_PARTNER_UNAVAILABLE_EVT) {
+ // Partner sent the ha-sync-complete-notify command to indicate that
+ // it has successfully synchronized its lease database but this server
+ // was unable to send heartbeat to this server. Enable the DHCP service
+ // and continue serving the clients in the partner-down state until the
+ // communication with the partner is fixed.
+ adjustNetworkState();
+ }
+
+ scheduleHeartbeat();
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ // Check if the partner state is valid per current configuration. If it is
+ // in an invalid state let's transition to the waiting state and stay there
+ // until the configuration is corrected.
+ if (isPartnerStateInvalid()) {
+ verboseTransition(HA_WAITING_ST);
+ return;
+ }
+
+ switch (communication_state_->getPartnerState()) {
+ case HA_COMMUNICATION_RECOVERY_ST:
+ case HA_PARTNER_DOWN_ST:
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ verboseTransition(HA_WAITING_ST);
+ break;
+
+ case HA_READY_ST:
+ // If partner allocated new leases for which it didn't send lease updates
+ // to us we should synchronize our database.
+ if (communication_state_->hasPartnerNewUnsentUpdates()) {
+ verboseTransition(HA_WAITING_ST);
+ } else {
+ // We did not miss any lease updates. There is no need to synchronize
+ // the database.
+ verboseTransition(getNormalState());
+ }
+ break;
+
+ case HA_TERMINATED_ST:
+ verboseTransition(HA_TERMINATED_ST);
+ break;
+
+ default:
+ postNextEvent(NOP_EVT);
+ }
+}
+
+void
+HAService::partnerInMaintenanceStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveFailoverScopes();
+
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+
+ LOG_INFO(ha_logger, HA_MAINTENANCE_STARTED);
+ }
+
+ scheduleHeartbeat();
+
+ if (isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ switch (communication_state_->getPartnerState()) {
+ case HA_UNAVAILABLE_ST:
+ verboseTransition(HA_PARTNER_DOWN_ST);
+ break;
+ default:
+ postNextEvent(NOP_EVT);
+ }
+}
+
+void
+HAService::passiveBackupStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveDefaultScopes();
+ adjustNetworkState();
+
+ // In the passive-backup state we don't send heartbeat.
+ communication_state_->stopHeartbeat();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+ postNextEvent(NOP_EVT);
+}
+
+void
+HAService::readyStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveNoScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ scheduleHeartbeat();
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ // Check if the partner state is valid per current configuration. If it is
+ // in an invalid state let's transition to the waiting state and stay there
+ // until the configuration is corrected.
+ if (isPartnerStateInvalid()) {
+ verboseTransition(HA_WAITING_ST);
+ return;
+ }
+
+ switch (communication_state_->getPartnerState()) {
+ case HA_HOT_STANDBY_ST:
+ case HA_LOAD_BALANCING_ST:
+ case HA_COMMUNICATION_RECOVERY_ST:
+ verboseTransition(getNormalState());
+ break;
+
+ case HA_IN_MAINTENANCE_ST:
+ verboseTransition(HA_PARTNER_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ verboseTransition(HA_IN_MAINTENANCE_ST);
+ break;
+
+ case HA_READY_ST:
+ // If both servers are ready, the primary server "wins" and is
+ // transitioned first.
+ if (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::PRIMARY) {
+ verboseTransition((config_->getHAMode() == HAConfig::LOAD_BALANCING ?
+ HA_LOAD_BALANCING_ST : HA_HOT_STANDBY_ST));
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ case HA_TERMINATED_ST:
+ verboseTransition(HA_TERMINATED_ST);
+ break;
+
+ case HA_UNAVAILABLE_ST:
+ if (shouldPartnerDown()) {
+ verboseTransition(HA_PARTNER_DOWN_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ default:
+ postNextEvent(NOP_EVT);
+ }
+}
+
+void
+HAService::syncingStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveNoScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ // Check if the partner state is valid per current configuration. If it is
+ // in an invalid state let's transition to the waiting state and stay there
+ // until the configuration is corrected.
+ if (isPartnerStateInvalid()) {
+ verboseTransition(HA_WAITING_ST);
+ return;
+ }
+
+ // We don't want to perform synchronous attempt to synchronize with
+ // a partner until we know that the partner is responding. Therefore,
+ // we wait for the heartbeat to complete successfully before we
+ // initiate the synchronization.
+ switch (communication_state_->getPartnerState()) {
+ case HA_TERMINATED_ST:
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+
+ case HA_UNAVAILABLE_ST:
+ // If the partner appears to be offline, let's transition to the partner
+ // down state. Otherwise, we'd be stuck trying to synchronize with a
+ // dead partner.
+ if (shouldPartnerDown()) {
+ verboseTransition(HA_PARTNER_DOWN_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ default:
+ // We don't want the heartbeat to interfere with the synchronization,
+ // so let's temporarily stop it.
+ communication_state_->stopHeartbeat();
+
+ // Timeout is configured in milliseconds. Need to convert to seconds.
+ unsigned int dhcp_disable_timeout =
+ static_cast<unsigned int>(config_->getSyncTimeout() / 1000);
+ if (dhcp_disable_timeout == 0) {
+ ++dhcp_disable_timeout;
+ }
+
+ // Perform synchronous leases update.
+ std::string status_message;
+ int sync_status = synchronize(status_message,
+ config_->getFailoverPeerConfig()->getName(),
+ dhcp_disable_timeout);
+
+ // If the leases synchronization was successful, let's transition
+ // to the ready state.
+ if (sync_status == CONTROL_RESULT_SUCCESS) {
+ verboseTransition(HA_READY_ST);
+
+ } else {
+ // If the synchronization was unsuccessful we're back to the
+ // situation that the partner is unavailable and therefore
+ // we stay in the syncing state.
+ postNextEvent(NOP_EVT);
+ }
+ }
+
+ // Make sure that the heartbeat is re-enabled.
+ scheduleHeartbeat();
+}
+
+void
+HAService::terminatedStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveDefaultScopes();
+ adjustNetworkState();
+
+ // In the terminated state we don't send heartbeat.
+ communication_state_->stopHeartbeat();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+
+ LOG_ERROR(ha_logger, HA_TERMINATED);
+ }
+
+ postNextEvent(NOP_EVT);
+}
+
+void
+HAService::waitingStateHandler() {
+ // If we are transitioning from another state, we have to define new
+ // serving scopes appropriate for the new state. We don't do it if
+ // we remain in this state.
+ if (doOnEntry()) {
+ query_filter_.serveNoScopes();
+ adjustNetworkState();
+
+ // Log if the state machine is paused.
+ conditionalLogPausedState();
+ }
+
+ // Only schedule the heartbeat for non-backup servers.
+ if ((config_->getHAMode() != HAConfig::PASSIVE_BACKUP) &&
+ (config_->getThisServerConfig()->getRole() != HAConfig::PeerConfig::BACKUP)) {
+ scheduleHeartbeat();
+ }
+
+ if (isMaintenanceCanceled() || isModelPaused()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ // Backup server must remain in its own state.
+ if (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::BACKUP) {
+ verboseTransition(HA_BACKUP_ST);
+ return;
+ }
+
+ // We're not a backup server, so we're either primary or secondary. If this is
+ // a passive-backup mode of operation, we're primary and we should transition
+ // to the passive-backup state.
+ if (config_->getHAMode() == HAConfig::PASSIVE_BACKUP) {
+ verboseTransition(HA_PASSIVE_BACKUP_ST);
+ return;
+ }
+
+ // Check if the clock skew is still acceptable. If not, transition to
+ // the terminated state.
+ if (shouldTerminate()) {
+ verboseTransition(HA_TERMINATED_ST);
+ return;
+ }
+
+ // Check if the partner state is valid per current configuration. If it is
+ // in an invalid state let's sit in the waiting state until the configuration
+ // is corrected.
+ if (isPartnerStateInvalid()) {
+ postNextEvent(NOP_EVT);
+ return;
+ }
+
+ switch (communication_state_->getPartnerState()) {
+ case HA_COMMUNICATION_RECOVERY_ST:
+ case HA_HOT_STANDBY_ST:
+ case HA_LOAD_BALANCING_ST:
+ case HA_IN_MAINTENANCE_ST:
+ case HA_PARTNER_DOWN_ST:
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ case HA_READY_ST:
+ // If we're configured to not synchronize lease database, proceed directly
+ // to the "ready" state.
+ verboseTransition(config_->amSyncingLeases() ? HA_SYNCING_ST : HA_READY_ST);
+ break;
+
+ case HA_SYNCING_ST:
+ postNextEvent(NOP_EVT);
+ break;
+
+ case HA_TERMINATED_ST:
+ // We have checked above whether the clock skew is exceeding the threshold
+ // and we should terminate. If we're here, it means that the clock skew
+ // is acceptable. The partner may be still in the terminated state because
+ // it hasn't been restarted yet. Probably, this server is the first one
+ // being restarted after syncing the clocks. Let's just sit in the waiting
+ // state until the partner gets restarted.
+ LOG_INFO(ha_logger, HA_TERMINATED_RESTART_PARTNER);
+ postNextEvent(NOP_EVT);
+ break;
+
+ case HA_WAITING_ST:
+ // If both servers are waiting, the primary server 'wins' and is
+ // transitioned to the next state first.
+ if (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::PRIMARY) {
+ // If we're configured to not synchronize lease database, proceed directly
+ // to the "ready" state.
+ verboseTransition(config_->amSyncingLeases() ? HA_SYNCING_ST : HA_READY_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ case HA_UNAVAILABLE_ST:
+ if (shouldPartnerDown()) {
+ verboseTransition(HA_PARTNER_DOWN_ST);
+
+ } else {
+ postNextEvent(NOP_EVT);
+ }
+ break;
+
+ default:
+ postNextEvent(NOP_EVT);
+ }
+}
+
+void
+HAService::verboseTransition(const unsigned state) {
+ // Get current and new state name.
+ std::string current_state_name = getStateLabel(getCurrState());
+ std::string new_state_name = getStateLabel(state);
+
+ // Turn them to upper case so as they are better visible in the logs.
+ boost::to_upper(current_state_name);
+ boost::to_upper(new_state_name);
+
+ if (config_->getHAMode() != HAConfig::PASSIVE_BACKUP) {
+ // If this is load-balancing or hot-standby mode we also want to log
+ // partner's state.
+ auto partner_state = communication_state_->getPartnerState();
+ std::string partner_state_name = getStateLabel(partner_state);
+ boost::to_upper(partner_state_name);
+
+ // Log the transition.
+ LOG_INFO(ha_logger, HA_STATE_TRANSITION)
+ .arg(current_state_name)
+ .arg(new_state_name)
+ .arg(partner_state_name);
+
+ } else {
+ // In the passive-backup mode we don't know the partner's state.
+ LOG_INFO(ha_logger, HA_STATE_TRANSITION_PASSIVE_BACKUP)
+ .arg(current_state_name)
+ .arg(new_state_name);
+ }
+
+ // If we're transitioning directly from the "waiting" to "ready"
+ // state it indicates that the database synchronization is
+ // administratively disabled. Let's remind the user about this
+ // configuration setting.
+ if ((state == HA_READY_ST) && (getCurrState() == HA_WAITING_ST)) {
+ LOG_INFO(ha_logger, HA_CONFIG_LEASE_SYNCING_DISABLED_REMINDER);
+ }
+
+ // Do the actual transition.
+ transition(state, getNextEvent());
+
+ // Inform the administrator whether or not lease updates are generated.
+ // Updates are never generated by a backup server so it doesn't make
+ // sense to log anything for the backup server.
+ if ((config_->getHAMode() != HAConfig::PASSIVE_BACKUP) &&
+ (config_->getThisServerConfig()->getRole() != HAConfig::PeerConfig::BACKUP)) {
+ if (shouldSendLeaseUpdates(config_->getFailoverPeerConfig())) {
+ LOG_INFO(ha_logger, HA_LEASE_UPDATES_ENABLED)
+ .arg(new_state_name);
+
+ } else if (!config_->amSendingLeaseUpdates()) {
+ // Lease updates are administratively disabled.
+ LOG_INFO(ha_logger, HA_CONFIG_LEASE_UPDATES_DISABLED_REMINDER)
+ .arg(new_state_name);
+
+ } else {
+ // Lease updates are not administratively disabled, but they
+ // are not issued because this is the backup server or because
+ // in this state the server should not generate lease updates.
+ LOG_INFO(ha_logger, HA_LEASE_UPDATES_DISABLED)
+ .arg(new_state_name);
+ }
+ }
+}
+
+int
+HAService::getNormalState() const {
+ if (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::BACKUP) {
+ return (HA_BACKUP_ST);
+ }
+
+ switch (config_->getHAMode()) {
+ case HAConfig::LOAD_BALANCING:
+ return (HA_LOAD_BALANCING_ST);
+ case HAConfig::HOT_STANDBY:
+ return (HA_HOT_STANDBY_ST);
+ default:
+ return (HA_PASSIVE_BACKUP_ST);
+ }
+}
+
+bool
+HAService::unpause() {
+ if (isModelPaused()) {
+ LOG_INFO(ha_logger, HA_STATE_MACHINE_CONTINUED);
+ unpauseModel();
+ return (true);
+ }
+ return (false);
+}
+
+void
+HAService::conditionalLogPausedState() const {
+ // Inform the administrator if the state machine is paused.
+ if (isModelPaused()) {
+ std::string state_name = stateToString(getCurrState());
+ boost::to_upper(state_name);
+ LOG_INFO(ha_logger, HA_STATE_MACHINE_PAUSED)
+ .arg(state_name);
+ }
+}
+
+void
+HAService::serveDefaultScopes() {
+ query_filter_.serveDefaultScopes();
+}
+
+bool
+HAService::inScope(dhcp::Pkt4Ptr& query4) {
+ return (inScopeInternal(query4));
+}
+
+bool
+HAService::inScope(dhcp::Pkt6Ptr& query6) {
+ return (inScopeInternal(query6));
+}
+
+template<typename QueryPtrType>
+bool
+HAService::inScopeInternal(QueryPtrType& query) {
+ // Check if the query is in scope (should be processed by this server).
+ std::string scope_class;
+ const bool in_scope = query_filter_.inScope(query, scope_class);
+ // Whether or not the query is going to be processed by this server,
+ // we associate the query with the appropriate class.
+ query->addClass(dhcp::ClientClass(scope_class));
+ // The following is the part of the server failure detection algorithm.
+ // If the query should be processed by the partner we need to check if
+ // the partner responds. If the number of unanswered queries exceeds a
+ // configured threshold, we will consider the partner to be offline.
+ if (!in_scope && communication_state_->isCommunicationInterrupted()) {
+ communication_state_->analyzeMessage(query);
+ }
+ // Indicate if the query is in scope.
+ return (in_scope);
+}
+
+void
+HAService::adjustNetworkState() {
+ std::string current_state_name = getStateLabel(getCurrState());
+ boost::to_upper(current_state_name);
+
+ // DHCP service should be enabled in the following states.
+ const bool should_enable = ((getCurrState() == HA_COMMUNICATION_RECOVERY_ST) ||
+ (getCurrState() == HA_LOAD_BALANCING_ST) ||
+ (getCurrState() == HA_HOT_STANDBY_ST) ||
+ (getCurrState() == HA_PARTNER_DOWN_ST) ||
+ (getCurrState() == HA_PARTNER_IN_MAINTENANCE_ST) ||
+ (getCurrState() == HA_PASSIVE_BACKUP_ST) ||
+ (getCurrState() == HA_TERMINATED_ST));
+
+ if (!should_enable && network_state_->isServiceEnabled()) {
+ std::string current_state_name = getStateLabel(getCurrState());
+ boost::to_upper(current_state_name);
+ LOG_INFO(ha_logger, HA_LOCAL_DHCP_DISABLE)
+ .arg(config_->getThisServerName())
+ .arg(current_state_name);
+ network_state_->disableService(NetworkState::Origin::HA_COMMAND);
+
+ } else if (should_enable && !network_state_->isServiceEnabled()) {
+ std::string current_state_name = getStateLabel(getCurrState());
+ boost::to_upper(current_state_name);
+ LOG_INFO(ha_logger, HA_LOCAL_DHCP_ENABLE)
+ .arg(config_->getThisServerName())
+ .arg(current_state_name);
+ network_state_->enableService(NetworkState::Origin::HA_COMMAND);
+ }
+}
+
+bool
+HAService::shouldPartnerDown() const {
+ // Checking whether the communication with the partner is OK is the
+ // first step towards verifying if the server is up.
+ if (communication_state_->isCommunicationInterrupted()) {
+ // If the communication is interrupted, we also have to check
+ // whether the partner answers DHCP requests. The only cases
+ // when we don't (can't) do it are: the hot standby configuration
+ // in which this server is a primary and when the DHCP service is
+ // disabled so we can't analyze incoming traffic. Note that the
+ // primary server can't check delayed responses to the partner
+ // because the partner doesn't respond to any queries in this
+ // configuration.
+ if (network_state_->isServiceEnabled() &&
+ ((config_->getHAMode() == HAConfig::LOAD_BALANCING) ||
+ (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::STANDBY))) {
+ return (communication_state_->failureDetected());
+ }
+
+ // Hot standby / primary case.
+ return (true);
+ }
+
+ // Shouldn't transition to the partner down state.
+ return (false);
+}
+
+bool
+HAService::shouldTerminate() const {
+ // Check if skew is fatally large.
+ bool should_terminate = communication_state_->clockSkewShouldTerminate();
+
+ // If not issue a warning if it's getting large.
+ if (!should_terminate) {
+ communication_state_->clockSkewShouldWarn();
+ }
+
+ return (should_terminate);
+}
+
+bool
+HAService::isMaintenanceCanceled() const {
+ return (getLastEvent() == HA_MAINTENANCE_CANCEL_EVT);
+}
+
+bool
+HAService::isPartnerStateInvalid() const {
+ switch (communication_state_->getPartnerState()) {
+ case HA_COMMUNICATION_RECOVERY_ST:
+ if (config_->getHAMode() != HAConfig::LOAD_BALANCING) {
+ LOG_WARN(ha_logger, HA_INVALID_PARTNER_STATE_COMMUNICATION_RECOVERY);
+ return (true);
+ }
+ break;
+
+ case HA_HOT_STANDBY_ST:
+ if (config_->getHAMode() != HAConfig::HOT_STANDBY) {
+ LOG_WARN(ha_logger, HA_INVALID_PARTNER_STATE_HOT_STANDBY);
+ return (true);
+ }
+ break;
+
+ case HA_LOAD_BALANCING_ST:
+ if (config_->getHAMode() != HAConfig::LOAD_BALANCING) {
+ LOG_WARN(ha_logger, HA_INVALID_PARTNER_STATE_LOAD_BALANCING);
+ return (true);
+ }
+ break;
+
+ default:
+ ;
+ }
+ return (false);
+}
+
+size_t
+HAService::asyncSendLeaseUpdates(const dhcp::Pkt4Ptr& query,
+ const dhcp::Lease4CollectionPtr& leases,
+ const dhcp::Lease4CollectionPtr& deleted_leases,
+ const hooks::ParkingLotHandlePtr& parking_lot) {
+
+ // Get configurations of the peers. Exclude this instance.
+ HAConfig::PeerConfigMap peers_configs = config_->getOtherServersConfig();
+
+ size_t sent_num = 0;
+
+ // Schedule sending lease updates to each peer.
+ for (auto p = peers_configs.begin(); p != peers_configs.end(); ++p) {
+ HAConfig::PeerConfigPtr conf = p->second;
+
+ // Check if the lease updates should be queued. This is the case when the
+ // server is in the communication-recovery state. Queued lease updates may
+ // be sent when the communication is re-established.
+ if (shouldQueueLeaseUpdates(conf)) {
+ // Lease updates for deleted leases.
+ for (auto l = deleted_leases->begin(); l != deleted_leases->end(); ++l) {
+ lease_update_backlog_.push(LeaseUpdateBacklog::DELETE, *l);
+ }
+
+ // Lease updates for new allocations and updated leases.
+ for (auto l = leases->begin(); l != leases->end(); ++l) {
+ lease_update_backlog_.push(LeaseUpdateBacklog::ADD, *l);
+ }
+
+ continue;
+ }
+
+ // Check if the lease update should be sent to the server. If we're in
+ // the partner-down state we don't send lease updates to the partner.
+ if (!shouldSendLeaseUpdates(conf)) {
+ // If we decide to not send the lease updates to an active partner, we
+ // should make a record of it in the communication state. The partner
+ // can check if there were any unsent lease updates when he determines
+ // whether it should synchronize its database or not when it recovers
+ // from the partner-down state.
+ if (conf->getRole() != HAConfig::PeerConfig::BACKUP) {
+ communication_state_->increaseUnsentUpdateCount();
+ }
+ continue;
+ }
+
+ // Lease updates for deleted leases.
+ for (auto l = deleted_leases->begin(); l != deleted_leases->end(); ++l) {
+ asyncSendLeaseUpdate(query, conf, CommandCreator::createLease4Delete(**l),
+ parking_lot);
+ }
+
+ // Lease updates for new allocations and updated leases.
+ for (auto l = leases->begin(); l != leases->end(); ++l) {
+ asyncSendLeaseUpdate(query, conf, CommandCreator::createLease4Update(**l),
+ parking_lot);
+ }
+
+ // If we're contacting a backup server from which we don't expect a
+ // response prior to responding to the DHCP client we don't count
+ // it.
+ if ((config_->amWaitingBackupAck() || (conf->getRole() != HAConfig::PeerConfig::BACKUP))) {
+ ++sent_num;
+ }
+ }
+
+ return (sent_num);
+}
+
+size_t
+HAService::asyncSendLeaseUpdates(const dhcp::Pkt6Ptr& query,
+ const dhcp::Lease6CollectionPtr& leases,
+ const dhcp::Lease6CollectionPtr& deleted_leases,
+ const hooks::ParkingLotHandlePtr& parking_lot) {
+
+ // Get configurations of the peers. Exclude this instance.
+ HAConfig::PeerConfigMap peers_configs = config_->getOtherServersConfig();
+
+ size_t sent_num = 0;
+
+ // Schedule sending lease updates to each peer.
+ for (auto p = peers_configs.begin(); p != peers_configs.end(); ++p) {
+ HAConfig::PeerConfigPtr conf = p->second;
+
+ // Check if the lease updates should be queued. This is the case when the
+ // server is in the communication-recovery state. Queued lease updates may
+ // be sent when the communication is re-established.
+ if (shouldQueueLeaseUpdates(conf)) {
+ for (auto l = deleted_leases->begin(); l != deleted_leases->end(); ++l) {
+ lease_update_backlog_.push(LeaseUpdateBacklog::DELETE, *l);
+ }
+
+ // Lease updates for new allocations and updated leases.
+ for (auto l = leases->begin(); l != leases->end(); ++l) {
+ lease_update_backlog_.push(LeaseUpdateBacklog::ADD, *l);
+ }
+
+ continue;
+ }
+
+ // Check if the lease update should be sent to the server. If we're in
+ // the partner-down state we don't send lease updates to the partner.
+ if (!shouldSendLeaseUpdates(conf)) {
+ // If we decide to not send the lease updates to an active partner, we
+ // should make a record of it in the communication state. The partner
+ // can check if there were any unsent lease updates when he determines
+ // whether it should synchronize its database or not when it recovers
+ // from the partner-down state.
+ if (conf->getRole() != HAConfig::PeerConfig::BACKUP) {
+ communication_state_->increaseUnsentUpdateCount();
+ }
+ continue;
+ }
+
+ // If we're contacting a backup server from which we don't expect a
+ // response prior to responding to the DHCP client we don't count
+ // it.
+ if (config_->amWaitingBackupAck() || (conf->getRole() != HAConfig::PeerConfig::BACKUP)) {
+ ++sent_num;
+ }
+
+ // Send new/updated leases and deleted leases in one command.
+ asyncSendLeaseUpdate(query, conf, CommandCreator::createLease6BulkApply(leases, deleted_leases),
+ parking_lot);
+ }
+
+ return (sent_num);
+}
+
+template<typename QueryPtrType>
+bool
+HAService::leaseUpdateComplete(QueryPtrType& query,
+ const ParkingLotHandlePtr& parking_lot) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return (leaseUpdateCompleteInternal(query, parking_lot));
+ } else {
+ return (leaseUpdateCompleteInternal(query, parking_lot));
+ }
+}
+
+template<typename QueryPtrType>
+bool
+HAService::leaseUpdateCompleteInternal(QueryPtrType& query,
+ const ParkingLotHandlePtr& parking_lot) {
+ auto it = pending_requests_.find(query);
+
+ // If there are no more pending requests for this query, let's unpark
+ // the DHCP packet.
+ if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) {
+ parking_lot->unpark(query);
+
+ // If we have unparked the packet we can clear pending requests for
+ // this query.
+ if (it != pending_requests_.end()) {
+ pending_requests_.erase(it);
+ }
+ return (true);
+ }
+ return (false);
+}
+
+template<typename QueryPtrType>
+void
+HAService::updatePendingRequest(QueryPtrType& query) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ updatePendingRequestInternal(query);
+ } else {
+ updatePendingRequestInternal(query);
+ }
+}
+
+template<typename QueryPtrType>
+void
+HAService::updatePendingRequestInternal(QueryPtrType& query) {
+ if (pending_requests_.count(query) == 0) {
+ pending_requests_[query] = 1;
+ } else {
+ ++pending_requests_[query];
+ }
+}
+
+template<typename QueryPtrType>
+void
+HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
+ const HAConfig::PeerConfigPtr& config,
+ const ConstElementPtr& command,
+ const ParkingLotHandlePtr& parking_lot) {
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(config->getUrl().getStrippedHostname()));
+ config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(command);
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // When possible we prefer to pass weak pointers to the queries, rather
+ // than shared pointers, to avoid memory leaks in case cross reference
+ // between the pointers.
+ boost::weak_ptr<typename QueryPtrType::element_type> weak_query(query);
+
+ // Schedule asynchronous HTTP request.
+ client_->asyncSendRequest(config->getUrl(), config->getTlsContext(),
+ request, response,
+ [this, weak_query, parking_lot, config]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+ // Get the shared pointer of the query. The server should keep the
+ // pointer to the query and then park it. Therefore, we don't really
+ // expect it to be null. If it is null, something is really wrong.
+ QueryPtrType query = weak_query.lock();
+ if (!query) {
+ isc_throw(Unexpected, "query is null while receiving response from"
+ " HA peer. This is programmatic error");
+ }
+
+ // There are three possible groups of errors during the lease update.
+ // One is the IO error causing issues in communication with the peer.
+ // Another one is an HTTP parsing error. The last type of error is
+ // when non-success error code is returned in the response carried
+ // in the HTTP message or if the JSON response is otherwise broken.
+
+ bool lease_update_success = true;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ LOG_WARN(ha_logger, HA_LEASE_UPDATE_COMMUNICATIONS_FAILED)
+ .arg(query->getLabel())
+ .arg(config->getLogLabel())
+ .arg(ec ? ec.message() : error_str);
+
+ // Communication error, so let's drop parked packet. The DHCP
+ // response will not be sent.
+ lease_update_success = false;
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ int rcode = 0;
+ auto args = verifyAsyncResponse(response, rcode);
+ // In the v6 case the server may return a list of failed lease
+ // updates and we should log them.
+ logFailedLeaseUpdates(query, args);
+
+ } catch (const std::exception& ex) {
+ LOG_WARN(ha_logger, HA_LEASE_UPDATE_FAILED)
+ .arg(query->getLabel())
+ .arg(config->getLogLabel())
+ .arg(ex.what());
+
+ // Error while doing an update. The DHCP response will not be sent.
+ lease_update_success = false;
+ }
+ }
+
+ // We don't care about the result of the lease update to the backup server.
+ // It is a best effort update.
+ if ((config->getRole() != HAConfig::PeerConfig::BACKUP) && !lease_update_success) {
+ // If we were unable to communicate with the partner we set partner's
+ // state as unavailable.
+ communication_state_->setPartnerState("unavailable");
+ }
+
+ // It is possible to configure the server to not wait for a response from
+ // the backup server before we unpark the packet and respond to the client.
+ // Here we check if we're dealing with such situation.
+ if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) {
+ // We're expecting a response from the backup server or it is not
+ // a backup server and the lease update was unsuccessful. In such
+ // case the DHCP exchange fails.
+ if (!lease_update_success) {
+ parking_lot->drop(query);
+ }
+ } else {
+ // This was a response from the backup server and we're configured to
+ // not wait for their acknowledgments, so there is nothing more to do.
+ return;
+ }
+
+ if (leaseUpdateComplete(query, parking_lot)) {
+ // If we have finished sending the lease updates we need to run the
+ // state machine until the state machine finds that additional events
+ // are required, such as next heartbeat or a lease update. The runModel()
+ // may transition to another state, schedule asynchronous tasks etc.
+ // Then it returns control to the DHCP server.
+ runModel(HA_LEASE_UPDATES_COMPLETE_EVT);
+ }
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+
+ // The number of pending requests is the number of requests for which we
+ // expect an acknowledgment prior to responding to the DHCP clients. If
+ // we're configured to wait for the acks from the backups or it is not
+ // a backup increase the number of pending requests.
+ if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) {
+ // Request scheduled, so update the request counters for the query.
+ updatePendingRequest(query);
+ }
+}
+
+bool
+HAService::shouldSendLeaseUpdates(const HAConfig::PeerConfigPtr& peer_config) const {
+ // Never send lease updates if they are administratively disabled.
+ if (!config_->amSendingLeaseUpdates()) {
+ return (false);
+ }
+
+ // Always send updates to the backup server.
+ if (peer_config->getRole() == HAConfig::PeerConfig::BACKUP) {
+ return (true);
+ }
+
+ // Never send updates if this is a backup server.
+ if (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::BACKUP) {
+ return (false);
+ }
+
+ // In other case, whether we send lease updates or not depends on our
+ // state.
+ switch (getCurrState()) {
+ case HA_HOT_STANDBY_ST:
+ case HA_LOAD_BALANCING_ST:
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ return (true);
+
+ default:
+ ;
+ }
+
+ return (false);
+}
+
+bool
+HAService::shouldQueueLeaseUpdates(const HAConfig::PeerConfigPtr& peer_config) const {
+ if (!config_->amSendingLeaseUpdates()) {
+ return (false);
+ }
+
+ if (peer_config->getRole() == HAConfig::PeerConfig::BACKUP) {
+ return (false);
+ }
+
+ return (getCurrState() == HA_COMMUNICATION_RECOVERY_ST);
+}
+
+void
+HAService::logFailedLeaseUpdates(const PktPtr& query,
+ const ConstElementPtr& args) const {
+ // If there are no arguments, it means that the update was successful.
+ if (!args || (args->getType() != Element::map)) {
+ return;
+ }
+
+ // Instead of duplicating the code between the failed-deleted-leases and
+ // failed-leases, let's just have one function that does it for both.
+ auto log_proc = [](const PktPtr query, const ConstElementPtr& args,
+ const std::string& param_name, const log::MessageID& mesid) {
+
+ // Check if there are any failed leases.
+ auto failed_leases = args->get(param_name);
+
+ // The failed leases must be a list.
+ if (failed_leases && (failed_leases->getType() == Element::list)) {
+ // Go over the failed leases and log each of them.
+ for (int i = 0; i < failed_leases->size(); ++i) {
+ auto lease = failed_leases->get(i);
+ if (lease->getType() == Element::map) {
+
+ // ip-address
+ auto ip_address = lease->get("ip-address");
+
+ // lease type
+ auto lease_type = lease->get("type");
+
+ // error-message
+ auto error_message = lease->get("error-message");
+
+ LOG_INFO(ha_logger, mesid)
+ .arg(query->getLabel())
+ .arg(lease_type && (lease_type->getType() == Element::string) ?
+ lease_type->stringValue() : "(unknown)")
+ .arg(ip_address && (ip_address->getType() == Element::string) ?
+ ip_address->stringValue() : "(unknown)")
+ .arg(error_message && (error_message->getType() == Element::string) ?
+ error_message->stringValue() : "(unknown)");
+ }
+ }
+ }
+ };
+
+ // Process "failed-deleted-leases"
+ log_proc(query, args, "failed-deleted-leases", HA_LEASE_UPDATE_DELETE_FAILED_ON_PEER);
+
+ // Process "failed-leases".
+ log_proc(query, args, "failed-leases", HA_LEASE_UPDATE_CREATE_UPDATE_FAILED_ON_PEER);
+}
+
+ConstElementPtr
+HAService::processStatusGet() const {
+ ElementPtr ha_servers = Element::createMap();
+
+ // Local part
+ ElementPtr local = Element::createMap();
+ HAConfig::PeerConfig::Role role;
+ role = config_->getThisServerConfig()->getRole();
+ std::string role_txt = HAConfig::PeerConfig::roleToString(role);
+ local->set("role", Element::create(role_txt));
+ int state = getCurrState();
+ try {
+ local->set("state", Element::create(stateToString(state)));
+
+ } catch (...) {
+ // Empty string on error.
+ local->set("state", Element::create(std::string()));
+ }
+ std::set<std::string> scopes = query_filter_.getServedScopes();
+ ElementPtr list = Element::createList();
+ for (std::string scope : scopes) {
+ list->add(Element::create(scope));
+ }
+ local->set("scopes", list);
+ ha_servers->set("local", local);
+
+ // Do not include remote server information if this is a backup server or
+ // we're in the passive-backup mode.
+ if ((config_->getHAMode() == HAConfig::PASSIVE_BACKUP) ||
+ (config_->getThisServerConfig()->getRole() == HAConfig::PeerConfig::BACKUP)) {
+ return (ha_servers);
+ }
+
+ // Remote part
+ ElementPtr remote = communication_state_->getReport();
+
+ try {
+ role = config_->getFailoverPeerConfig()->getRole();
+ std::string role_txt = HAConfig::PeerConfig::roleToString(role);
+ remote->set("role", Element::create(role_txt));
+
+ } catch (...) {
+ remote->set("role", Element::create(std::string()));
+ }
+ ha_servers->set("remote", remote);
+
+ return (ha_servers);
+}
+
+ConstElementPtr
+HAService::processHeartbeat() {
+ ElementPtr arguments = Element::createMap();
+ std::string state_label = getState(getCurrState())->getLabel();
+ arguments->set("state", Element::create(state_label));
+
+ std::string date_time = HttpDateTime().rfc1123Format();
+ arguments->set("date-time", Element::create(date_time));
+
+ auto scopes = query_filter_.getServedScopes();
+ ElementPtr scopes_list = Element::createList();
+ for (auto scope : scopes) {
+ scopes_list->add(Element::create(scope));
+ }
+ arguments->set("scopes", scopes_list);
+
+ arguments->set("unsent-update-count",
+ Element::create(static_cast<int64_t>(communication_state_->getUnsentUpdateCount())));
+
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "HA peer status returned.",
+ arguments));
+}
+
+ConstElementPtr
+HAService::processHAReset() {
+ if (getCurrState() == HA_WAITING_ST) {
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "HA state machine already in WAITING state."));
+ }
+ verboseTransition(HA_WAITING_ST);
+ runModel(NOP_EVT);
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "HA state machine reset."));
+}
+
+void
+HAService::asyncSendHeartbeat() {
+ HAConfig::PeerConfigPtr partner_config = config_->getFailoverPeerConfig();
+
+ // If the sync_complete_notified_ is true it means that the partner
+ // notified us that it had completed lease database synchronization.
+ // We confirm that the partner is operational by sending the heartbeat
+ // to it. Regardless if the partner responds to our heartbeats or not,
+ // we should clear this flag. But, since we need the current value in
+ // the async call handler, we save it in the local variable before
+ // clearing it.
+ bool sync_complete_notified = sync_complete_notified_;
+ sync_complete_notified_ = false;
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(partner_config->getUrl().getStrippedHostname()));
+ partner_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createHeartbeat(server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // Schedule asynchronous HTTP request.
+ client_->asyncSendRequest(partner_config->getUrl(),
+ partner_config->getTlsContext(),
+ request, response,
+ [this, partner_config, sync_complete_notified]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ // There are three possible groups of errors during the heartbeat.
+ // One is the IO error causing issues in communication with the peer.
+ // Another one is an HTTP parsing error. The last type of error is
+ // when non-success error code is returned in the response carried
+ // in the HTTP message or if the JSON response is otherwise broken.
+
+ bool heartbeat_success = true;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ LOG_WARN(ha_logger, HA_HEARTBEAT_COMMUNICATIONS_FAILED)
+ .arg(partner_config->getLogLabel())
+ .arg(ec ? ec.message() : error_str);
+ heartbeat_success = false;
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ // Response must contain arguments and the arguments must
+ // be a map.
+ int rcode = 0;
+ ConstElementPtr args = verifyAsyncResponse(response, rcode);
+ if (!args || args->getType() != Element::map) {
+ isc_throw(CtrlChannelError, "returned arguments in the response"
+ " must be a map");
+ }
+ // Response must include partner's state.
+ ConstElementPtr state = args->get("state");
+ if (!state || state->getType() != Element::string) {
+ isc_throw(CtrlChannelError, "server state not returned in response"
+ " to a ha-heartbeat command or it is not a string");
+ }
+ // Remember the partner's state. This may throw if the returned
+ // state is invalid.
+ communication_state_->setPartnerState(state->stringValue());
+
+ ConstElementPtr date_time = args->get("date-time");
+ if (!date_time || date_time->getType() != Element::string) {
+ isc_throw(CtrlChannelError, "date-time not returned in response"
+ " to a ha-heartbeat command or it is not a string");
+ }
+ // Note the time returned by the partner to calculate the clock skew.
+ communication_state_->setPartnerTime(date_time->stringValue());
+
+ // Remember the scopes served by the partner.
+ try {
+ auto scopes = args->get("scopes");
+ communication_state_->setPartnerScopes(scopes);
+
+ } catch (...) {
+ // We don't want to fail if the scopes are missing because
+ // this would be incompatible with old HA hook library
+ // versions. We may make it mandatory one day, but during
+ // upgrades of existing HA setup it would be a real issue
+ // if we failed here.
+ }
+
+ // unsent-update-count was not present in earlier HA versions.
+ // Let's check if the partner has sent the parameter. We initialized
+ // the counter to 0, and it remains 0 if the partner doesn't send it.
+ // It effectively means that we don't track partner's unsent updates
+ // as in the earlier HA versions.
+ auto unsent_update_count = args->get("unsent-update-count");
+ if (unsent_update_count) {
+ if (unsent_update_count->getType() != Element::integer) {
+ isc_throw(CtrlChannelError, "unsent-update-count returned in"
+ " the ha-heartbeat response is not an integer");
+ }
+ communication_state_->setPartnerUnsentUpdateCount(static_cast<uint64_t>
+ (unsent_update_count->intValue()));
+ }
+
+ } catch (const std::exception& ex) {
+ LOG_WARN(ha_logger, HA_HEARTBEAT_FAILED)
+ .arg(partner_config->getLogLabel())
+ .arg(ex.what());
+ heartbeat_success = false;
+ }
+ }
+
+ // If heartbeat was successful, let's mark the connection with the
+ // peer as healthy.
+ if (heartbeat_success) {
+ communication_state_->poke();
+
+ } else {
+ // We were unable to retrieve partner's state, so let's mark it
+ // as unavailable.
+ communication_state_->setPartnerState("unavailable");
+ // Log if the communication is interrupted.
+ if (communication_state_->isCommunicationInterrupted()) {
+ LOG_WARN(ha_logger, HA_COMMUNICATION_INTERRUPTED)
+ .arg(partner_config->getName());
+ }
+ }
+
+ startHeartbeat();
+ // Even though the partner notified us about the synchronization completion,
+ // we still can't communicate with the partner. Let's continue serving
+ // the clients until the link is fixed.
+ if (sync_complete_notified && !heartbeat_success) {
+ postNextEvent(HA_SYNCED_PARTNER_UNAVAILABLE_EVT);
+ }
+ // Whatever the result of the heartbeat was, the state machine needs
+ // to react to this. Let's run the state machine until the state machine
+ // finds that some new events are required, i.e. next heartbeat or
+ // lease update. The runModel() may transition to another state, schedule
+ // asynchronous tasks etc. Then it returns control to the DHCP server.
+ runModel(HA_HEARTBEAT_COMPLETE_EVT);
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+}
+
+void
+HAService::scheduleHeartbeat() {
+ if (!communication_state_->isHeartbeatRunning()) {
+ startHeartbeat();
+ }
+}
+
+void
+HAService::startHeartbeat() {
+ if (config_->getHeartbeatDelay() > 0) {
+ communication_state_->startHeartbeat(config_->getHeartbeatDelay(),
+ std::bind(&HAService::asyncSendHeartbeat,
+ this));
+ }
+}
+
+void
+HAService::asyncDisableDHCPService(HttpClient& http_client,
+ const std::string& server_name,
+ const unsigned int max_period,
+ PostRequestCallback post_request_action) {
+ HAConfig::PeerConfigPtr remote_config = config_->getPeerConfig(server_name);
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(remote_config->getUrl().getStrippedHostname()));
+
+ remote_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createDHCPDisable(max_period,
+ server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // Schedule asynchronous HTTP request.
+ http_client.asyncSendRequest(remote_config->getUrl(),
+ remote_config->getTlsContext(),
+ request, response,
+ [this, remote_config, post_request_action]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ // There are three possible groups of errors during the heartbeat.
+ // One is the IO error causing issues in communication with the peer.
+ // Another one is an HTTP parsing error. The last type of error is
+ // when non-success error code is returned in the response carried
+ // in the HTTP message or if the JSON response is otherwise broken.
+
+ int rcode = 0;
+ std::string error_message;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_DHCP_DISABLE_COMMUNICATIONS_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ static_cast<void>(verifyAsyncResponse(response, rcode));
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_DHCP_DISABLE_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+ }
+
+ // Invoke post request action if it was specified.
+ if (post_request_action) {
+ post_request_action(error_message.empty(),
+ error_message,
+ rcode);
+ }
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+}
+
+void
+HAService::asyncEnableDHCPService(HttpClient& http_client,
+ const std::string& server_name,
+ PostRequestCallback post_request_action) {
+ HAConfig::PeerConfigPtr remote_config = config_->getPeerConfig(server_name);
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(remote_config->getUrl().getStrippedHostname()));
+ remote_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createDHCPEnable(server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // Schedule asynchronous HTTP request.
+ http_client.asyncSendRequest(remote_config->getUrl(),
+ remote_config->getTlsContext(),
+ request, response,
+ [this, remote_config, post_request_action]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ // There are three possible groups of errors during the heartbeat.
+ // One is the IO error causing issues in communication with the peer.
+ // Another one is an HTTP parsing error. The last type of error is
+ // when non-success error code is returned in the response carried
+ // in the HTTP message or if the JSON response is otherwise broken.
+
+ int rcode = 0;
+ std::string error_message;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_DHCP_ENABLE_COMMUNICATIONS_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ static_cast<void>(verifyAsyncResponse(response, rcode));
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_DHCP_ENABLE_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+ }
+
+ // Invoke post request action if it was specified.
+ if (post_request_action) {
+ post_request_action(error_message.empty(),
+ error_message,
+ rcode);
+ }
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+}
+
+void
+HAService::localDisableDHCPService() {
+ network_state_->disableService(NetworkState::Origin::HA_COMMAND);
+}
+
+void
+HAService::localEnableDHCPService() {
+ network_state_->enableService(NetworkState::Origin::HA_COMMAND);
+}
+
+void
+HAService::asyncSyncLeases() {
+ PostSyncCallback null_action;
+
+ // Timeout is configured in milliseconds. Need to convert to seconds.
+ unsigned int dhcp_disable_timeout =
+ static_cast<unsigned int>(config_->getSyncTimeout() / 1000);
+ if (dhcp_disable_timeout == 0) {
+ // Ensure that we always use at least 1 second timeout.
+ dhcp_disable_timeout = 1;
+ }
+
+ asyncSyncLeases(*client_, config_->getFailoverPeerConfig()->getName(),
+ dhcp_disable_timeout, LeasePtr(), null_action);
+}
+
+void
+HAService::asyncSyncLeases(http::HttpClient& http_client,
+ const std::string& server_name,
+ const unsigned int max_period,
+ const dhcp::LeasePtr& last_lease,
+ PostSyncCallback post_sync_action,
+ const bool dhcp_disabled) {
+ // Synchronization starts with a command to disable DHCP service of the
+ // peer from which we're fetching leases. We don't want the other server
+ // to allocate new leases while we fetch from it. The DHCP service will
+ // be disabled for a certain amount of time and will be automatically
+ // re-enabled if we die during the synchronization.
+ asyncDisableDHCPService(http_client, server_name, max_period,
+ [this, &http_client, server_name, max_period, last_lease,
+ post_sync_action, dhcp_disabled]
+ (const bool success, const std::string& error_message, const int) {
+
+ // If we have successfully disabled the DHCP service on the peer,
+ // we can start fetching the leases.
+ if (success) {
+ // The last argument indicates that disabling the DHCP
+ // service on the partner server was successful.
+ asyncSyncLeasesInternal(http_client, server_name, max_period,
+ last_lease, post_sync_action, true);
+
+ } else {
+ post_sync_action(success, error_message, dhcp_disabled);
+ }
+ });
+}
+
+void
+HAService::asyncSyncLeasesInternal(http::HttpClient& http_client,
+ const std::string& server_name,
+ const unsigned int max_period,
+ const dhcp::LeasePtr& last_lease,
+ PostSyncCallback post_sync_action,
+ const bool dhcp_disabled) {
+
+ HAConfig::PeerConfigPtr partner_config = config_->getFailoverPeerConfig();
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(partner_config->getUrl().getStrippedHostname()));
+ partner_config->addBasicAuthHttpHeader(request);
+ if (server_type_ == HAServerType::DHCPv4) {
+ request->setBodyAsJson(CommandCreator::createLease4GetPage(
+ boost::dynamic_pointer_cast<Lease4>(last_lease), config_->getSyncPageLimit()));
+
+ } else {
+ request->setBodyAsJson(CommandCreator::createLease6GetPage(
+ boost::dynamic_pointer_cast<Lease6>(last_lease), config_->getSyncPageLimit()));
+ }
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // Schedule asynchronous HTTP request.
+ http_client.asyncSendRequest(partner_config->getUrl(),
+ partner_config->getTlsContext(),
+ request, response,
+ [this, partner_config, post_sync_action, &http_client, server_name,
+ max_period, dhcp_disabled]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ // Holds last lease received on the page of leases. If the last
+ // page was hit, this value remains null.
+ LeasePtr last_lease;
+
+ // There are three possible groups of errors during the heartbeat.
+ // One is the IO error causing issues in communication with the peer.
+ // Another one is an HTTP parsing error. The last type of error is
+ // when non-success error code is returned in the response carried
+ // in the HTTP message or if the JSON response is otherwise broken.
+
+ std::string error_message;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_LEASES_SYNC_COMMUNICATIONS_FAILED)
+ .arg(partner_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+ // Handle third group of errors.
+ try {
+ int rcode = 0;
+ ConstElementPtr args = verifyAsyncResponse(response, rcode);
+
+ // Arguments must be a map.
+ if (args && (args->getType() != Element::map)) {
+ isc_throw(CtrlChannelError,
+ "arguments in the received response must be a map");
+ }
+
+ ConstElementPtr leases = args->get("leases");
+ if (!leases || (leases->getType() != Element::list)) {
+ isc_throw(CtrlChannelError,
+ "server response does not contain leases argument or this"
+ " argument is not a list");
+ }
+
+ // Iterate over the leases and update the database as appropriate.
+ const auto& leases_element = leases->listValue();
+
+ LOG_INFO(ha_logger, HA_LEASES_SYNC_LEASE_PAGE_RECEIVED)
+ .arg(leases_element.size())
+ .arg(server_name);
+
+ for (auto l = leases_element.begin(); l != leases_element.end(); ++l) {
+ try {
+
+ if (server_type_ == HAServerType::DHCPv4) {
+ Lease4Ptr lease = Lease4::fromElement(*l);
+
+ // Check if there is such lease in the database already.
+ Lease4Ptr existing_lease = LeaseMgrFactory::instance().getLease4(lease->addr_);
+ if (!existing_lease) {
+ // There is no such lease, so let's add it.
+ LeaseMgrFactory::instance().addLease(lease);
+
+ } else if (existing_lease->cltt_ < lease->cltt_) {
+ // If the existing lease is older than the fetched lease, update
+ // the lease in our local database.
+ // Update lease current expiration time with value received from the
+ // database. Some database backends reject operations on the lease if
+ // the current expiration time value does not match what is stored.
+ Lease::syncCurrentExpirationTime(*existing_lease, *lease);
+ LeaseMgrFactory::instance().updateLease4(lease);
+
+ } else {
+ LOG_DEBUG(ha_logger, DBGLVL_TRACE_BASIC, HA_LEASE_SYNC_STALE_LEASE4_SKIP)
+ .arg(lease->addr_.toText())
+ .arg(lease->subnet_id_);
+ }
+
+ // If we're not on the last page and we're processing final lease on
+ // this page, let's record the lease as input to the next
+ // lease4-get-page command.
+ if ((leases_element.size() >= config_->getSyncPageLimit()) &&
+ (l + 1 == leases_element.end())) {
+ last_lease = boost::dynamic_pointer_cast<Lease>(lease);
+ }
+
+ } else {
+ Lease6Ptr lease = Lease6::fromElement(*l);
+
+ // Check if there is such lease in the database already.
+ Lease6Ptr existing_lease = LeaseMgrFactory::instance().getLease6(lease->type_,
+ lease->addr_);
+ if (!existing_lease) {
+ // There is no such lease, so let's add it.
+ LeaseMgrFactory::instance().addLease(lease);
+
+ } else if (existing_lease->cltt_ < lease->cltt_) {
+ // If the existing lease is older than the fetched lease, update
+ // the lease in our local database.
+ // Update lease current expiration time with value received from the
+ // database. Some database backends reject operations on the lease if
+ // the current expiration time value does not match what is stored.
+ Lease::syncCurrentExpirationTime(*existing_lease, *lease);
+ LeaseMgrFactory::instance().updateLease6(lease);
+
+ } else {
+ LOG_DEBUG(ha_logger, DBGLVL_TRACE_BASIC, HA_LEASE_SYNC_STALE_LEASE6_SKIP)
+ .arg(lease->addr_.toText())
+ .arg(lease->subnet_id_);
+ }
+
+ // If we're not on the last page and we're processing final lease on
+ // this page, let's record the lease as input to the next
+ // lease6-get-page command.
+ if ((leases_element.size() >= config_->getSyncPageLimit()) &&
+ (l + 1 == leases_element.end())) {
+ last_lease = boost::dynamic_pointer_cast<Lease>(lease);
+ }
+ }
+
+ } catch (const std::exception& ex) {
+ LOG_WARN(ha_logger, HA_LEASE_SYNC_FAILED)
+ .arg((*l)->str())
+ .arg(ex.what());
+ }
+ }
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_LEASES_SYNC_FAILED)
+ .arg(partner_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+
+ } else if (last_lease) {
+ // This indicates that there are more leases to be fetched.
+ // Therefore, we have to send another leaseX-get-page command.
+ asyncSyncLeases(http_client, server_name, max_period, last_lease,
+ post_sync_action, dhcp_disabled);
+ return;
+ }
+
+ // Invoke post synchronization action if it was specified.
+ if (post_sync_action) {
+ post_sync_action(error_message.empty(),
+ error_message,
+ dhcp_disabled);
+ }
+ },
+ HttpClient::RequestTimeout(config_->getSyncTimeout()),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+
+}
+
+ConstElementPtr
+HAService::processSynchronize(const std::string& server_name,
+ const unsigned int max_period) {
+ std::string answer_message;
+ int sync_status = synchronize(answer_message, server_name, max_period);
+ return (createAnswer(sync_status, answer_message));
+}
+
+int
+HAService::synchronize(std::string& status_message, const std::string& server_name,
+ const unsigned int max_period) {
+ IOService io_service;
+ HttpClient client(io_service);
+
+ asyncSyncLeases(client, server_name, max_period, Lease4Ptr(),
+ [&](const bool success, const std::string& error_message,
+ const bool dhcp_disabled) {
+ // If there was a fatal error while fetching the leases, let's
+ // log an error message so as it can be included in the response
+ // to the controlling client.
+ if (!success) {
+ status_message = error_message;
+ }
+
+ // Whether or not there was an error while fetching the leases,
+ // we need to re-enable the DHCP service on the peer if the
+ // DHCP service was disabled in the course of synchronization.
+ if (dhcp_disabled) {
+ // If the synchronization was completed successfully let's
+ // try to send the ha-sync-complete-notify command to the
+ // partner.
+ if (success) {
+ asyncSyncCompleteNotify(client, server_name,
+ [&](const bool success,
+ const std::string& error_message,
+ const int rcode) {
+ // This command may not be supported by the partner when it
+ // runs an older Kea version. In that case, send the dhcp-enable
+ // command as in previous Kea version.
+ if (rcode == CONTROL_RESULT_COMMAND_UNSUPPORTED) {
+ asyncEnableDHCPService(client, server_name,
+ [&](const bool success,
+ const std::string& error_message,
+ const int) {
+ // It is possible that we have already recorded an error
+ // message while synchronizing the lease database. Don't
+ // override the existing error message.
+ if (!success && status_message.empty()) {
+ status_message = error_message;
+ }
+
+ // The synchronization process is completed, so let's break
+ // the IO service so as we can return the response to the
+ // controlling client.
+ io_service.stop();
+ });
+
+ } else {
+ // ha-sync-complete-notify command was delivered to the partner.
+ // The synchronization process ends here.
+ if (!success && status_message.empty()) {
+ status_message = error_message;
+ }
+
+ io_service.stop();
+ }
+ });
+
+ } else {
+ // Synchronization was unsuccessful. Send the dhcp-enable command to
+ // re-enable the DHCP service. Note, that we don't send the
+ // ha-sync-complete-notify command in this case. It is only sent in
+ // the case when synchronization ends successfully.
+ asyncEnableDHCPService(client, server_name,
+ [&](const bool success,
+ const std::string& error_message,
+ const int) {
+ if (!success && status_message.empty()) {
+ status_message = error_message;
+ }
+
+ // The synchronization process is completed, so let's break
+ // the IO service so as we can return the response to the
+ // controlling client.
+ io_service.stop();
+
+ });
+ }
+
+ } else {
+ // Also stop IO service if there is no need to enable DHCP
+ // service.
+ io_service.stop();
+ }
+ });
+
+ LOG_INFO(ha_logger, HA_SYNC_START).arg(server_name);
+
+ // Measure duration of the synchronization.
+ Stopwatch stopwatch;
+
+ // Run the IO service until it is stopped by any of the callbacks. This
+ // makes it synchronous.
+ io_service.run();
+
+ // End measuring duration.
+ stopwatch.stop();
+
+ // If an error message has been recorded, return an error to the controlling
+ // client.
+ if (!status_message.empty()) {
+ postNextEvent(HA_SYNCING_FAILED_EVT);
+
+ LOG_ERROR(ha_logger, HA_SYNC_FAILED)
+ .arg(server_name)
+ .arg(status_message);
+
+ return (CONTROL_RESULT_ERROR);
+
+ }
+
+ // Everything was fine, so let's return a success.
+ status_message = "Lease database synchronization complete.";
+ postNextEvent(HA_SYNCING_SUCCEEDED_EVT);
+
+ LOG_INFO(ha_logger, HA_SYNC_SUCCESSFUL)
+ .arg(server_name)
+ .arg(stopwatch.logFormatLastDuration());
+
+ return (CONTROL_RESULT_SUCCESS);
+}
+
+void
+HAService::asyncSendLeaseUpdatesFromBacklog(HttpClient& http_client,
+ const HAConfig::PeerConfigPtr& config,
+ PostRequestCallback post_request_action) {
+ if (lease_update_backlog_.size() == 0) {
+ post_request_action(true, "", CONTROL_RESULT_SUCCESS);
+ return;
+ }
+
+ ConstElementPtr command;
+ if (server_type_ == HAServerType::DHCPv4) {
+ LeaseUpdateBacklog::OpType op_type;
+ Lease4Ptr lease = boost::dynamic_pointer_cast<Lease4>(lease_update_backlog_.pop(op_type));
+ if (op_type == LeaseUpdateBacklog::ADD) {
+ command = CommandCreator::createLease4Update(*lease);
+ } else {
+ command = CommandCreator::createLease4Delete(*lease);
+ }
+
+ } else {
+ command = CommandCreator::createLease6BulkApply(lease_update_backlog_);
+ }
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(config->getUrl().getStrippedHostname()));
+ config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(command);
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ http_client.asyncSendRequest(config->getUrl(), config->getTlsContext(),
+ request, response,
+ [this, &http_client, config, post_request_action]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ int rcode = 0;
+ std::string error_message;
+
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_WARN(ha_logger, HA_LEASES_BACKLOG_COMMUNICATIONS_FAILED)
+ .arg(config->getLogLabel())
+ .arg(ec ? ec.message() : error_str);
+
+ } else {
+ // Handle third group of errors.
+ try {
+ auto args = verifyAsyncResponse(response, rcode);
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_WARN(ha_logger, HA_LEASES_BACKLOG_FAILED)
+ .arg(config->getLogLabel())
+ .arg(ex.what());
+ }
+ }
+
+ // Recursively send all outstanding lease updates or break when an
+ // error occurs. In DHCPv6, this is a single iteration because we use
+ // lease6-bulk-apply, which combines many lease updates in a single
+ // transaction. In the case of DHCPv4, each update is sent in its own
+ // transaction.
+ if (error_message.empty()) {
+ asyncSendLeaseUpdatesFromBacklog(http_client, config, post_request_action);
+ } else {
+ post_request_action(error_message.empty(), error_message, rcode);
+ }
+ });
+}
+
+bool
+HAService::sendLeaseUpdatesFromBacklog() {
+ auto num_updates = lease_update_backlog_.size();
+ if (num_updates == 0) {
+ LOG_INFO(ha_logger, HA_LEASES_BACKLOG_NOTHING_TO_SEND);
+ return (true);
+ }
+
+ IOService io_service;
+ HttpClient client(io_service);
+ auto remote_config = config_->getFailoverPeerConfig();
+ bool updates_successful = true;
+
+ LOG_INFO(ha_logger, HA_LEASES_BACKLOG_START)
+ .arg(num_updates)
+ .arg(remote_config->getName());
+
+ asyncSendLeaseUpdatesFromBacklog(client, remote_config,
+ [&](const bool success, const std::string&, const int) {
+ io_service.stop();
+ updates_successful = success;
+ });
+
+ // Measure duration of the updates.
+ Stopwatch stopwatch;
+
+ // Run the IO service until it is stopped by the callback. This makes it synchronous.
+ io_service.run();
+
+ // End measuring duration.
+ stopwatch.stop();
+
+ if (updates_successful) {
+ LOG_INFO(ha_logger, HA_LEASES_BACKLOG_SUCCESS)
+ .arg(remote_config->getName())
+ .arg(stopwatch.logFormatLastDuration());
+ }
+
+ return (updates_successful);
+}
+
+void
+HAService::asyncSendHAReset(HttpClient& http_client,
+ const HAConfig::PeerConfigPtr& config,
+ PostRequestCallback post_request_action) {
+ ConstElementPtr command = CommandCreator::createHAReset(server_type_);
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(config->getUrl().getStrippedHostname()));
+ config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(command);
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ http_client.asyncSendRequest(config->getUrl(), config->getTlsContext(),
+ request, response,
+ [this, config, post_request_action]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ int rcode = 0;
+ std::string error_message;
+
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_WARN(ha_logger, HA_RESET_COMMUNICATIONS_FAILED)
+ .arg(config->getLogLabel())
+ .arg(ec ? ec.message() : error_str);
+
+ } else {
+ // Handle third group of errors.
+ try {
+ auto args = verifyAsyncResponse(response, rcode);
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_WARN(ha_logger, HA_RESET_FAILED)
+ .arg(config->getLogLabel())
+ .arg(ex.what());
+ }
+ }
+
+ post_request_action(error_message.empty(), error_message, rcode);
+ });
+}
+
+bool
+HAService::sendHAReset() {
+ IOService io_service;
+ HttpClient client(io_service);
+ auto remote_config = config_->getFailoverPeerConfig();
+ bool reset_successful = true;
+
+ asyncSendHAReset(client, remote_config,
+ [&](const bool success, const std::string&, const int) {
+ io_service.stop();
+ reset_successful = success;
+ });
+
+ // Run the IO service until it is stopped by the callback. This makes it synchronous.
+ io_service.run();
+
+ return (reset_successful);
+}
+
+ConstElementPtr
+HAService::processScopes(const std::vector<std::string>& scopes) {
+ try {
+ query_filter_.serveScopes(scopes);
+ adjustNetworkState();
+
+ } catch (const std::exception& ex) {
+ return (createAnswer(CONTROL_RESULT_ERROR, ex.what()));
+ }
+
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "New HA scopes configured."));
+}
+
+ConstElementPtr
+HAService::processContinue() {
+ if (unpause()) {
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "HA state machine continues."));
+ }
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "HA state machine is not paused."));
+}
+
+ConstElementPtr
+HAService::processMaintenanceNotify(const bool cancel) {
+ if (cancel) {
+ if (getCurrState() != HA_IN_MAINTENANCE_ST) {
+ return (createAnswer(CONTROL_RESULT_ERROR, "Unable to cancel the"
+ " maintenance for the server not in the"
+ " in-maintenance state."));
+ }
+
+ postNextEvent(HA_MAINTENANCE_CANCEL_EVT);
+ verboseTransition(getPrevState());
+ runModel(NOP_EVT);
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "Server maintenance canceled."));
+ }
+
+ switch (getCurrState()) {
+ case HA_BACKUP_ST:
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ case HA_TERMINATED_ST:
+ // The reason why we don't return an error result here is that we have to
+ // have a way to distinguish between the errors caused by the communication
+ // issues and the cases when there is no communication error but the server
+ // is not allowed to enter the in-maintenance state. In the former case, the
+ // partner would go to partner-down. In the case signaled by the special
+ // result code entering the maintenance state is not allowed.
+ return (createAnswer(HA_CONTROL_RESULT_MAINTENANCE_NOT_ALLOWED,
+ "Unable to transition the server from the "
+ + stateToString(getCurrState()) + " to"
+ " in-maintenance state."));
+ default:
+ verboseTransition(HA_IN_MAINTENANCE_ST);
+ runModel(HA_MAINTENANCE_NOTIFY_EVT);
+ }
+ return (createAnswer(CONTROL_RESULT_SUCCESS, "Server is in-maintenance state."));
+}
+
+ConstElementPtr
+HAService::processMaintenanceStart() {
+ switch (getCurrState()) {
+ case HA_BACKUP_ST:
+ case HA_IN_MAINTENANCE_ST:
+ case HA_PARTNER_IN_MAINTENANCE_ST:
+ case HA_TERMINATED_ST:
+ return (createAnswer(CONTROL_RESULT_ERROR, "Unable to transition the server from"
+ " the " + stateToString(getCurrState()) + " to"
+ " partner-in-maintenance state."));
+ default:
+ ;
+ }
+
+ HAConfig::PeerConfigPtr remote_config = config_->getFailoverPeerConfig();
+
+ // Create HTTP/1.1 request including ha-maintenance-notify command
+ // with the cancel flag set to false.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(remote_config->getUrl().getStrippedHostname()));
+ remote_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createMaintenanceNotify(false, server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ IOService io_service;
+ HttpClient client(io_service);
+
+ boost::system::error_code captured_ec;
+ std::string captured_error_message;
+ int captured_rcode = 0;
+
+ // Schedule asynchronous HTTP request.
+ client.asyncSendRequest(remote_config->getUrl(),
+ remote_config->getTlsContext(),
+ request, response,
+ [this, remote_config, &io_service, &captured_ec, &captured_error_message,
+ &captured_rcode]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ io_service.stop();
+
+ // There are three possible groups of errors. One is the IO error
+ // causing issues in communication with the peer. Another one is
+ // an HTTP parsing error. The last type of error is when non-success
+ // error code is returned in the response carried in the HTTP message
+ // or if the JSON response is otherwise broken.
+
+ std::string error_message;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_MAINTENANCE_NOTIFY_COMMUNICATIONS_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ static_cast<void>(verifyAsyncResponse(response, captured_rcode));
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_MAINTENANCE_NOTIFY_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+ }
+
+ captured_ec = ec;
+ captured_error_message = error_message;
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+
+ // Run the IO service until it is stopped by any of the callbacks. This
+ // makes it synchronous.
+ io_service.run();
+
+ // If there was a communication problem with the partner we assume that
+ // the partner is already down while we receive this command.
+ if (captured_ec || (captured_rcode == CONTROL_RESULT_ERROR)) {
+ postNextEvent(HA_MAINTENANCE_START_EVT);
+ verboseTransition(HA_PARTNER_DOWN_ST);
+ runModel(NOP_EVT);
+ return (createAnswer(CONTROL_RESULT_SUCCESS,
+ "Server is now in the partner-down state as its"
+ " partner appears to be offline for maintenance."));
+
+ } else if (captured_rcode == CONTROL_RESULT_SUCCESS) {
+ // If the partner responded indicating no error it means that the
+ // partner has been transitioned to the in-maintenance state. In that
+ // case we transition to the partner-in-maintenance state.
+ postNextEvent(HA_MAINTENANCE_START_EVT);
+ verboseTransition(HA_PARTNER_IN_MAINTENANCE_ST);
+ runModel(NOP_EVT);
+
+ } else {
+ // Partner server returned a special status code which means that it can't
+ // transition to the partner-in-maintenance state.
+ return (createAnswer(CONTROL_RESULT_ERROR, "Unable to transition to the"
+ " partner-in-maintenance state. The partner server responded"
+ " with the following message to the ha-maintenance-notify"
+ " command: " + captured_error_message + "."));
+
+ }
+
+ return (createAnswer(CONTROL_RESULT_SUCCESS,
+ "Server is now in the partner-in-maintenance state"
+ " and its partner is in-maintenance state. The partner"
+ " can be now safely shut down."));
+}
+
+ConstElementPtr
+HAService::processMaintenanceCancel() {
+ if (getCurrState() != HA_PARTNER_IN_MAINTENANCE_ST) {
+ return (createAnswer(CONTROL_RESULT_ERROR, "Unable to cancel maintenance"
+ " request because the server is not in the"
+ " partner-in-maintenance state."));
+ }
+
+ HAConfig::PeerConfigPtr remote_config = config_->getFailoverPeerConfig();
+
+ // Create HTTP/1.1 request including ha-maintenance-notify command
+ // with the cancel flag set to true.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(remote_config->getUrl().getStrippedHostname()));
+ remote_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createMaintenanceNotify(true, server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ IOService io_service;
+ HttpClient client(io_service);
+
+ std::string error_message;
+
+ // Schedule asynchronous HTTP request.
+ client.asyncSendRequest(remote_config->getUrl(),
+ remote_config->getTlsContext(),
+ request, response,
+ [this, remote_config, &io_service, &error_message]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ io_service.stop();
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_MAINTENANCE_NOTIFY_CANCEL_COMMUNICATIONS_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ int rcode = 0;
+ static_cast<void>(verifyAsyncResponse(response, rcode));
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_MAINTENANCE_NOTIFY_CANCEL_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+ }
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+
+ // Run the IO service until it is stopped by any of the callbacks. This
+ // makes it synchronous.
+ io_service.run();
+
+ // There was an error in communication with the partner or the
+ // partner was unable to revert its state.
+ if (!error_message.empty()) {
+ return (createAnswer(CONTROL_RESULT_ERROR,
+ "Unable to cancel maintenance. The partner server responded"
+ " with the following message to the ha-maintenance-notify"
+ " command: " + error_message + "."));
+ }
+
+ // Successfully reverted partner's state. Let's also revert our state to the
+ // previous one.
+ postNextEvent(HA_MAINTENANCE_CANCEL_EVT);
+ verboseTransition(getPrevState());
+ runModel(NOP_EVT);
+
+ return (createAnswer(CONTROL_RESULT_SUCCESS,
+ "Server maintenance successfully canceled."));
+}
+
+void
+HAService::asyncSyncCompleteNotify(HttpClient& http_client,
+ const std::string& server_name,
+ PostRequestCallback post_request_action) {
+ HAConfig::PeerConfigPtr remote_config = config_->getPeerConfig(server_name);
+
+ // Create HTTP/1.1 request including our command.
+ PostHttpRequestJsonPtr request = boost::make_shared<PostHttpRequestJson>
+ (HttpRequest::Method::HTTP_POST, "/", HttpVersion::HTTP_11(),
+ HostHttpHeader(remote_config->getUrl().getStrippedHostname()));
+
+ remote_config->addBasicAuthHttpHeader(request);
+ request->setBodyAsJson(CommandCreator::createSyncCompleteNotify(server_type_));
+ request->finalize();
+
+ // Response object should also be created because the HTTP client needs
+ // to know the type of the expected response.
+ HttpResponseJsonPtr response = boost::make_shared<HttpResponseJson>();
+
+ // Schedule asynchronous HTTP request.
+ http_client.asyncSendRequest(remote_config->getUrl(),
+ remote_config->getTlsContext(),
+ request, response,
+ [this, remote_config, post_request_action]
+ (const boost::system::error_code& ec,
+ const HttpResponsePtr& response,
+ const std::string& error_str) {
+
+ // There are three possible groups of errors. One is the IO error
+ // causing issues in communication with the peer. Another one is an
+ // HTTP parsing error. The last type of error is when non-success
+ // error code is returned in the response carried in the HTTP message
+ // or if the JSON response is otherwise broken.
+
+ int rcode = 0;
+ std::string error_message;
+
+ // Handle first two groups of errors.
+ if (ec || !error_str.empty()) {
+ error_message = (ec ? ec.message() : error_str);
+ LOG_ERROR(ha_logger, HA_SYNC_COMPLETE_NOTIFY_COMMUNICATIONS_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+
+ } else {
+
+ // Handle third group of errors.
+ try {
+ static_cast<void>(verifyAsyncResponse(response, rcode));
+
+ } catch (const CommandUnsupportedError& ex) {
+ rcode = CONTROL_RESULT_COMMAND_UNSUPPORTED;
+
+ } catch (const std::exception& ex) {
+ error_message = ex.what();
+ LOG_ERROR(ha_logger, HA_SYNC_COMPLETE_NOTIFY_FAILED)
+ .arg(remote_config->getLogLabel())
+ .arg(error_message);
+ }
+ }
+
+ // If there was an error communicating with the partner, mark the
+ // partner as unavailable.
+ if (!error_message.empty()) {
+ communication_state_->setPartnerState("unavailable");
+ }
+
+ // Invoke post request action if it was specified.
+ if (post_request_action) {
+ post_request_action(error_message.empty(),
+ error_message,
+ rcode);
+ }
+ },
+ HttpClient::RequestTimeout(TIMEOUT_DEFAULT_HTTP_CLIENT_REQUEST),
+ std::bind(&HAService::clientConnectHandler, this, ph::_1, ph::_2),
+ std::bind(&HAService::clientHandshakeHandler, this, ph::_1),
+ std::bind(&HAService::clientCloseHandler, this, ph::_1)
+ );
+}
+
+ConstElementPtr
+HAService::processSyncCompleteNotify() {
+ if (getCurrState() == HA_PARTNER_DOWN_ST) {
+ sync_complete_notified_ = true;
+ } else {
+ localEnableDHCPService();
+ }
+ return (createAnswer(CONTROL_RESULT_SUCCESS,
+ "Server successfully notified about the synchronization completion."));
+}
+
+ConstElementPtr
+HAService::verifyAsyncResponse(const HttpResponsePtr& response, int& rcode) {
+ // Set the return code to error in case of early throw.
+ rcode = CONTROL_RESULT_ERROR;
+ // The response must cast to JSON type.
+ HttpResponseJsonPtr json_response =
+ boost::dynamic_pointer_cast<HttpResponseJson>(response);
+ if (!json_response) {
+ isc_throw(CtrlChannelError, "no valid HTTP response found");
+ }
+
+ // Body holds the response to our command.
+ ConstElementPtr body = json_response->getBodyAsJson();
+ if (!body) {
+ isc_throw(CtrlChannelError, "no body found in the response");
+ }
+
+ // Body should contain a list of responses from multiple servers.
+ if (body->getType() != Element::list) {
+ // Some control agent errors are returned as a map.
+ if (body->getType() == Element::map) {
+ ElementPtr list = Element::createList();
+ ElementPtr answer = Element::createMap();
+ answer->set(CONTROL_RESULT, Element::create(rcode));
+ ConstElementPtr text = body->get(CONTROL_TEXT);
+ if (text) {
+ answer->set(CONTROL_TEXT, text);
+ }
+ list->add(answer);
+ body = list;
+ } else {
+ isc_throw(CtrlChannelError, "body of the response must be a list");
+ }
+ }
+
+ // There must be at least one response.
+ if (body->empty()) {
+ isc_throw(CtrlChannelError, "list of responses must not be empty");
+ }
+
+ // Check if the status code of the first response. We don't support multiple
+ // at this time, because we always send a request to a single location.
+ ConstElementPtr args = parseAnswer(rcode, body->get(0));
+ if ((rcode != CONTROL_RESULT_SUCCESS) &&
+ (rcode != CONTROL_RESULT_EMPTY)) {
+ std::ostringstream s;
+ // Include an error text if available.
+ if (args && args->getType() == Element::string) {
+ s << args->stringValue() << ", ";
+ }
+ // Include an error code.
+ s << "error code " << rcode;
+
+ if (rcode == CONTROL_RESULT_COMMAND_UNSUPPORTED) {
+ isc_throw(CommandUnsupportedError, s.str());
+ } else {
+ isc_throw(CtrlChannelError, s.str());
+ }
+ }
+
+ return (args);
+}
+
+bool
+HAService::clientConnectHandler(const boost::system::error_code& ec, int tcp_native_fd) {
+
+ // If client is running it's own IOService we do NOT want to
+ // register the socket with IfaceMgr.
+ if (client_->getThreadIOService()) {
+ return (true);
+ }
+
+ // If things look OK register the socket with Interface Manager. Note
+ // we don't register if the FD is < 0 to avoid an exception throw.
+ // It is unlikely that this will occur but we want to be liberal
+ // and avoid issues.
+ if ((!ec || (ec.value() == boost::asio::error::in_progress))
+ && (tcp_native_fd >= 0)) {
+ // External socket callback is a NOP. Ready events handlers are
+ // run by an explicit call IOService ready in kea-dhcp<n> code.
+ // We are registering the socket only to interrupt main-thread
+ // select().
+ IfaceMgr::instance().addExternalSocket(tcp_native_fd,
+ std::bind(&HAService::socketReadyHandler, this, ph::_1)
+ );
+ }
+
+ // If ec.value() == boost::asio::error::already_connected, we should already
+ // be registered, so nothing to do. If it is any other value, then connect
+ // failed and Connection logic should handle that, not us, so no matter
+ // what happens we're returning true.
+ return (true);
+}
+
+void
+HAService::socketReadyHandler(int tcp_native_fd) {
+ // If the socket is ready but does not belong to one of our client's
+ // ongoing transactions, we close it. This will unregister it from
+ // IfaceMgr and ensure the client starts over with a fresh connection
+ // if it needs to do so.
+ client_->closeIfOutOfBand(tcp_native_fd);
+}
+
+void
+HAService::clientCloseHandler(int tcp_native_fd) {
+ if (tcp_native_fd >= 0) {
+ IfaceMgr::instance().deleteExternalSocket(tcp_native_fd);
+ }
+};
+
+size_t
+HAService::pendingRequestSize() {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return (pending_requests_.size());
+ } else {
+ return (pending_requests_.size());
+ }
+}
+
+template<typename QueryPtrType>
+int
+HAService::getPendingRequest(const QueryPtrType& query) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return (getPendingRequestInternal(query));
+ } else {
+ return (getPendingRequestInternal(query));
+ }
+}
+
+template<typename QueryPtrType>
+int
+HAService::getPendingRequestInternal(const QueryPtrType& query) {
+ if (pending_requests_.count(query) == 0) {
+ return (0);
+ } else {
+ return (pending_requests_[query]);
+ }
+}
+
+void
+HAService::checkPermissionsClientAndListener() {
+ // Since this function is used as CS callback all exceptions must be
+ // suppressed (except the @ref MultiThreadingInvalidOperation), unlikely
+ // though they may be.
+ // The @ref MultiThreadingInvalidOperation is propagated to the scope of the
+ // @ref MultiThreadingCriticalSection constructor.
+ try {
+ if (client_) {
+ client_->checkPermissions();
+ }
+
+ if (listener_) {
+ listener_->checkPermissions();
+ }
+ } catch (const isc::MultiThreadingInvalidOperation& ex) {
+ LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_ILLEGAL)
+ .arg(ex.what());
+ // The exception needs to be propagated to the caller of the
+ // @ref MultiThreadingCriticalSection constructor.
+ throw;
+ } catch (const std::exception& ex) {
+ LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_FAILED)
+ .arg(ex.what());
+ }
+}
+
+void
+HAService::startClientAndListener() {
+ // Add critical section callbacks.
+ MultiThreadingMgr::instance().addCriticalSectionCallbacks("HA_MT",
+ std::bind(&HAService::checkPermissionsClientAndListener, this),
+ std::bind(&HAService::pauseClientAndListener, this),
+ std::bind(&HAService::resumeClientAndListener, this));
+
+ if (client_) {
+ client_->start();
+ }
+
+ if (listener_) {
+ listener_->start();
+ }
+}
+
+void
+HAService::pauseClientAndListener() {
+ // Since this function is used as CS callback all exceptions must be
+ // suppressed, unlikely though they may be.
+ try {
+ if (client_) {
+ client_->pause();
+ }
+
+ if (listener_) {
+ listener_->pause();
+ }
+ } catch (const std::exception& ex) {
+ LOG_ERROR(ha_logger, HA_PAUSE_CLIENT_LISTENER_FAILED)
+ .arg(ex.what());
+ }
+}
+
+void
+HAService::resumeClientAndListener() {
+ // Since this function is used as CS callback all exceptions must be
+ // suppressed, unlikely though they may be.
+ try {
+ if (client_) {
+ client_->resume();
+ }
+
+ if (listener_) {
+ listener_->resume();
+ }
+ } catch (std::exception& ex) {
+ LOG_ERROR(ha_logger, HA_RESUME_CLIENT_LISTENER_FAILED)
+ .arg(ex.what());
+ }
+}
+
+void
+HAService::stopClientAndListener() {
+ // Remove critical section callbacks.
+ MultiThreadingMgr::instance().removeCriticalSectionCallbacks("HA_MT");
+
+ if (client_) {
+ client_->stop();
+ }
+
+ if (listener_) {
+ listener_->stop();
+ }
+}
+
+// Explicit instantiations.
+template int HAService::getPendingRequest(const Pkt4Ptr&);
+template int HAService::getPendingRequest(const Pkt6Ptr&);
+
+} // end of namespace isc::ha
+} // end of namespace isc