summaryrefslogtreecommitdiffstats
path: root/src/lib/config/command_mgr.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:15:43 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:15:43 +0000
commitf5f56e1a1c4d9e9496fcb9d81131066a964ccd23 (patch)
tree49e44c6f87febed37efb953ab5485aa49f6481a7 /src/lib/config/command_mgr.cc
parentInitial commit. (diff)
downloadisc-kea-upstream.tar.xz
isc-kea-upstream.zip
Adding upstream version 2.4.1.upstream/2.4.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/lib/config/command_mgr.cc')
-rw-r--r--src/lib/config/command_mgr.cc667
1 files changed, 667 insertions, 0 deletions
diff --git a/src/lib/config/command_mgr.cc b/src/lib/config/command_mgr.cc
new file mode 100644
index 0000000..831d56e
--- /dev/null
+++ b/src/lib/config/command_mgr.cc
@@ -0,0 +1,667 @@
+// Copyright (C) 2015-2022 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/interval_timer.h>
+#include <asiolink/io_service.h>
+#include <asiolink/unix_domain_socket.h>
+#include <asiolink/unix_domain_socket_acceptor.h>
+#include <asiolink/unix_domain_socket_endpoint.h>
+#include <config/command_mgr.h>
+#include <cc/data.h>
+#include <cc/command_interpreter.h>
+#include <cc/json_feed.h>
+#include <dhcp/iface_mgr.h>
+#include <config/config_log.h>
+#include <config/timeouts.h>
+#include <util/watch_socket.h>
+#include <boost/enable_shared_from_this.hpp>
+#include <array>
+#include <functional>
+#include <unistd.h>
+#include <sys/file.h>
+
+using namespace isc;
+using namespace isc::asiolink;
+using namespace isc::config;
+using namespace isc::data;
+namespace ph = std::placeholders;
+
+namespace {
+
+/// @brief Maximum size of the data chunk sent/received over the socket.
+const size_t BUF_SIZE = 32768;
+
+class ConnectionPool;
+
+/// @brief Represents a single connection over control socket.
+///
+/// An instance of this object is created when the @c CommandMgr acceptor
+/// receives new connection from a controlling client.
+class Connection : public boost::enable_shared_from_this<Connection> {
+public:
+
+ /// @brief Constructor.
+ ///
+ /// This constructor registers a socket of this connection in the Interface
+ /// Manager to cause the blocking call to @c select() to return as soon as
+ /// a transmission over the control socket is received.
+ ///
+ /// It installs two external sockets on the @IfaceMgr to break synchronous
+ /// calls to @select(). The @c WatchSocket is used for send operations
+ /// over the connection. The native socket is used for signaling reads
+ /// over the connection.
+ ///
+ /// @param io_service IOService object used to handle the asio operations
+ /// @param socket Pointer to the object representing a socket which is used
+ /// for data transmission.
+ /// @param connection_pool Reference to the connection pool to which this
+ /// connection belongs.
+ /// @param timeout Connection timeout (in seconds).
+ Connection(const IOServicePtr& io_service,
+ const boost::shared_ptr<UnixDomainSocket>& socket,
+ ConnectionPool& connection_pool,
+ const long timeout)
+ : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
+ buf_(), response_(), connection_pool_(connection_pool), feed_(),
+ response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
+
+ LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_CONNECTION_OPENED)
+ .arg(socket_->getNative());
+
+ // Callback value of 0 is used to indicate that callback function is
+ // not installed.
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
+
+ // Initialize state model for receiving and preparsing commands.
+ feed_.initModel();
+
+ // Start timer for detecting timeouts.
+ scheduleTimer();
+ }
+
+ /// @brief Destructor.
+ ///
+ /// Cancels timeout timer if one is scheduled.
+ ~Connection() {
+ timeout_timer_.cancel();
+ }
+
+ /// @brief This method schedules timer or reschedules existing timer.
+ void scheduleTimer() {
+ timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
+ timeout_, IntervalTimer::ONE_SHOT);
+ }
+
+ /// @brief Close current connection.
+ ///
+ /// Connection is not closed if the invocation of this method is a result of
+ /// server reconfiguration. The connection will be closed once a response is
+ /// sent to the client. Closing a socket during processing a request would
+ /// cause the server to not send a response to the client.
+ void stop() {
+ if (!response_in_progress_) {
+ LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_CONNECTION_CLOSED)
+ .arg(socket_->getNative());
+
+ isc::dhcp::IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
+ isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
+
+ // Close watch socket and log errors if occur.
+ std::string watch_error;
+ if (!watch_socket_->closeSocket(watch_error)) {
+ LOG_ERROR(command_logger, COMMAND_WATCH_SOCKET_CLOSE_ERROR)
+ .arg(watch_error);
+ }
+
+ socket_->close();
+ timeout_timer_.cancel();
+ }
+ }
+
+ /// @brief Gracefully terminates current connection.
+ ///
+ /// This method should be called prior to closing the socket to initiate
+ /// graceful shutdown.
+ void terminate();
+
+ /// @brief Start asynchronous read over the unix domain socket.
+ ///
+ /// This method doesn't block. Once the transmission is received over the
+ /// socket, the @c Connection::receiveHandler callback is invoked to
+ /// process received data.
+ void doReceive() {
+ socket_->asyncReceive(&buf_[0], sizeof(buf_),
+ std::bind(&Connection::receiveHandler,
+ shared_from_this(), ph::_1, ph::_2));
+ }
+
+ /// @brief Starts asynchronous send over the unix domain socket.
+ ///
+ /// This method doesn't block. Once the send operation (that covers the whole
+ /// data if it's small or first BUF_SIZE bytes if its large) is completed, the
+ /// @c Connection::sendHandler callback is invoked. That handler will either
+ /// close the connection gracefully if all data has been sent, or will
+ /// call @ref doSend() again to send the next chunk of data.
+ void doSend() {
+ size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
+ socket_->asyncSend(&response_[0], chunk_size,
+ std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
+
+ // Asynchronous send has been scheduled and we need to indicate this
+ // to break the synchronous select(). The handler should clear this
+ // status when invoked.
+ try {
+ watch_socket_->markReady();
+
+ } catch (const std::exception& ex) {
+ LOG_ERROR(command_logger, COMMAND_WATCH_SOCKET_MARK_READY_ERROR)
+ .arg(ex.what());
+ }
+ }
+
+ /// @brief Handler invoked when the data is received over the control
+ /// socket.
+ ///
+ /// It collects received data into the @c isc::config::JSONFeed object and
+ /// schedules additional asynchronous read of data if this object signals
+ /// that command is incomplete. When the entire command is received, the
+ /// handler processes this command and asynchronously responds to the
+ /// controlling client.
+ //
+ ///
+ /// @param ec Error code.
+ /// @param bytes_transferred Number of bytes received.
+ void receiveHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred);
+
+
+ /// @brief Handler invoked when the data is sent over the control socket.
+ ///
+ /// If there are still data to be sent, another asynchronous send is
+ /// scheduled. When the entire command is sent, the connection is shutdown
+ /// and closed.
+ ///
+ /// @param ec Error code.
+ /// @param bytes_transferred Number of bytes sent.
+ void sendHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred);
+
+ /// @brief Handler invoked when timeout has occurred.
+ ///
+ /// Asynchronously sends a response to the client indicating that the
+ /// timeout has occurred.
+ void timeoutHandler();
+
+private:
+
+ /// @brief Pointer to the socket used for transmission.
+ boost::shared_ptr<UnixDomainSocket> socket_;
+
+ /// @brief Interval timer used to detect connection timeouts.
+ IntervalTimer timeout_timer_;
+
+ /// @brief Connection timeout (in milliseconds)
+ long timeout_;
+
+ /// @brief Buffer used for received data.
+ std::array<char, BUF_SIZE> buf_;
+
+ /// @brief Response created by the server.
+ std::string response_;
+
+ /// @brief Reference to the pool of connections.
+ ConnectionPool& connection_pool_;
+
+ /// @brief State model used to receive data over the connection and detect
+ /// when the command ends.
+ JSONFeed feed_;
+
+ /// @brief Boolean flag indicating if the request to stop connection is a
+ /// result of server reconfiguration.
+ bool response_in_progress_;
+
+ /// @brief Pointer to watch socket instance used to signal that the socket
+ /// is ready for read or write.
+ util::WatchSocketPtr watch_socket_;
+};
+
+/// @brief Pointer to the @c Connection.
+typedef boost::shared_ptr<Connection> ConnectionPtr;
+
+/// @brief Holds all open connections.
+class ConnectionPool {
+public:
+
+ /// @brief Starts new connection.
+ ///
+ /// @param connection Pointer to the new connection object.
+ void start(const ConnectionPtr& connection) {
+ connection->doReceive();
+ connections_.insert(connection);
+ }
+
+ /// @brief Stops running connection.
+ ///
+ /// @param connection Pointer to the new connection object.
+ void stop(const ConnectionPtr& connection) {
+ try {
+ connection->stop();
+ connections_.erase(connection);
+ } catch (const std::exception& ex) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_CLOSE_FAIL)
+ .arg(ex.what());
+ }
+ }
+
+ /// @brief Stops all connections which are allowed to stop.
+ void stopAll() {
+ for (auto conn = connections_.begin(); conn != connections_.end();
+ ++conn) {
+ (*conn)->stop();
+ }
+ connections_.clear();
+ }
+
+private:
+
+ /// @brief Pool of connections.
+ std::set<ConnectionPtr> connections_;
+
+};
+
+void
+Connection::terminate() {
+ try {
+ socket_->shutdown();
+
+ } catch (const std::exception& ex) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL)
+ .arg(ex.what());
+ }
+}
+
+void
+Connection::receiveHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ if (ec) {
+ if (ec.value() == boost::asio::error::eof) {
+ std::stringstream os;
+ if (feed_.getProcessedText().empty()) {
+ os << "no input data to discard";
+ } else {
+ os << "discarding partial command of "
+ << feed_.getProcessedText().size() << " bytes";
+ }
+
+ // Foreign host has closed the connection. We should remove it from the
+ // connection pool.
+ LOG_INFO(command_logger, COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST)
+ .arg(socket_->getNative()).arg(os.str());
+ } else if (ec.value() != boost::asio::error::operation_aborted) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_READ_FAIL)
+ .arg(ec.value()).arg(socket_->getNative());
+ }
+
+ connection_pool_.stop(shared_from_this());
+ return;
+
+ } else if (bytes_transferred == 0) {
+ // Nothing received. Close the connection.
+ connection_pool_.stop(shared_from_this());
+ return;
+ }
+
+ LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ)
+ .arg(bytes_transferred).arg(socket_->getNative());
+
+ // Reschedule the timer because the transaction is ongoing.
+ scheduleTimer();
+
+ ConstElementPtr cmd;
+ ConstElementPtr rsp;
+
+ try {
+ // Received some data over the socket. Append them to the JSON feed
+ // to see if we have reached the end of command.
+ feed_.postBuffer(&buf_[0], bytes_transferred);
+ feed_.poll();
+ // If we haven't yet received the full command, continue receiving.
+ if (feed_.needData()) {
+ doReceive();
+ return;
+ }
+
+ // Received entire command. Parse the command into JSON.
+ if (feed_.feedOk()) {
+ cmd = feed_.toElement();
+ response_in_progress_ = true;
+
+ // Cancel the timer to make sure that long lasting command
+ // processing doesn't cause the timeout.
+ timeout_timer_.cancel();
+
+ // If successful, then process it as a command.
+ rsp = CommandMgr::instance().processCommand(cmd);
+
+ response_in_progress_ = false;
+
+ } else {
+ // Failed to parse command as JSON or process the received command.
+ // This exception will be caught below and the error response will
+ // be sent.
+ isc_throw(BadValue, feed_.getErrorMessage());
+ }
+
+ } catch (const Exception& ex) {
+ LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
+ rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
+ }
+
+ // No response generated. Connection will be closed.
+ if (!rsp) {
+ LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR)
+ .arg(cmd ? cmd->str() : "unknown");
+ rsp = createAnswer(CONTROL_RESULT_ERROR,
+ "internal server error: no response generated");
+
+ } else {
+
+ // Reschedule the timer as it may be either canceled or need to be
+ // updated to not timeout before we manage to the send the reply.
+ scheduleTimer();
+
+ // Let's convert JSON response to text. Note that at this stage
+ // the rsp pointer is always set.
+ response_ = rsp->str();
+
+ doSend();
+ return;
+ }
+
+ // Close the connection if we have sent the entire response.
+ connection_pool_.stop(shared_from_this());
+}
+
+void
+Connection::sendHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ // Clear the watch socket so as the future send operation can mark it
+ // again to interrupt the synchronous select() call.
+ try {
+ watch_socket_->clearReady();
+
+ } catch (const std::exception& ex) {
+ LOG_ERROR(command_logger, COMMAND_WATCH_SOCKET_CLEAR_ERROR)
+ .arg(ex.what());
+ }
+
+ if (ec) {
+ // If an error occurred, log this error and stop the connection.
+ if (ec.value() != boost::asio::error::operation_aborted) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
+ .arg(socket_->getNative()).arg(ec.message());
+ }
+
+ } else {
+
+ // Reschedule the timer because the transaction is ongoing.
+ scheduleTimer();
+
+ // No error. We are in a process of sending a response. Need to
+ // remove the chunk that we have managed to sent with the previous
+ // attempt.
+ response_.erase(0, bytes_transferred);
+
+ LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_WRITE)
+ .arg(bytes_transferred).arg(response_.size())
+ .arg(socket_->getNative());
+
+ // Check if there is any data left to be sent and sent it.
+ if (!response_.empty()) {
+ doSend();
+ return;
+ }
+
+ // Gracefully shutdown the connection and close the socket if
+ // we have sent the whole response.
+ terminate();
+ }
+
+ // All data sent or an error has occurred. Close the connection.
+ connection_pool_.stop(shared_from_this());
+}
+
+void
+Connection::timeoutHandler() {
+ LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_TIMEOUT)
+ .arg(socket_->getNative());
+
+ try {
+ socket_->cancel();
+
+ } catch (const std::exception& ex) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_CONNECTION_CANCEL_FAIL)
+ .arg(socket_->getNative())
+ .arg(ex.what());
+ }
+
+ std::stringstream os;
+ os << "Connection over control channel timed out";
+ if (!feed_.getProcessedText().empty()) {
+ os << ", discarded partial command of "
+ << feed_.getProcessedText().size() << " bytes";
+ }
+
+ ConstElementPtr rsp = createAnswer(CONTROL_RESULT_ERROR, os.str());
+ response_ = rsp->str();
+ doSend();
+}
+
+
+}
+
+namespace isc {
+namespace config {
+
+/// @brief Implementation of the @c CommandMgr.
+class CommandMgrImpl {
+public:
+
+ /// @brief Constructor.
+ CommandMgrImpl()
+ : io_service_(), acceptor_(), socket_(), socket_name_(),
+ connection_pool_(), timeout_(TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND) {
+ }
+
+ /// @brief Opens acceptor service allowing the control clients to connect.
+ ///
+ /// @param socket_info Configuration information for the control socket.
+ /// @throw BadSocketInfo When socket configuration is invalid.
+ /// @throw SocketError When socket operation fails.
+ void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
+
+ /// @brief Asynchronously accepts next connection.
+ void doAccept();
+
+ /// @brief Returns the lock file name
+ std::string getLockName() {
+ return (std::string(socket_name_ + ".lock"));
+ }
+
+ /// @brief Pointer to the IO service used by the server process for running
+ /// asynchronous tasks.
+ IOServicePtr io_service_;
+
+ /// @brief Pointer to the acceptor service.
+ boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
+
+ /// @brief Pointer to the socket into which the new connection is accepted.
+ boost::shared_ptr<UnixDomainSocket> socket_;
+
+ /// @brief Path to the unix domain socket descriptor.
+ ///
+ /// This is used to remove the socket file once the connection terminates.
+ std::string socket_name_;
+
+ /// @brief Pool of connections.
+ ConnectionPool connection_pool_;
+
+ /// @brief Connection timeout
+ long timeout_;
+};
+
+void
+CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
+ socket_name_.clear();
+
+ if(!socket_info) {
+ isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
+ }
+
+ ConstElementPtr type = socket_info->get("socket-type");
+ if (!type) {
+ isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
+ }
+
+ // Only supporting unix sockets right now.
+ if (type->stringValue() != "unix") {
+ isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
+ << type->stringValue());
+ }
+
+ // UNIX socket is requested. It takes one parameter: socket-name that
+ // specifies UNIX path of the socket.
+ ConstElementPtr name = socket_info->get("socket-name");
+ if (!name) {
+ isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
+ }
+
+ if (name->getType() != Element::string) {
+ isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
+ }
+
+ socket_name_ = name->stringValue();
+
+ // First let's open lock file.
+ std::string lock_name = getLockName();
+ int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
+ if (lock_fd == -1) {
+ std::string errmsg = strerror(errno);
+ isc_throw(SocketError, "cannot create socket lockfile, "
+ << lock_name << ", : " << errmsg);
+ }
+
+ // Try to acquire lock. If we can't somebody else is actively
+ // using it.
+ int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
+ if (ret != 0) {
+ std::string errmsg = strerror(errno);
+ isc_throw(SocketError, "cannot lock socket lockfile, "
+ << lock_name << ", : " << errmsg);
+ }
+
+ // We have the lock, so let's remove the pre-existing socket
+ // file if it exists.
+ static_cast<void>(::remove(socket_name_.c_str()));
+
+ LOG_INFO(command_logger, COMMAND_ACCEPTOR_START)
+ .arg(socket_name_);
+
+ try {
+ // Start asynchronous acceptor service.
+ acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
+ UnixDomainSocketEndpoint endpoint(socket_name_);
+ acceptor_->open(endpoint);
+ acceptor_->bind(endpoint);
+ acceptor_->listen();
+ // Install this socket in Interface Manager.
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
+
+ doAccept();
+
+ } catch (const std::exception& ex) {
+ isc_throw(SocketError, ex.what());
+ }
+}
+
+void
+CommandMgrImpl::doAccept() {
+ // Create a socket into which the acceptor will accept new connection.
+ socket_.reset(new UnixDomainSocket(*io_service_));
+ acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
+ if (!ec) {
+ // New connection is arriving. Start asynchronous transmission.
+ ConnectionPtr connection(new Connection(io_service_, socket_,
+ connection_pool_,
+ timeout_));
+ connection_pool_.start(connection);
+
+ } else if (ec.value() != boost::asio::error::operation_aborted) {
+ LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
+ .arg(acceptor_->getNative()).arg(ec.message());
+ }
+
+ // Unless we're stopping the service, start accepting connections again.
+ if (ec.value() != boost::asio::error::operation_aborted) {
+ doAccept();
+ }
+ });
+}
+
+CommandMgr::CommandMgr()
+ : HookedCommandMgr(), impl_(new CommandMgrImpl()) {
+}
+
+void
+CommandMgr::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
+ impl_->openCommandSocket(socket_info);
+}
+
+void CommandMgr::closeCommandSocket() {
+ // Close acceptor if the acceptor is open.
+ if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
+ isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
+ impl_->acceptor_->close();
+ static_cast<void>(::remove(impl_->socket_name_.c_str()));
+ static_cast<void>(::remove(impl_->getLockName().c_str()));
+ }
+
+ // Stop all connections which can be closed. The only connection that won't
+ // be closed is the one over which we have received a request to reconfigure
+ // the server. This connection will be held until the CommandMgr responds to
+ // such request.
+ impl_->connection_pool_.stopAll();
+}
+
+int
+CommandMgr::getControlSocketFD() {
+ return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
+}
+
+
+CommandMgr&
+CommandMgr::instance() {
+ static CommandMgr cmd_mgr;
+ return (cmd_mgr);
+}
+
+void
+CommandMgr::setIOService(const IOServicePtr& io_service) {
+ impl_->io_service_ = io_service;
+}
+
+void
+CommandMgr::setConnectionTimeout(const long timeout) {
+ impl_->timeout_ = timeout;
+}
+
+
+}; // end of isc::config
+}; // end of isc