// Copyright (C) 2011-2022 Internet Systems Consortium, Inc. ("ISC") // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // for some IPC/network system calls #include #include #include using namespace boost::asio; using namespace isc::asiolink; using namespace isc::dns; using namespace isc::util; using namespace isc::log; using namespace std; namespace isc { namespace asiodns { // Log debug verbosity const int DBG_IMPORTANT = DBGLVL_TRACE_BASIC; const int DBG_COMMON = DBGLVL_TRACE_DETAIL; const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20; /// \brief IOFetch Data /// /// The data for IOFetch is held in a separate struct pointed to by a shared_ptr /// object. This is because the IOFetch object will be copied often (it is used /// as a coroutine and passed as callback to many async_*() functions) and we /// want keep the same data). Organising the data in this way keeps copying to /// a minimum. struct IOFetchData { // The first two members are shared pointers to a base class because what is // actually instantiated depends on whether the fetch is over UDP or TCP, // which is not known until construction of the IOFetch. Use of a shared // pointer here is merely to ensure deletion when the data object is deleted. boost::scoped_ptr > socket; ///< Socket to use for I/O boost::scoped_ptr remote_snd;///< Where the fetch is sent boost::scoped_ptr remote_rcv;///< Where the response came from OutputBufferPtr msgbuf; ///< Wire buffer for question OutputBufferPtr received; ///< Received data put here IOFetch::Callback* callback; ///< Called on I/O Completion boost::asio::deadline_timer timer; ///< Timer to measure timeouts IOFetch::Protocol protocol; ///< Protocol being used size_t cumulative; ///< Cumulative received amount size_t expected; ///< Expected amount of data size_t offset; ///< Offset to receive data bool stopped; ///< Have we stopped running? int timeout; ///< Timeout in ms bool packet; ///< true if packet was supplied // In case we need to log an error, the origin of the last asynchronous // I/O is recorded. To save time and simplify the code, this is recorded // as the ID of the error message that would be generated if the I/O failed. // This means that we must make sure that all possible "origins" take the // same arguments in their message in the same order. isc::log::MessageID origin; ///< Origin of last asynchronous I/O uint8_t staging[IOFetch::STAGING_LENGTH]; ///< Temporary array for received data isc::dns::qid_t qid; ///< The QID set in the query /// \brief Constructor /// /// Just fills in the data members of the IOFetchData structure /// /// \param proto Either IOFetch::TCP or IOFetch::UDP. /// \param service I/O Service object to handle the asynchronous /// operations. /// \param address IP address of upstream server /// \param port Port to use for the query /// \param buff Output buffer into which the response (in wire format) /// is written (if a response is received). /// \param cb Callback object containing the callback to be called /// when we terminate. The caller is responsible for managing this /// object and deleting it if necessary. /// \param wait Timeout for the fetch (in ms). /// /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554) IOFetchData(IOFetch::Protocol proto, IOService& service, const IOAddress& address, uint16_t port, OutputBufferPtr& buff, IOFetch::Callback* cb, int wait) : socket((proto == IOFetch::UDP) ? static_cast*>( new UDPSocket(service)) : static_cast*>( new TCPSocket(service)) ), remote_snd((proto == IOFetch::UDP) ? static_cast(new UDPEndpoint(address, port)) : static_cast(new TCPEndpoint(address, port)) ), remote_rcv((proto == IOFetch::UDP) ? static_cast(new UDPEndpoint(address, port)) : static_cast(new TCPEndpoint(address, port)) ), msgbuf(new OutputBuffer(512)), received(buff), callback(cb), timer(service.get_io_service()), protocol(proto), cumulative(0), expected(0), offset(0), stopped(false), timeout(wait), packet(false), origin(ASIODNS_UNKNOWN_ORIGIN), staging(), qid(QidGenerator::getInstance().generateQid()) {} // Checks if the response we received was ok; // - data contains the buffer we read, as well as the address // we sent to and the address we received from. // length is provided by the operator() in IOFetch. // Addresses must match, number of octets read must be at least // 2, and the first two octets must match the qid of the message // we sent. bool responseOK() { return (*remote_snd == *remote_rcv && cumulative >= 2 && readUint16(received->getData(), received->getLength()) == qid); } }; /// IOFetch Constructor - just initialize the private data IOFetch::IOFetch(Protocol protocol, IOService& service, const isc::dns::Question& question, const IOAddress& address, uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns) { MessagePtr query_msg(new Message(Message::RENDER)); initIOFetch(query_msg, protocol, service, question, address, port, buff, cb, wait, edns); } IOFetch::IOFetch(Protocol protocol, IOService& service, OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait) : data_(new IOFetchData(protocol, service, address, port, buff, cb, wait)) { data_->msgbuf = outpkt; data_->packet = true; } IOFetch::IOFetch(Protocol protocol, IOService& service, ConstMessagePtr query_message, const IOAddress& address, uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait) { MessagePtr msg(new Message(Message::RENDER)); msg->setHeaderFlag(Message::HEADERFLAG_RD, query_message->getHeaderFlag(Message::HEADERFLAG_RD)); msg->setHeaderFlag(Message::HEADERFLAG_CD, query_message->getHeaderFlag(Message::HEADERFLAG_CD)); initIOFetch(msg, protocol, service, **(query_message->beginQuestion()), address, port, buff, cb, wait); } void IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol, IOService& service, const isc::dns::Question& question, const IOAddress& address, uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns) { data_ = boost::shared_ptr(new IOFetchData( protocol, service, address, port, buff, cb, wait)); query_msg->setQid(data_->qid); query_msg->setOpcode(Opcode::QUERY()); query_msg->setRcode(Rcode::NOERROR()); query_msg->setHeaderFlag(Message::HEADERFLAG_RD); query_msg->addQuestion(question); if (edns) { EDNSPtr edns_query(new EDNS()); edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE); query_msg->setEDNS(edns_query); } MessageRenderer renderer; renderer.setBuffer(data_->msgbuf.get()); query_msg->toWire(renderer); renderer.setBuffer(NULL); } // Return protocol in use. IOFetch::Protocol IOFetch::getProtocol() const { return (data_->protocol); } /// The function operator is implemented with the "stackless coroutine" /// pattern; see boost/asio/coroutine.hpp for details. void IOFetch::operator()(boost::system::error_code ec, size_t length) { if (data_->stopped) { return; // On Debian it has been often observed that boost::asio async // operations result in EINPROGRESS. This doesn't necessarily // indicate an issue. Thus, we continue as if no error occurred. } else if (ec && (ec.value() != boost::asio::error::in_progress)) { logIOFailure(ec); return; } BOOST_ASIO_CORO_REENTER (this) { /// Generate the upstream query and render it to wire format /// This is done in a different scope to allow inline variable /// declarations. { if (data_->packet) { // A packet was given, overwrite the QID (which is in the // first two bytes of the packet). data_->msgbuf->writeUint16At(data_->qid, 0); } } // If we timeout, we stop, which cancels outstanding I/O operations and // shuts down everything. if (data_->timeout != -1) { data_->timer.expires_from_now(boost::posix_time::milliseconds( data_->timeout)); data_->timer.async_wait(std::bind(&IOFetch::stop, *this, TIME_OUT)); } // Open a connection to the target system. For speed, if the operation // is synchronous (i.e. UDP operation) we bypass the yield. data_->origin = ASIODNS_OPEN_SOCKET; if (data_->socket->isOpenSynchronous()) { data_->socket->open(data_->remote_snd.get(), *this); } else { BOOST_ASIO_CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this); } do { // Begin an asynchronous send, and then yield. When the send completes, // we will resume immediately after this point. data_->origin = ASIODNS_SEND_DATA; BOOST_ASIO_CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(), data_->msgbuf->getLength(), data_->remote_snd.get(), *this); // Now receive the response. Since TCP may not receive the entire // message in one operation, we need to loop until we have received // it. (This can't be done within the asyncReceive() method because // each I/O operation will be done asynchronously and between each one // we need to yield ... and we *really* don't want to set up another // coroutine within that method.) So after each receive (and yield), // we check if the operation is complete and if not, loop to read again. // // Another concession to TCP is that the amount of is contained in the // first two bytes. This leads to two problems: // // a) We don't want those bytes in the return buffer. // b) They may not both arrive in the first I/O. // // So... we need to loop until we have at least two bytes, then store // the expected amount of data. Then we need to loop until we have // received all the data before copying it back to the user's buffer. // And we want to minimize the amount of copying... data_->origin = ASIODNS_READ_DATA; data_->cumulative = 0; // No data yet received data_->offset = 0; // First data into start of buffer data_->received->clear(); // Clear the receive buffer do { BOOST_ASIO_CORO_YIELD data_->socket->asyncReceive(data_->staging, static_cast(STAGING_LENGTH), data_->offset, data_->remote_rcv.get(), *this); } while (!data_->socket->processReceivedData(data_->staging, length, data_->cumulative, data_->offset, data_->expected, data_->received)); } while (!data_->responseOK()); // Finished with this socket, so close it. This will not generate an // I/O error, but reset the origin to unknown in case we change this. data_->origin = ASIODNS_UNKNOWN_ORIGIN; data_->socket->close(); /// We are done stop(SUCCESS); } } // Function that stops the coroutine sequence. It is called either when the // query finishes or when the timer times out. Either way, it sets the // "stopped_" flag and cancels anything that is in progress. // // As the function may be entered multiple times as things wind down, it checks // if the stopped_ flag is already set. If it is, the call is a no-op. void IOFetch::stop(Result result) { if (!data_->stopped) { // Mark the fetch as stopped to prevent other completion callbacks // (invoked because of the calls to cancel()) from executing the // cancel calls again. // // In a single threaded environment, the callbacks won't be invoked // until this one completes. In a multi-threaded environment, they may // well be, in which case the testing (and setting) of the stopped_ // variable should be done inside a mutex (and the stopped_ variable // declared as "volatile"). // // TODO: Update testing of stopped_ if threads are used. data_->stopped = true; switch (result) { case TIME_OUT: LOG_DEBUG(logger, DBG_COMMON, ASIODNS_READ_TIMEOUT). arg(data_->remote_snd->getAddress().toText()). arg(data_->remote_snd->getPort()); break; case SUCCESS: LOG_DEBUG(logger, DBG_ALL, ASIODNS_FETCH_COMPLETED). arg(data_->remote_rcv->getAddress().toText()). arg(data_->remote_rcv->getPort()); break; case STOPPED: // Fetch has been stopped for some other reason. This is // allowed but as it is unusual it is logged, but with a lower // debug level than a timeout (which is totally normal). LOG_DEBUG(logger, DBG_IMPORTANT, ASIODNS_FETCH_STOPPED). arg(data_->remote_snd->getAddress().toText()). arg(data_->remote_snd->getPort()); break; default: LOG_ERROR(logger, ASIODNS_UNKNOWN_RESULT). arg(data_->remote_snd->getAddress().toText()). arg(data_->remote_snd->getPort()); } // Stop requested, cancel and I/O's on the socket and shut it down, // and cancel the timer. data_->socket->cancel(); data_->socket->close(); data_->timer.cancel(); // Execute the I/O completion callback (if present). if (data_->callback) { (*(data_->callback))(result); } } } // Log an error - called on I/O failure void IOFetch::logIOFailure(boost::system::error_code ec) { // Should only get here with a known error code. if ((data_->origin != ASIODNS_OPEN_SOCKET) && (data_->origin != ASIODNS_SEND_DATA) && (data_->origin != ASIODNS_READ_DATA) && (data_->origin != ASIODNS_UNKNOWN_ORIGIN)) { isc_throw(isc::Unexpected, "impossible error code " << data_->origin); } LOG_ERROR(logger, data_->origin).arg(ec.value()). arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ? "TCP" : "UDP"). arg(data_->remote_snd->getAddress().toText()). arg(data_->remote_snd->getPort()); } } // namespace asiodns } // namespace isc {