summaryrefslogtreecommitdiffstats
path: root/src/lib/asiodns/io_fetch.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/asiodns/io_fetch.cc')
-rw-r--r--src/lib/asiodns/io_fetch.cc402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/lib/asiodns/io_fetch.cc b/src/lib/asiodns/io_fetch.cc
new file mode 100644
index 0000000..2e29b47
--- /dev/null
+++ b/src/lib/asiodns/io_fetch.cc
@@ -0,0 +1,402 @@
+// Copyright (C) 2011-2021 Internet Systems Consortium, Inc. ("ISC")
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <config.h>
+#include <asiolink/asio_wrapper.h>
+#include <asiolink/io_address.h>
+#include <asiolink/io_asio_socket.h>
+#include <asiolink/io_endpoint.h>
+#include <asiolink/io_service.h>
+#include <asiolink/tcp_endpoint.h>
+#include <asiolink/tcp_socket.h>
+#include <asiolink/udp_endpoint.h>
+#include <asiolink/udp_socket.h>
+#include <asiodns/io_fetch.h>
+#include <asiodns/logger.h>
+#include <dns/messagerenderer.h>
+#include <dns/opcode.h>
+#include <dns/qid_gen.h>
+#include <dns/rcode.h>
+#include <util/buffer.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+
+#include <functional>
+#include <unistd.h> // for some IPC/network system calls
+#include <netinet/in.h>
+#include <stdint.h>
+#include <sys/socket.h>
+
+using namespace boost::asio;
+using namespace isc::asiolink;
+using namespace isc::dns;
+using namespace isc::util;
+using namespace isc::log;
+using namespace std;
+
+namespace isc {
+namespace asiodns {
+
+// Log debug verbosity
+
+const int DBG_IMPORTANT = DBGLVL_TRACE_BASIC;
+const int DBG_COMMON = DBGLVL_TRACE_DETAIL;
+const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20;
+
+/// \brief IOFetch Data
+///
+/// The data for IOFetch is held in a separate struct pointed to by a shared_ptr
+/// object. This is because the IOFetch object will be copied often (it is used
+/// as a coroutine and passed as callback to many async_*() functions) and we
+/// want keep the same data). Organising the data in this way keeps copying to
+/// a minimum.
+struct IOFetchData {
+
+ // The first two members are shared pointers to a base class because what is
+ // actually instantiated depends on whether the fetch is over UDP or TCP,
+ // which is not known until construction of the IOFetch. Use of a shared
+ // pointer here is merely to ensure deletion when the data object is deleted.
+ boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
+ ///< Socket to use for I/O
+ boost::scoped_ptr<IOEndpoint> remote_snd;///< Where the fetch is sent
+ boost::scoped_ptr<IOEndpoint> remote_rcv;///< Where the response came from
+ OutputBufferPtr msgbuf; ///< Wire buffer for question
+ OutputBufferPtr received; ///< Received data put here
+ IOFetch::Callback* callback; ///< Called on I/O Completion
+ boost::asio::deadline_timer timer; ///< Timer to measure timeouts
+ IOFetch::Protocol protocol; ///< Protocol being used
+ size_t cumulative; ///< Cumulative received amount
+ size_t expected; ///< Expected amount of data
+ size_t offset; ///< Offset to receive data
+ bool stopped; ///< Have we stopped running?
+ int timeout; ///< Timeout in ms
+ bool packet; ///< true if packet was supplied
+
+ // In case we need to log an error, the origin of the last asynchronous
+ // I/O is recorded. To save time and simplify the code, this is recorded
+ // as the ID of the error message that would be generated if the I/O failed.
+ // This means that we must make sure that all possible "origins" take the
+ // same arguments in their message in the same order.
+ isc::log::MessageID origin; ///< Origin of last asynchronous I/O
+ uint8_t staging[IOFetch::STAGING_LENGTH];
+ ///< Temporary array for received data
+ isc::dns::qid_t qid; ///< The QID set in the query
+
+ /// \brief Constructor
+ ///
+ /// Just fills in the data members of the IOFetchData structure
+ ///
+ /// \param proto Either IOFetch::TCP or IOFetch::UDP.
+ /// \param service I/O Service object to handle the asynchronous
+ /// operations.
+ /// \param address IP address of upstream server
+ /// \param port Port to use for the query
+ /// \param buff Output buffer into which the response (in wire format)
+ /// is written (if a response is received).
+ /// \param cb Callback object containing the callback to be called
+ /// when we terminate. The caller is responsible for managing this
+ /// object and deleting it if necessary.
+ /// \param wait Timeout for the fetch (in ms).
+ ///
+ /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554)
+ IOFetchData(IOFetch::Protocol proto, IOService& service,
+ const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
+ IOFetch::Callback* cb, int wait) :
+ socket((proto == IOFetch::UDP) ?
+ static_cast<IOAsioSocket<IOFetch>*>(
+ new UDPSocket<IOFetch>(service)) :
+ static_cast<IOAsioSocket<IOFetch>*>(
+ new TCPSocket<IOFetch>(service))
+ ),
+ remote_snd((proto == IOFetch::UDP) ?
+ static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
+ static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
+ ),
+ remote_rcv((proto == IOFetch::UDP) ?
+ static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
+ static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
+ ),
+ msgbuf(new OutputBuffer(512)),
+ received(buff),
+ callback(cb),
+ timer(service.get_io_service()),
+ protocol(proto),
+ cumulative(0),
+ expected(0),
+ offset(0),
+ stopped(false),
+ timeout(wait),
+ packet(false),
+ origin(ASIODNS_UNKNOWN_ORIGIN),
+ staging(),
+ qid(QidGenerator::getInstance().generateQid())
+ {}
+
+ // Checks if the response we received was ok;
+ // - data contains the buffer we read, as well as the address
+ // we sent to and the address we received from.
+ // length is provided by the operator() in IOFetch.
+ // Addresses must match, number of octets read must be at least
+ // 2, and the first two octets must match the qid of the message
+ // we sent.
+ bool responseOK() {
+ return (*remote_snd == *remote_rcv && cumulative >= 2 &&
+ readUint16(received->getData(), received->getLength()) == qid);
+ }
+};
+
+/// IOFetch Constructor - just initialize the private data
+
+IOFetch::IOFetch(Protocol protocol, IOService& service,
+ const isc::dns::Question& question, const IOAddress& address,
+ uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
+ MessagePtr query_msg(new Message(Message::RENDER));
+ initIOFetch(query_msg, protocol, service, question, address, port, buff,
+ cb, wait, edns);
+}
+
+IOFetch::IOFetch(Protocol protocol, IOService& service,
+ OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
+ OutputBufferPtr& buff, Callback* cb, int wait) :
+ data_(new IOFetchData(protocol, service,
+ address, port, buff, cb, wait)) {
+ data_->msgbuf = outpkt;
+ data_->packet = true;
+}
+
+IOFetch::IOFetch(Protocol protocol, IOService& service,
+ ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
+ OutputBufferPtr& buff, Callback* cb, int wait) {
+ MessagePtr msg(new Message(Message::RENDER));
+
+ msg->setHeaderFlag(Message::HEADERFLAG_RD,
+ query_message->getHeaderFlag(Message::HEADERFLAG_RD));
+ msg->setHeaderFlag(Message::HEADERFLAG_CD,
+ query_message->getHeaderFlag(Message::HEADERFLAG_CD));
+
+ initIOFetch(msg, protocol, service,
+ **(query_message->beginQuestion()),
+ address, port, buff, cb, wait);
+}
+
+void
+IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol,
+ IOService& service,
+ const isc::dns::Question& question,
+ const IOAddress& address, uint16_t port,
+ OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
+ data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
+ protocol, service, address, port, buff, cb, wait));
+
+ query_msg->setQid(data_->qid);
+ query_msg->setOpcode(Opcode::QUERY());
+ query_msg->setRcode(Rcode::NOERROR());
+ query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
+ query_msg->addQuestion(question);
+
+ if (edns) {
+ EDNSPtr edns_query(new EDNS());
+ edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
+ query_msg->setEDNS(edns_query);
+ }
+
+ MessageRenderer renderer;
+ renderer.setBuffer(data_->msgbuf.get());
+ query_msg->toWire(renderer);
+ renderer.setBuffer(NULL);
+}
+
+// Return protocol in use.
+
+IOFetch::Protocol
+IOFetch::getProtocol() const {
+ return (data_->protocol);
+}
+
+/// The function operator is implemented with the "stackless coroutine"
+/// pattern; see boost/asio/coroutine.hpp for details.
+
+void
+IOFetch::operator()(boost::system::error_code ec, size_t length) {
+ if (data_->stopped) {
+ return;
+
+ // On Debian it has been often observed that boost::asio async
+ // operations result in EINPROGRESS. This doesn't necessarily
+ // indicate an issue. Thus, we continue as if no error occurred.
+ } else if (ec && (ec.value() != boost::asio::error::in_progress)) {
+ logIOFailure(ec);
+ return;
+ }
+
+ BOOST_ASIO_CORO_REENTER (this) {
+
+ /// Generate the upstream query and render it to wire format
+ /// This is done in a different scope to allow inline variable
+ /// declarations.
+ {
+ if (data_->packet) {
+ // A packet was given, overwrite the QID (which is in the
+ // first two bytes of the packet).
+ data_->msgbuf->writeUint16At(data_->qid, 0);
+
+ }
+ }
+
+ // If we timeout, we stop, which will can cancel outstanding I/Os and
+ // shutdown everything.
+ if (data_->timeout != -1) {
+ data_->timer.expires_from_now(boost::posix_time::milliseconds(
+ data_->timeout));
+ data_->timer.async_wait(std::bind(&IOFetch::stop, *this,
+ TIME_OUT));
+ }
+
+ // Open a connection to the target system. For speed, if the operation
+ // is synchronous (i.e. UDP operation) we bypass the yield.
+ data_->origin = ASIODNS_OPEN_SOCKET;
+ if (data_->socket->isOpenSynchronous()) {
+ data_->socket->open(data_->remote_snd.get(), *this);
+ } else {
+ BOOST_ASIO_CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
+ }
+
+ do {
+ // Begin an asynchronous send, and then yield. When the send completes,
+ // we will resume immediately after this point.
+ data_->origin = ASIODNS_SEND_DATA;
+ BOOST_ASIO_CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
+ data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
+
+ // Now receive the response. Since TCP may not receive the entire
+ // message in one operation, we need to loop until we have received
+ // it. (This can't be done within the asyncReceive() method because
+ // each I/O operation will be done asynchronously and between each one
+ // we need to yield ... and we *really* don't want to set up another
+ // coroutine within that method.) So after each receive (and yield),
+ // we check if the operation is complete and if not, loop to read again.
+ //
+ // Another concession to TCP is that the amount of is contained in the
+ // first two bytes. This leads to two problems:
+ //
+ // a) We don't want those bytes in the return buffer.
+ // b) They may not both arrive in the first I/O.
+ //
+ // So... we need to loop until we have at least two bytes, then store
+ // the expected amount of data. Then we need to loop until we have
+ // received all the data before copying it back to the user's buffer.
+ // And we want to minimize the amount of copying...
+
+ data_->origin = ASIODNS_READ_DATA;
+ data_->cumulative = 0; // No data yet received
+ data_->offset = 0; // First data into start of buffer
+ data_->received->clear(); // Clear the receive buffer
+ do {
+ BOOST_ASIO_CORO_YIELD data_->socket->asyncReceive(data_->staging,
+ static_cast<size_t>(STAGING_LENGTH),
+ data_->offset,
+ data_->remote_rcv.get(), *this);
+ } while (!data_->socket->processReceivedData(data_->staging, length,
+ data_->cumulative, data_->offset,
+ data_->expected, data_->received));
+ } while (!data_->responseOK());
+
+ // Finished with this socket, so close it. This will not generate an
+ // I/O error, but reset the origin to unknown in case we change this.
+ data_->origin = ASIODNS_UNKNOWN_ORIGIN;
+ data_->socket->close();
+
+ /// We are done
+ stop(SUCCESS);
+ }
+}
+
+// Function that stops the coroutine sequence. It is called either when the
+// query finishes or when the timer times out. Either way, it sets the
+// "stopped_" flag and cancels anything that is in progress.
+//
+// As the function may be entered multiple times as things wind down, it checks
+// if the stopped_ flag is already set. If it is, the call is a no-op.
+
+void
+IOFetch::stop(Result result) {
+ if (!data_->stopped) {
+
+ // Mark the fetch as stopped to prevent other completion callbacks
+ // (invoked because of the calls to cancel()) from executing the
+ // cancel calls again.
+ //
+ // In a single threaded environment, the callbacks won't be invoked
+ // until this one completes. In a multi-threaded environment, they may
+ // well be, in which case the testing (and setting) of the stopped_
+ // variable should be done inside a mutex (and the stopped_ variable
+ // declared as "volatile").
+ //
+ // TODO: Update testing of stopped_ if threads are used.
+ data_->stopped = true;
+ switch (result) {
+ case TIME_OUT:
+ LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT).
+ arg(data_->remote_snd->getAddress().toText()).
+ arg(data_->remote_snd->getPort());
+ break;
+
+ case SUCCESS:
+ LOG_DEBUG(logger, DBG_ALL, ASIODNS_FETCH_COMPLETED).
+ arg(data_->remote_rcv->getAddress().toText()).
+ arg(data_->remote_rcv->getPort());
+ break;
+
+ case STOPPED:
+ // Fetch has been stopped for some other reason. This is
+ // allowed but as it is unusual it is logged, but with a lower
+ // debug level than a timeout (which is totally normal).
+ LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED).
+ arg(data_->remote_snd->getAddress().toText()).
+ arg(data_->remote_snd->getPort());
+ break;
+
+ default:
+ LOG_ERROR(logger, ASIODNS_UNKNOWN_RESULT).
+ arg(data_->remote_snd->getAddress().toText()).
+ arg(data_->remote_snd->getPort());
+ }
+
+ // Stop requested, cancel and I/O's on the socket and shut it down,
+ // and cancel the timer.
+ data_->socket->cancel();
+ data_->socket->close();
+
+ data_->timer.cancel();
+
+ // Execute the I/O completion callback (if present).
+ if (data_->callback) {
+ (*(data_->callback))(result);
+ }
+ }
+}
+
+// Log an error - called on I/O failure
+
+void IOFetch::logIOFailure(boost::system::error_code ec) {
+ // Should only get here with a known error code.
+ if ((data_->origin != ASIODNS_OPEN_SOCKET) &&
+ (data_->origin != ASIODNS_SEND_DATA) &&
+ (data_->origin != ASIODNS_READ_DATA) &&
+ (data_->origin != ASIODNS_UNKNOWN_ORIGIN)) {
+ isc_throw(isc::Unexpected, "impossible error code " << data_->origin);
+ }
+
+ LOG_ERROR(logger, data_->origin).arg(ec.value()).
+ arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
+ "TCP" : "UDP").
+ arg(data_->remote_snd->getAddress().toText()).
+ arg(data_->remote_snd->getPort());
+}
+
+} // namespace asiodns
+} // namespace isc {