summaryrefslogtreecommitdiffstats
path: root/src/lib/dhcp_ddns/ncr_io.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:15:43 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:15:43 +0000
commitf5f56e1a1c4d9e9496fcb9d81131066a964ccd23 (patch)
tree49e44c6f87febed37efb953ab5485aa49f6481a7 /src/lib/dhcp_ddns/ncr_io.cc
parentInitial commit. (diff)
downloadisc-kea-f5f56e1a1c4d9e9496fcb9d81131066a964ccd23.tar.xz
isc-kea-f5f56e1a1c4d9e9496fcb9d81131066a964ccd23.zip
Adding upstream version 2.4.1.upstream/2.4.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib/dhcp_ddns/ncr_io.cc')
-rw-r--r--src/lib/dhcp_ddns/ncr_io.cc499
1 files changed, 499 insertions, 0 deletions
diff --git a/src/lib/dhcp_ddns/ncr_io.cc b/src/lib/dhcp_ddns/ncr_io.cc
new file mode 100644
index 0000000..220fb3b
--- /dev/null
+++ b/src/lib/dhcp_ddns/ncr_io.cc
@@ -0,0 +1,499 @@
+// Copyright (C) 2013-2020 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <asiolink/asio_wrapper.h>
+#include <dhcp_ddns/dhcp_ddns_log.h>
+#include <dhcp_ddns/ncr_io.h>
+#include <util/multi_threading_mgr.h>
+
+#include <boost/algorithm/string/predicate.hpp>
+
+#include <mutex>
+
+namespace isc {
+namespace dhcp_ddns {
+
+using namespace isc::util;
+using namespace std;
+
+NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
+ if (boost::iequals(protocol_str, "UDP")) {
+ return (NCR_UDP);
+ }
+
+ if (boost::iequals(protocol_str, "TCP")) {
+ return (NCR_TCP);
+ }
+
+ isc_throw(BadValue,
+ "Invalid NameChangeRequest protocol: " << protocol_str);
+}
+
+std::string ncrProtocolToString(NameChangeProtocol protocol) {
+ switch (protocol) {
+ case NCR_UDP:
+ return ("UDP");
+ case NCR_TCP:
+ return ("TCP");
+ default:
+ break;
+ }
+
+ std::ostringstream stream;
+ stream << "UNKNOWN(" << protocol << ")";
+ return (stream.str());
+}
+
+
+//************************** NameChangeListener ***************************
+
+NameChangeListener::NameChangeListener(RequestReceiveHandler&
+ recv_handler)
+ : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
+};
+
+
+void
+NameChangeListener::startListening(isc::asiolink::IOService& io_service) {
+ if (amListening()) {
+ // This amounts to a programmatic error.
+ isc_throw(NcrListenerError, "NameChangeListener is already listening");
+ }
+
+ // Call implementation dependent open.
+ try {
+ open(io_service);
+ } catch (const isc::Exception& ex) {
+ stopListening();
+ isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
+ }
+
+ // Set our status to listening.
+ setListening(true);
+
+ // Start the first asynchronous receive.
+ try {
+ receiveNext();
+ } catch (const isc::Exception& ex) {
+ stopListening();
+ isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
+ }
+}
+
+void
+NameChangeListener::receiveNext() {
+ io_pending_ = true;
+ doReceive();
+}
+
+void
+NameChangeListener::stopListening() {
+ try {
+ // Call implementation dependent close.
+ close();
+ } catch (const isc::Exception &ex) {
+ // Swallow exceptions. If we have some sort of error we'll log
+ // it but we won't propagate the throw.
+ LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR)
+ .arg(ex.what());
+ }
+
+ // Set it false, no matter what. This allows us to at least try to
+ // re-open via startListening().
+ setListening(false);
+}
+
+void
+NameChangeListener::invokeRecvHandler(const Result result,
+ NameChangeRequestPtr& ncr) {
+ // Call the registered application layer handler.
+ // Surround the invocation with a try-catch. The invoked handler is
+ // not supposed to throw, but in the event it does we will at least
+ // report it.
+ try {
+ io_pending_ = false;
+ recv_handler_(result, ncr);
+ } catch (const std::exception& ex) {
+ LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
+ .arg(ex.what());
+ }
+
+ // Start the next IO layer asynchronous receive.
+ // In the event the handler above intervened and decided to stop listening
+ // we need to check that first.
+ if (amListening()) {
+ try {
+ receiveNext();
+ } catch (const isc::Exception& ex) {
+ // It is possible though unlikely, for doReceive to fail without
+ // scheduling the read. While, unlikely, it does mean the callback
+ // will not get called with a failure. A throw here would surface
+ // at the IOService::run (or run variant) invocation. So we will
+ // close the window by invoking the application handler with
+ // a failed result, and let the application layer sort it out.
+ LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_RECV_NEXT_ERROR)
+ .arg(ex.what());
+
+ // Call the registered application layer handler.
+ // Surround the invocation with a try-catch. The invoked handler is
+ // not supposed to throw, but in the event it does we will at least
+ // report it.
+ NameChangeRequestPtr empty;
+ try {
+ io_pending_ = false;
+ recv_handler_(ERROR, empty);
+ } catch (const std::exception& ex) {
+ LOG_ERROR(dhcp_ddns_logger,
+ DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR)
+ .arg(ex.what());
+ }
+ }
+ }
+}
+
+//************************* NameChangeSender ******************************
+
+NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
+ size_t send_queue_max)
+ : sending_(false), send_handler_(send_handler),
+ send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
+
+ // Queue size must be big enough to hold at least 1 entry.
+ setQueueMaxSize(send_queue_max);
+}
+
+void
+NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
+ if (amSending()) {
+ // This amounts to a programmatic error.
+ isc_throw(NcrSenderError, "NameChangeSender is already sending");
+ }
+
+ // Call implementation dependent open.
+ try {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ startSendingInternal(io_service);
+ } else {
+ startSendingInternal(io_service);
+ }
+ } catch (const isc::Exception& ex) {
+ stopSending();
+ isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
+ }
+}
+
+void
+NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
+ // Clear send marker.
+ ncr_to_send_.reset();
+
+ // Remember io service we're given.
+ io_service_ = &io_service;
+ open(io_service);
+
+ // Set our status to sending.
+ setSending(true);
+
+ // If there's any queued already.. we'll start sending.
+ sendNext();
+}
+
+void
+NameChangeSender::stopSending() {
+ // Set it send indicator to false, no matter what. This allows us to at
+ // least try to re-open via startSending(). Also, setting it false now,
+ // allows us to break sendNext() chain in invokeSendHandler.
+ setSending(false);
+
+ // If there is an outstanding IO to complete, attempt to process it.
+ if (ioReady() && io_service_ != NULL) {
+ try {
+ runReadyIO();
+ } catch (const std::exception& ex) {
+ // Swallow exceptions. If we have some sort of error we'll log
+ // it but we won't propagate the throw.
+ LOG_ERROR(dhcp_ddns_logger,
+ DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
+ }
+ }
+
+ try {
+ // Call implementation dependent close.
+ close();
+ } catch (const isc::Exception &ex) {
+ // Swallow exceptions. If we have some sort of error we'll log
+ // it but we won't propagate the throw.
+ LOG_ERROR(dhcp_ddns_logger,
+ DHCP_DDNS_NCR_SEND_CLOSE_ERROR).arg(ex.what());
+ }
+
+ io_service_ = NULL;
+}
+
+void
+NameChangeSender::sendRequest(NameChangeRequestPtr& ncr) {
+ if (!amSending()) {
+ isc_throw(NcrSenderError, "sender is not ready to send");
+ }
+
+ if (!ncr) {
+ isc_throw(NcrSenderError, "request to send is empty");
+ }
+
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ sendRequestInternal(ncr);
+ } else {
+ sendRequestInternal(ncr);
+ }
+}
+
+void
+NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
+ if (send_queue_.size() >= send_queue_max_) {
+ isc_throw(NcrSenderQueueFull,
+ "send queue has reached maximum capacity: "
+ << send_queue_max_);
+ }
+
+ // Put it on the queue.
+ send_queue_.push_back(ncr);
+
+ // Call sendNext to schedule the next one to go.
+ sendNext();
+}
+
+void
+NameChangeSender::sendNext() {
+ if (ncr_to_send_) {
+ // @todo Not sure if there is any risk of getting stuck here but
+ // an interval timer to defend would be good.
+ // In reality, the derivation should ensure they timeout themselves
+ return;
+ }
+
+ // If queue isn't empty, then get one from the front. Note we leave
+ // it on the front of the queue until we successfully send it.
+ if (!send_queue_.empty()) {
+ ncr_to_send_ = send_queue_.front();
+
+ // @todo start defense timer
+ // If a send were to hang and we timed it out, then timeout
+ // handler need to cycle thru open/close ?
+
+ // Call implementation dependent send.
+ doSend(ncr_to_send_);
+ }
+}
+
+void
+NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ invokeSendHandlerInternal(result);
+ } else {
+ invokeSendHandlerInternal(result);
+ }
+}
+
+void
+NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
+ // @todo reset defense timer
+ if (result == SUCCESS) {
+ // It shipped so pull it off the queue.
+ send_queue_.pop_front();
+ }
+
+ // Invoke the completion handler passing in the result and a pointer
+ // the request involved.
+ // Surround the invocation with a try-catch. The invoked handler is
+ // not supposed to throw, but in the event it does we will at least
+ // report it.
+ try {
+ send_handler_(result, ncr_to_send_);
+ } catch (const std::exception& ex) {
+ LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR)
+ .arg(ex.what());
+ }
+
+ // Clear the pending ncr pointer.
+ ncr_to_send_.reset();
+
+ // Set up the next send
+ try {
+ if (amSending()) {
+ sendNext();
+ }
+ } catch (const isc::Exception& ex) {
+ // It is possible though unlikely, for sendNext to fail without
+ // scheduling the send. While, unlikely, it does mean the callback
+ // will not get called with a failure. A throw here would surface
+ // at the IOService::run (or run variant) invocation. So we will
+ // close the window by invoking the application handler with
+ // a failed result, and let the application layer sort it out.
+ LOG_ERROR(dhcp_ddns_logger, DHCP_DDNS_NCR_SEND_NEXT_ERROR)
+ .arg(ex.what());
+
+ // Invoke the completion handler passing in failed result.
+ // Surround the invocation with a try-catch. The invoked handler is
+ // not supposed to throw, but in the event it does we will at least
+ // report it.
+ try {
+ send_handler_(ERROR, ncr_to_send_);
+ } catch (const std::exception& ex) {
+ LOG_ERROR(dhcp_ddns_logger,
+ DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR).arg(ex.what());
+ }
+ }
+}
+
+void
+NameChangeSender::skipNext() {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ skipNextInternal();
+ } else {
+ skipNextInternal();
+ }
+}
+
+void
+NameChangeSender::skipNextInternal() {
+ if (!send_queue_.empty()) {
+ // Discards the request at the front of the queue.
+ send_queue_.pop_front();
+ }
+}
+
+void
+NameChangeSender::clearSendQueue() {
+ if (amSending()) {
+ isc_throw(NcrSenderError, "Cannot clear queue while sending");
+ }
+
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ send_queue_.clear();
+ } else {
+ send_queue_.clear();
+ }
+}
+
+void
+NameChangeSender::setQueueMaxSize(const size_t new_max) {
+ if (new_max == 0) {
+ isc_throw(NcrSenderError, "NameChangeSender:"
+ " queue size must be greater than zero");
+ }
+
+ send_queue_max_ = new_max;
+}
+
+size_t
+NameChangeSender::getQueueSize() const {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ return (getQueueSizeInternal());
+ } else {
+ return (getQueueSizeInternal());
+ }
+}
+
+size_t
+NameChangeSender::getQueueSizeInternal() const {
+ return (send_queue_.size());
+}
+
+const NameChangeRequestPtr&
+NameChangeSender::peekAt(const size_t index) const {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ return (peekAtInternal(index));
+ } else {
+ return (peekAtInternal(index));
+ }
+}
+
+const NameChangeRequestPtr&
+NameChangeSender::peekAtInternal(const size_t index) const {
+ auto size = getQueueSizeInternal();
+ if (index >= size) {
+ isc_throw(NcrSenderError,
+ "NameChangeSender::peekAt peek beyond end of queue attempted"
+ << " index: " << index << " queue size: " << size);
+ }
+
+ return (send_queue_.at(index));
+}
+
+bool
+NameChangeSender::isSendInProgress() const {
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ return ((ncr_to_send_) ? true : false);
+ } else {
+ return ((ncr_to_send_) ? true : false);
+ }
+}
+
+void
+NameChangeSender::assumeQueue(NameChangeSender& source_sender) {
+ if (source_sender.amSending()) {
+ isc_throw(NcrSenderError, "Cannot assume queue:"
+ " source sender is actively sending");
+ }
+
+ if (amSending()) {
+ isc_throw(NcrSenderError, "Cannot assume queue:"
+ " target sender is actively sending");
+ }
+
+ if (getQueueMaxSize() < source_sender.getQueueSize()) {
+ isc_throw(NcrSenderError, "Cannot assume queue:"
+ " source queue count exceeds target queue max");
+ }
+
+ if (MultiThreadingMgr::instance().getMode()) {
+ lock_guard<mutex> lock(*mutex_);
+ assumeQueueInternal(source_sender);
+ } else {
+ assumeQueueInternal(source_sender);
+ }
+}
+
+void
+NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
+ if (!send_queue_.empty()) {
+ isc_throw(NcrSenderError, "Cannot assume queue:"
+ " target queue is not empty");
+ }
+
+ send_queue_.swap(source_sender.getSendQueue());
+}
+
+int
+NameChangeSender::getSelectFd() {
+ isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
+}
+
+void
+NameChangeSender::runReadyIO() {
+ if (!io_service_) {
+ isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
+ " sender io service is null");
+ }
+
+ // We shouldn't be here if IO isn't ready to execute.
+ // By running poll we're guaranteed not to hang.
+ /// @todo Trac# 3325 requests that asiolink::IOService provide a
+ /// wrapper for poll().
+ io_service_->get_io_service().poll_one();
+}
+
+} // namespace dhcp_ddns
+} // namespace isc