diff options
Diffstat (limited to 'src/hooks/dhcp/high_availability/communication_state.cc')
-rw-r--r-- | src/hooks/dhcp/high_availability/communication_state.cc | 804 |
1 files changed, 804 insertions, 0 deletions
diff --git a/src/hooks/dhcp/high_availability/communication_state.cc b/src/hooks/dhcp/high_availability/communication_state.cc new file mode 100644 index 0000000..c7e2837 --- /dev/null +++ b/src/hooks/dhcp/high_availability/communication_state.cc @@ -0,0 +1,804 @@ +// Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC") +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <config.h> + +#include <communication_state.h> +#include <ha_log.h> +#include <ha_service_states.h> +#include <cc/data.h> +#include <exceptions/exceptions.h> +#include <dhcp/dhcp4.h> +#include <dhcp/dhcp6.h> +#include <dhcp/option_int.h> +#include <dhcp/pkt4.h> +#include <dhcp/pkt6.h> +#include <http/date_time.h> +#include <util/boost_time_utils.h> +#include <util/multi_threading_mgr.h> + +#include <boost/pointer_cast.hpp> + +#include <functional> +#include <limits> +#include <sstream> +#include <utility> + +using namespace isc::asiolink; +using namespace isc::data; +using namespace isc::dhcp; +using namespace isc::http; +using namespace isc::log; +using namespace isc::util; + +using namespace boost::posix_time; +using namespace std; + +namespace { + +/// @brief Warning is issued if the clock skew exceeds this value. +constexpr long WARN_CLOCK_SKEW = 30; + +/// @brief HA service terminates if the clock skew exceeds this value. +constexpr long TERM_CLOCK_SKEW = 60; + +/// @brief Minimum time between two consecutive clock skew warnings. +constexpr long MIN_TIME_SINCE_CLOCK_SKEW_WARN = 60; + +} + +namespace isc { +namespace ha { + +CommunicationState::CommunicationState(const IOServicePtr& io_service, + const HAConfigPtr& config) + : io_service_(io_service), config_(config), timer_(), interval_(0), + poke_time_(boost::posix_time::microsec_clock::universal_time()), + heartbeat_impl_(0), partner_state_(-1), partner_scopes_(), + clock_skew_(0, 0, 0, 0), last_clock_skew_warn_(), + my_time_at_skew_(), partner_time_at_skew_(), + analyzed_messages_count_(0), unsent_update_count_(0), + partner_unsent_update_count_{0, 0}, mutex_(new mutex()) { +} + +CommunicationState::~CommunicationState() { + stopHeartbeat(); +} + +void +CommunicationState::modifyPokeTime(const long secs) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + poke_time_ += boost::posix_time::seconds(secs); + } else { + poke_time_ += boost::posix_time::seconds(secs); + } +} + +int +CommunicationState::getPartnerState() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (partner_state_); + } else { + return (partner_state_); + } +} + +void +CommunicationState::setPartnerState(const std::string& state) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + setPartnerStateInternal(state); + } else { + setPartnerStateInternal(state); + } +} + +void +CommunicationState::setPartnerStateInternal(const std::string& state) { + try { + partner_state_ = stringToState(state); + } catch (...) { + isc_throw(BadValue, "unsupported HA partner state returned " + << state); + } +} + +std::set<std::string> +CommunicationState::getPartnerScopes() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (partner_scopes_); + } else { + return (partner_scopes_); + } +} + +void +CommunicationState::setPartnerScopes(ConstElementPtr new_scopes) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + setPartnerScopesInternal(new_scopes); + } else { + setPartnerScopesInternal(new_scopes); + } +} + +void +CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) { + if (!new_scopes || (new_scopes->getType() != Element::list)) { + isc_throw(BadValue, "unable to record partner's HA scopes because" + " the received value is not a valid JSON list"); + } + + std::set<std::string> partner_scopes; + for (auto i = 0; i < new_scopes->size(); ++i) { + auto scope = new_scopes->get(i); + if (scope->getType() != Element::string) { + isc_throw(BadValue, "unable to record partner's HA scopes because" + " the received scope value is not a valid JSON string"); + } + auto scope_str = scope->stringValue(); + if (!scope_str.empty()) { + partner_scopes.insert(scope_str); + } + } + partner_scopes_ = partner_scopes; +} + +void +CommunicationState::startHeartbeat(const long interval, + const std::function<void()>& heartbeat_impl) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + startHeartbeatInternal(interval, heartbeat_impl); + } else { + startHeartbeatInternal(interval, heartbeat_impl); + } +} + +void +CommunicationState::startHeartbeatInternal(const long interval, + const std::function<void()>& heartbeat_impl) { + bool settings_modified = false; + + // If we're setting the heartbeat for the first time, it should + // be non-null. + if (heartbeat_impl) { + settings_modified = true; + heartbeat_impl_ = heartbeat_impl; + + } else if (!heartbeat_impl_) { + // The heartbeat is re-scheduled but we have no historic implementation + // pointer we could re-use. This is a programmatic issue. + isc_throw(BadValue, "unable to start heartbeat when pointer" + " to the heartbeat implementation is not specified"); + } + + // If we're setting the heartbeat for the first time, the interval + // should be greater than 0. + if (interval != 0) { + settings_modified |= (interval_ != interval); + interval_ = interval; + + } else if (interval_ <= 0) { + // The heartbeat is re-scheduled but we have no historic interval + // which we could re-use. This is a programmatic issue. + heartbeat_impl_ = 0; + isc_throw(BadValue, "unable to start heartbeat when interval" + " for the heartbeat timer is not specified"); + } + + if (!timer_) { + timer_.reset(new IntervalTimer(*io_service_)); + } + + if (settings_modified) { + timer_->setup(heartbeat_impl_, interval_, IntervalTimer::ONE_SHOT); + } +} + +void +CommunicationState::stopHeartbeat() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + stopHeartbeatInternal(); + } else { + stopHeartbeatInternal(); + } +} + +void +CommunicationState::stopHeartbeatInternal() { + if (timer_) { + timer_->cancel(); + timer_.reset(); + interval_ = 0; + heartbeat_impl_ = 0; + } +} + +bool +CommunicationState::isHeartbeatRunning() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (static_cast<bool>(timer_)); + } else { + return (static_cast<bool>(timer_)); + } +} + +boost::posix_time::time_duration +CommunicationState::updatePokeTime() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (updatePokeTimeInternal()); + } else { + return (updatePokeTimeInternal()); + } +} + +boost::posix_time::time_duration +CommunicationState::updatePokeTimeInternal() { + // Remember previous poke time. + boost::posix_time::ptime prev_poke_time = poke_time_; + // Set poke time to the current time. + poke_time_ = boost::posix_time::microsec_clock::universal_time(); + return (poke_time_ - prev_poke_time); +} + +void +CommunicationState::poke() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + pokeInternal(); + } else { + pokeInternal(); + } +} + +void +CommunicationState::pokeInternal() { + // Update poke time and compute duration. + boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal(); + + // If we have been tracking the DHCP messages directed to the partner, + // we need to clear any gathered information because the connection + // seems to be (re)established. + clearConnectingClients(); + analyzed_messages_count_ = 0; + + if (timer_) { + // Check the duration since last poke. If it is less than a second, we don't + // want to reschedule the timer. In order to avoid the overhead of + // re-scheduling the timer too frequently we reschedule it only if the + // duration is 1s or more. This matches the time resolution for heartbeats. + if (duration_since_poke.total_seconds() > 0) { + // A poke causes the timer to be re-scheduled to prevent it + // from triggering a heartbeat shortly after confirming the + // connection is ok. + startHeartbeatInternal(); + } + } +} + +int64_t +CommunicationState::getDurationInMillisecs() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (getDurationInMillisecsInternal()); + } else { + return (getDurationInMillisecsInternal()); + } +} + +int64_t +CommunicationState::getDurationInMillisecsInternal() const { + ptime now = boost::posix_time::microsec_clock::universal_time(); + time_duration duration = now - poke_time_; + return (duration.total_milliseconds()); +} + +bool +CommunicationState::isCommunicationInterrupted() const { + return (getDurationInMillisecs() > config_->getMaxResponseDelay()); +} + +size_t +CommunicationState::getAnalyzedMessagesCount() const { + return (analyzed_messages_count_); +} + +bool +CommunicationState::clockSkewShouldWarn() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (clockSkewShouldWarnInternal()); + } else { + return (clockSkewShouldWarnInternal()); + } +} + +bool +CommunicationState::clockSkewShouldWarnInternal() { + // First check if the clock skew is beyond the threshold. + if (isClockSkewGreater(WARN_CLOCK_SKEW)) { + + // In order to prevent to frequent warnings we provide a gating mechanism + // which doesn't allow for issuing a warning earlier than 60 seconds after + // the previous one. + + // Find the current time and the duration since last warning. + ptime now = boost::posix_time::microsec_clock::universal_time(); + time_duration since_warn_duration = now - last_clock_skew_warn_; + + // If the last warning was issued more than 60 seconds ago or it is a + // first warning, we need to update the last warning timestamp and return + // true to indicate that new warning should be issued. + if (last_clock_skew_warn_.is_not_a_date_time() || + (since_warn_duration.total_seconds() > MIN_TIME_SINCE_CLOCK_SKEW_WARN)) { + last_clock_skew_warn_ = now; + LOG_WARN(ha_logger, HA_HIGH_CLOCK_SKEW) + .arg(logFormatClockSkewInternal()); + return (true); + } + } + + // The warning should not be issued. + return (false); +} + +bool +CommunicationState::clockSkewShouldTerminate() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + // Issue a warning if the clock skew is greater than 60s. + return (clockSkewShouldTerminateInternal()); + } else { + return (clockSkewShouldTerminateInternal()); + } +} + +bool +CommunicationState::clockSkewShouldTerminateInternal() const { + if (isClockSkewGreater(TERM_CLOCK_SKEW)) { + LOG_ERROR(ha_logger, HA_HIGH_CLOCK_SKEW_CAUSES_TERMINATION) + .arg(logFormatClockSkewInternal()); + return (true); + } + + return (false); +} + +bool +CommunicationState::isClockSkewGreater(const long seconds) const { + return ((clock_skew_.total_seconds() > seconds) || + (clock_skew_.total_seconds() < -seconds)); +} + +void +CommunicationState::setPartnerTime(const std::string& time_text) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + setPartnerTimeInternal(time_text); + } else { + setPartnerTimeInternal(time_text); + } +} + +void +CommunicationState::setPartnerTimeInternal(const std::string& time_text) { + partner_time_at_skew_ = HttpDateTime().fromRfc1123(time_text).getPtime(); + my_time_at_skew_ = HttpDateTime().getPtime(); + clock_skew_ = partner_time_at_skew_ - my_time_at_skew_; +} + +std::string +CommunicationState::logFormatClockSkew() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (logFormatClockSkewInternal()); + } else { + return (logFormatClockSkewInternal()); + } +} + +std::string +CommunicationState::logFormatClockSkewInternal() const { + std::ostringstream os; + + if ((my_time_at_skew_.is_not_a_date_time()) || + (partner_time_at_skew_.is_not_a_date_time())) { + // Guard against being called before times have been set. + // Otherwise we'll get out-range exceptions. + return ("skew not initialized"); + } + + // Note HttpTime resolution is only to seconds, so we use fractional + // precision of zero when logging. + os << "my time: " << util::ptimeToText(my_time_at_skew_, 0) + << ", partner's time: " << util::ptimeToText(partner_time_at_skew_, 0) + << ", partner's clock is "; + + // If negative clock skew, the partner's time is behind our time. + if (clock_skew_.is_negative()) { + os << clock_skew_.invert_sign().total_seconds() << "s behind"; + } else { + // Partner's time is ahead of ours. + os << clock_skew_.total_seconds() << "s ahead"; + } + + return (os.str()); +} + +ElementPtr +CommunicationState::getReport() const { + auto report = Element::createMap(); + + auto in_touch = (getPartnerState() > 0); + report->set("in-touch", Element::create(in_touch)); + + auto age = in_touch ? static_cast<long long int>(getDurationInMillisecs() / 1000) : 0; + report->set("age", Element::create(age)); + + try { + report->set("last-state", Element::create(stateToString(getPartnerState()))); + + } catch (...) { + report->set("last-state", Element::create(std::string())); + } + + auto list = Element::createList(); + for (auto scope : getPartnerScopes()) { + list->add(Element::create(scope)); + } + report->set("last-scopes", list); + report->set("communication-interrupted", + Element::create(isCommunicationInterrupted())); + report->set("connecting-clients", Element::create(static_cast<long long>(getConnectingClientsCount()))); + report->set("unacked-clients", Element::create(static_cast<long long>(getUnackedClientsCount()))); + + long long unacked_clients_left = 0; + if (isCommunicationInterrupted() && (config_->getMaxUnackedClients() >= getUnackedClientsCount())) { + unacked_clients_left = static_cast<long long>(config_->getMaxUnackedClients() - + getUnackedClientsCount() + 1); + } + report->set("unacked-clients-left", Element::create(unacked_clients_left)); + report->set("analyzed-packets", Element::create(static_cast<long long>(getAnalyzedMessagesCount()))); + + return (report); +} + +uint64_t +CommunicationState::getUnsentUpdateCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (unsent_update_count_); + } else { + return (unsent_update_count_); + } +} + +void +CommunicationState::increaseUnsentUpdateCount() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + increaseUnsentUpdateCountInternal(); + } else { + increaseUnsentUpdateCountInternal(); + } +} + +void +CommunicationState::increaseUnsentUpdateCountInternal() { + // Protect against setting the incremented value to zero. + // The zero value is reserved for a server startup. + if (unsent_update_count_ < std::numeric_limits<uint64_t>::max()) { + ++unsent_update_count_; + } else { + unsent_update_count_ = 1; + } +} + +bool +CommunicationState::hasPartnerNewUnsentUpdates() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (hasPartnerNewUnsentUpdatesInternal()); + } else { + return (hasPartnerNewUnsentUpdatesInternal()); + } +} + +bool +CommunicationState::hasPartnerNewUnsentUpdatesInternal() const { + return (partner_unsent_update_count_.second > 0 && + (partner_unsent_update_count_.first != partner_unsent_update_count_.second)); +} + +void +CommunicationState::setPartnerUnsentUpdateCount(uint64_t unsent_update_count) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + setPartnerUnsentUpdateCountInternal(unsent_update_count); + } else { + setPartnerUnsentUpdateCountInternal(unsent_update_count); + } +} + +void +CommunicationState::setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count) { + partner_unsent_update_count_.first = partner_unsent_update_count_.second; + partner_unsent_update_count_.second = unsent_update_count; +} + +CommunicationState4::CommunicationState4(const IOServicePtr& io_service, + const HAConfigPtr& config) + : CommunicationState(io_service, config), connecting_clients_() { +} + +void +CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + analyzeMessageInternal(message); + } else { + analyzeMessageInternal(message); + } +} + +void +CommunicationState4::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) { + // The DHCP message must successfully cast to a Pkt4 object. + Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message); + if (!msg) { + isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv4 message"); + } + + ++analyzed_messages_count_; + + // Check value of the "secs" field by comparing it with the configured + // threshold. + uint16_t secs = msg->getSecs(); + + // It was observed that some Windows clients may send swapped bytes in the + // "secs" field. When the second byte is 0 and the first byte is non-zero + // we consider bytes to be swapped and so we correct them. + if ((secs > 255) && ((secs & 0xFF) == 0)) { + secs = ((secs >> 8) | (secs << 8)); + } + + // Check the value of the "secs" field. The "secs" field holds a value in + // seconds, hence we have to multiple by 1000 to get a value in milliseconds. + // If the secs value is above the threshold, it means that the current + // client should be considered unacked. + auto unacked = (secs * 1000 > config_->getMaxAckDelay()); + + // Client identifier will be stored together with the hardware address. It + // may remain empty if the client hasn't specified it. + std::vector<uint8_t> client_id; + OptionPtr opt_client_id = msg->getOption(DHO_DHCP_CLIENT_IDENTIFIER); + if (opt_client_id) { + client_id = opt_client_id->getData(); + } + + bool log_unacked = false; + + // Check if the given client was already recorded. + auto& idx = connecting_clients_.get<0>(); + auto existing_request = idx.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id)); + if (existing_request != idx.end()) { + // If the client was recorded and was not considered unacked + // but it should be considered unacked as a result of processing + // this packet, let's update the recorded request to mark the + // client unacked. + if (!existing_request->unacked_ && unacked) { + ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked }; + idx.replace(existing_request, connecting_client); + log_unacked = true; + } + + } else { + // This is the first time we see the packet from this client. Let's + // record it. + ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked }; + idx.insert(connecting_client); + log_unacked = unacked; + + if (!unacked) { + // This is the first time we see this client after getting into the + // communication interrupted state. But, this client hasn't been + // yet trying log enough to be considered unacked. + LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4) + .arg(message->getLabel()); + } + } + + // Only log the first time we detect a client is unacked. + if (log_unacked) { + unsigned unacked_left = 0; + unsigned unacked_total = connecting_clients_.get<1>().count(true); + if (config_->getMaxUnackedClients() >= unacked_total) { + unacked_left = config_->getMaxUnackedClients() - unacked_total + 1; + } + LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED) + .arg(message->getLabel()) + .arg(unacked_total) + .arg(unacked_left); + } +} + +bool +CommunicationState4::failureDetected() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (failureDetectedInternal()); + } else { + return (failureDetectedInternal()); + } +} + +bool +CommunicationState4::failureDetectedInternal() const { + return ((config_->getMaxUnackedClients() == 0) || + (connecting_clients_.get<1>().count(true) > + config_->getMaxUnackedClients())); +} + +size_t +CommunicationState4::getConnectingClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (connecting_clients_.size()); + } else { + return (connecting_clients_.size()); + } +} + +size_t +CommunicationState4::getUnackedClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (connecting_clients_.get<1>().count(true)); + } else { + return (connecting_clients_.get<1>().count(true)); + } +} + +void +CommunicationState4::clearConnectingClients() { + connecting_clients_.clear(); +} + +CommunicationState6::CommunicationState6(const IOServicePtr& io_service, + const HAConfigPtr& config) + : CommunicationState(io_service, config), connecting_clients_() { +} + +void +CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + analyzeMessageInternal(message); + } else { + analyzeMessageInternal(message); + } +} + +void +CommunicationState6::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) { + // The DHCP message must successfully cast to a Pkt6 object. + Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message); + if (!msg) { + isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv6 message"); + } + + ++analyzed_messages_count_; + + // Check the value of the "elapsed time" option. If it is below the threshold + // there is nothing to do. The "elapsed time" option holds the time in + // 1/100 of second, hence we have to multiply by 10 to get a value in milliseconds. + OptionUint16Ptr elapsed_time = boost::dynamic_pointer_cast< + OptionUint16>(msg->getOption(D6O_ELAPSED_TIME)); + auto unacked = (elapsed_time && elapsed_time->getValue() * 10 > config_->getMaxAckDelay()); + + // Get the DUID of the client to see if it hasn't been recorded already. + OptionPtr duid = msg->getOption(D6O_CLIENTID); + if (!duid) { + return; + } + + bool log_unacked = false; + + // Check if the given client was already recorded. + auto& idx = connecting_clients_.get<0>(); + auto existing_request = idx.find(duid->getData()); + if (existing_request != idx.end()) { + // If the client was recorded and was not considered unacked + // but it should be considered unacked as a result of processing + // this packet, let's update the recorded request to mark the + // client unacked. + if (!existing_request->unacked_ && unacked) { + ConnectingClient6 connecting_client{ duid->getData(), unacked }; + idx.replace(existing_request, connecting_client); + log_unacked = true; + } + + } else { + // This is the first time we see the packet from this client. Let's + // record it. + ConnectingClient6 connecting_client{ duid->getData(), unacked }; + idx.insert(connecting_client); + log_unacked = unacked; + + if (!unacked) { + // This is the first time we see this client after getting into the + // communication interrupted state. But, this client hasn't been + // yet trying log enough to be considered unacked. + LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6) + .arg(message->getLabel()); + } + } + + // Only log the first time we detect a client is unacked. + if (log_unacked) { + unsigned unacked_left = 0; + unsigned unacked_total = connecting_clients_.get<1>().count(true); + if (config_->getMaxUnackedClients() >= unacked_total) { + unacked_left = config_->getMaxUnackedClients() - unacked_total + 1; + } + LOG_INFO(ha_logger, HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED) + .arg(message->getLabel()) + .arg(unacked_total) + .arg(unacked_left); + } +} + +bool +CommunicationState6::failureDetected() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (failureDetectedInternal()); + } else { + return (failureDetectedInternal()); + } +} + +bool +CommunicationState6::failureDetectedInternal() const { + return ((config_->getMaxUnackedClients() == 0) || + (connecting_clients_.get<1>().count(true) > + config_->getMaxUnackedClients())); +} + +size_t +CommunicationState6::getConnectingClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (connecting_clients_.size()); + } else { + return (connecting_clients_.size()); + } +} + +size_t +CommunicationState6::getUnackedClientsCount() const { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard<std::mutex> lk(*mutex_); + return (connecting_clients_.get<1>().count(true)); + } else { + return (connecting_clients_.get<1>().count(true)); + } +} + +void +CommunicationState6::clearConnectingClients() { + connecting_clients_.clear(); +} + +} // end of namespace isc::ha +} // end of namespace isc |