diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:11:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 21:11:59 +0000 |
commit | 3cd01b932e1c85394272ae64fae67ebeda92fb00 (patch) | |
tree | c5a3115d710afc1879ddea5349362a2bc651733c /test-dnsdisttcp_cc.cc | |
parent | Initial commit. (diff) | |
download | dnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.tar.xz dnsdist-3cd01b932e1c85394272ae64fae67ebeda92fb00.zip |
Adding upstream version 1.8.3.upstream/1.8.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test-dnsdisttcp_cc.cc')
-rw-r--r-- | test-dnsdisttcp_cc.cc | 4164 |
1 files changed, 4164 insertions, 0 deletions
diff --git a/test-dnsdisttcp_cc.cc b/test-dnsdisttcp_cc.cc new file mode 100644 index 0000000..0904441 --- /dev/null +++ b/test-dnsdisttcp_cc.cc @@ -0,0 +1,4164 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_NO_MAIN + +#include <boost/test/unit_test.hpp> + +#include "dnswriter.hh" +#include "dnsdist.hh" +#include "dnsdist-proxy-protocol.hh" +#include "dnsdist-rings.hh" +#include "dnsdist-tcp-downstream.hh" +#include "dnsdist-tcp-upstream.hh" + +struct DNSDistStats g_stats; +GlobalStateHolder<NetmaskGroup> g_ACL; +GlobalStateHolder<vector<DNSDistRuleAction> > g_ruleactions; +GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_respruleactions; +GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cachehitrespruleactions; +GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_cacheInsertedRespRuleActions; +GlobalStateHolder<vector<DNSDistResponseRuleAction> > g_selfansweredrespruleactions; +GlobalStateHolder<servers_t> g_dstates; + +QueryCount g_qcount; + +const bool TCPIOHandler::s_disableConnectForUnitTests = true; + +bool checkDNSCryptQuery(const ClientState& cs, PacketBuffer& query, std::unique_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp) +{ + return false; +} + +bool checkQueryHeaders(const struct dnsheader* dh, ClientState&) +{ + return true; +} + +uint64_t uptimeOfProcess(const std::string& str) +{ + return 0; +} + +void handleResponseSent(const InternalQueryState& ids, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol protocol, bool fromBackend) +{ +} + +static std::function<ProcessQueryResult(DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend)> s_processQuery; + +ProcessQueryResult processQuery(DNSQuestion& dq, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend) +{ + if (s_processQuery) { + return s_processQuery(dq, selectedBackend); + } + + return ProcessQueryResult::Drop; +} + +bool responseContentMatches(const PacketBuffer& response, const DNSName& qname, const uint16_t qtype, const uint16_t qclass, const std::shared_ptr<DownstreamState>& remote, unsigned int& qnameWireLength) +{ + return true; +} + +static std::function<bool(PacketBuffer& response, DNSResponse& dr, bool muted)> s_processResponse; + +bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& localCacheInsertedRespRuleActions, DNSResponse& dr, bool muted) +{ + if (s_processResponse) { + return s_processResponse(response, dr, muted); + } + + return false; +} + +BOOST_AUTO_TEST_SUITE(test_dnsdisttcp_cc) + +struct ExpectedStep +{ +public: + enum class ExpectedRequest { handshakeClient, readFromClient, writeToClient, closeClient, connectToBackend, readFromBackend, writeToBackend, closeBackend }; + + ExpectedStep(ExpectedRequest r, IOState n, size_t b = 0, std::function<void(int descriptor)> fn = nullptr): cb(fn), request(r), nextState(n), bytes(b) + { + } + + std::function<void(int descriptor)> cb{nullptr}; + ExpectedRequest request; + IOState nextState; + size_t bytes{0}; +}; + +static std::deque<ExpectedStep> s_steps; + +static PacketBuffer s_readBuffer; +static PacketBuffer s_writeBuffer; +static PacketBuffer s_backendReadBuffer; +static PacketBuffer s_backendWriteBuffer; + +std::ostream& operator<<(std::ostream &os, const ExpectedStep::ExpectedRequest d); + +std::ostream& operator<<(std::ostream &os, const ExpectedStep::ExpectedRequest d) +{ + static const std::vector<std::string> requests = { "handshake with client", "read from client", "write to client", "close connection to client", "connect to the backend", "read from the backend", "write to the backend", "close connection to backend" }; + os<<requests.at(static_cast<size_t>(d)); + return os; +} + +class MockupTLSConnection : public TLSConnection +{ +public: + MockupTLSConnection(int descriptor, bool client = false): d_descriptor(descriptor), d_client(client) + { + } + + ~MockupTLSConnection() { } + + IOState tryHandshake() override + { + auto step = getStep(); + BOOST_REQUIRE_EQUAL(step.request, ExpectedStep::ExpectedRequest::handshakeClient); + + return step.nextState; + } + + IOState tryWrite(const PacketBuffer& buffer, size_t& pos, size_t toWrite) override + { + auto step = getStep(); + BOOST_REQUIRE_EQUAL(step.request, !d_client ? ExpectedStep::ExpectedRequest::writeToClient : ExpectedStep::ExpectedRequest::writeToBackend); + + if (step.bytes == 0) { + if (step.nextState == IOState::NeedWrite) { + return step.nextState; + } + throw std::runtime_error("Remote host closed the connection"); + } + + toWrite -= pos; + BOOST_REQUIRE_GE(buffer.size(), pos + toWrite); + + if (step.bytes < toWrite) { + toWrite = step.bytes; + } + + auto& externalBuffer = d_client ? s_backendWriteBuffer : s_writeBuffer; + externalBuffer.insert(externalBuffer.end(), buffer.begin() + pos, buffer.begin() + pos + toWrite); + pos += toWrite; + + return step.nextState; + } + + IOState tryRead(PacketBuffer& buffer, size_t& pos, size_t toRead, bool allowIncomplete=false) override + { + auto step = getStep(); + BOOST_REQUIRE_EQUAL(step.request, !d_client ? ExpectedStep::ExpectedRequest::readFromClient : ExpectedStep::ExpectedRequest::readFromBackend); + + if (step.bytes == 0) { + if (step.nextState == IOState::NeedRead) { + return step.nextState; + } + throw std::runtime_error("Remote host closed the connection"); + } + + auto& externalBuffer = d_client ? s_backendReadBuffer : s_readBuffer; + toRead -= pos; + + if (step.bytes < toRead) { + toRead = step.bytes; + } + + BOOST_REQUIRE_GE(buffer.size(), toRead); + BOOST_REQUIRE_GE(externalBuffer.size(), toRead); + + std::copy(externalBuffer.begin(), externalBuffer.begin() + toRead, buffer.begin() + pos); + pos += toRead; + externalBuffer.erase(externalBuffer.begin(), externalBuffer.begin() + toRead); + + return step.nextState; + } + + IOState tryConnect(bool fastOpen, const ComboAddress& remote) override + { + auto step = getStep(); + BOOST_REQUIRE_EQUAL(step.request, ExpectedStep::ExpectedRequest::connectToBackend); + + return step.nextState; + } + + void close() override + { + auto step = getStep(); + BOOST_REQUIRE_EQUAL(step.request, !d_client ? ExpectedStep::ExpectedRequest::closeClient : ExpectedStep::ExpectedRequest::closeBackend); + } + + bool hasBufferedData() const override + { + return false; + } + + bool isUsable() const override + { + return true; + } + + std::string getServerNameIndication() const override + { + return ""; + } + + std::vector<uint8_t> getNextProtocol() const override + { + return std::vector<uint8_t>(); + } + + LibsslTLSVersion getTLSVersion() const override + { + return LibsslTLSVersion::TLS13; + } + + bool hasSessionBeenResumed() const override + { + return false; + } + + std::vector<std::unique_ptr<TLSSession>> getSessions() override + { + return {}; + } + + std::vector<int> getAsyncFDs() override + { + return {}; + } + + void setSession(std::unique_ptr<TLSSession>& session) override + { + } + + /* unused in that context, don't bother */ + void doHandshake() override + { + } + + void connect(bool fastOpen, const ComboAddress& remote, const struct timeval& timeout) override + { + } + + size_t read(void* buffer, size_t bufferSize, const struct timeval&readTimeout, const struct timeval& totalTimeout={0,0}, bool allowIncomplete=false) override + { + return 0; + } + + size_t write(const void* buffer, size_t bufferSize, const struct timeval& writeTimeout) override + { + return 0; + } +private: + ExpectedStep getStep() const + { + BOOST_REQUIRE(!s_steps.empty()); + auto step = s_steps.front(); + s_steps.pop_front(); + + if (step.cb) { + step.cb(d_descriptor); + } + + return step; + } + + const int d_descriptor; + bool d_client{false}; +}; + +class MockupTLSCtx : public TLSCtx +{ +public: + ~MockupTLSCtx() + { + } + + std::unique_ptr<TLSConnection> getConnection(int socket, const struct timeval& timeout, time_t now) override + { + return std::make_unique<MockupTLSConnection>(socket); + } + + std::unique_ptr<TLSConnection> getClientConnection(const std::string& host, bool hostIsAddr, int socket, const struct timeval& timeout) override + { + return std::make_unique<MockupTLSConnection>(socket, true); + } + + void rotateTicketsKey(time_t now) override + { + } + + size_t getTicketsKeysCount() override + { + return 0; + } + + std::string getName() const override + { + return "Mockup TLS"; + } +}; + +class MockupFDMultiplexer : public FDMultiplexer +{ +public: + MockupFDMultiplexer() + { + } + + ~MockupFDMultiplexer() + { + } + + int run(struct timeval* tv, int timeout=500) override + { + int ret = 0; + + gettimeofday(tv, nullptr); // MANDATORY + + /* 'ready' might be altered by a callback while we are iterating */ + const auto readyFDs = ready; + for (const auto fd : readyFDs) { + { + const auto& it = d_readCallbacks.find(fd); + + if (it != d_readCallbacks.end()) { + it->d_callback(it->d_fd, it->d_parameter); + continue; // so we don't refind ourselves as writable! + } + } + + { + const auto& it = d_writeCallbacks.find(fd); + + if (it != d_writeCallbacks.end()) { + it->d_callback(it->d_fd, it->d_parameter); + } + } + } + + return ret; + } + + void getAvailableFDs(std::vector<int>& fds, int timeout) override + { + } + + void addFD(int fd, FDMultiplexer::EventKind kind) override + { + } + + void removeFD(int fd, FDMultiplexer::EventKind) override + { + } + + string getName() const override + { + return "mockup"; + } + + void setReady(int fd) + { + ready.insert(fd); + } + + void setNotReady(int fd) + { + ready.erase(fd); + } + +private: + std::set<int> ready; +}; + +static bool isIPv6Supported() +{ + try { + ComboAddress addr("[2001:db8:53::1]:53"); + auto socket = std::make_unique<Socket>(addr.sin4.sin_family, SOCK_STREAM, 0); + socket->setNonBlocking(); + int res = SConnectWithTimeout(socket->getHandle(), addr, timeval{0, 0}); + if (res == 0 || res == EINPROGRESS) { + return true; + } + return false; + } + catch (const std::exception& e) { + return false; + } +} + +static ComboAddress getBackendAddress(const std::string& lastDigit, uint16_t port) +{ + static const bool useV6 = isIPv6Supported(); + + if (useV6) { + return ComboAddress("2001:db8:53::" + lastDigit, port); + } + + return ComboAddress("192.0.2." + lastDigit, port); +} + +static void appendPayloadEditingID(PacketBuffer& buffer, const PacketBuffer& payload, uint16_t newID) +{ + PacketBuffer newPayload(payload); + dnsheader dh; + memcpy(&dh, &newPayload.at(sizeof(uint16_t)), sizeof(dh)); + dh.id = htons(newID); + memcpy(&newPayload.at(sizeof(uint16_t)), &dh, sizeof(dh)); + buffer.insert(buffer.end(), newPayload.begin(), newPayload.end()); +} + +static void prependPayloadEditingID(PacketBuffer& buffer, const PacketBuffer& payload, uint16_t newID) +{ + PacketBuffer newPayload(payload); + dnsheader dh; + memcpy(&dh, &newPayload.at(sizeof(uint16_t)), sizeof(dh)); + dh.id = htons(newID); + memcpy(&newPayload.at(sizeof(uint16_t)), &dh, sizeof(dh)); + buffer.insert(buffer.begin(), newPayload.begin(), newPayload.end()); +} + +static void testInit(const std::string& name, TCPClientThreadData& threadData) +{ +#ifdef DEBUGLOG_ENABLED + cerr<<name<<endl; +#else + (void) name; +#endif + + s_steps.clear(); + s_readBuffer.clear(); + s_writeBuffer.clear(); + s_backendReadBuffer.clear(); + s_backendWriteBuffer.clear(); + + g_proxyProtocolACL.clear(); + g_verbose = false; + IncomingTCPConnectionState::clearAllDownstreamConnections(); + + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); +} + +#define TEST_INIT(str) testInit(str, threadData) + +BOOST_AUTO_TEST_CASE(test_IncomingConnection_SelfAnswered) +{ + auto local = getBackendAddress("1", 80); + ClientState localCS(local, true, false, false, "", {}); + auto tlsCtx = std::make_shared<MockupTLSCtx>(); + localCS.tlsFrontend = std::make_shared<TLSFrontend>(tlsCtx); + + TCPClientThreadData threadData; + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); + + struct timeval now; + gettimeofday(&now, nullptr); + + PacketBuffer query; + GenericDNSPacketWriter<PacketBuffer> pwQ(query, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwQ.getHeader()->rd = 1; + + uint16_t querySize = static_cast<uint16_t>(query.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(querySize / 256), static_cast<uint8_t>(querySize % 256) }; + query.insert(query.begin(), sizeBytes, sizeBytes + 2); + + { + /* drop right away */ + TEST_INIT("=> drop right away"); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::Drop; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } + + { + /* self-generated REFUSED, client closes connection right away */ + TEST_INIT("=> self-gen"); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 65537 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + // Would be nicer to actually turn it into a response + return ProcessQueryResult::SendAnswer; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size()); + BOOST_CHECK(s_writeBuffer == query); + } + + { + TEST_INIT("=> shorts"); + /* need write then read during handshake, + short read on the size, then on the query itself, + self-generated REFUSED, short write on the response, + client closes connection right away */ + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::NeedWrite }, + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::NeedRead }, + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 1 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 1 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, query.size() - 3 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 1 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, query.size() - 1}, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 1 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + // Would be nicer to actually turn it into a response + return ProcessQueryResult::SendAnswer; + }; + + /* mark the incoming FD as always ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size()); + BOOST_CHECK(s_writeBuffer == query); + } + + { + TEST_INIT("=> exception while handling the query"); + /* Exception raised while handling the query */ + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + throw std::runtime_error("Something unexpected happened"); + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } + + { +#if 1 + TEST_INIT("=> 10k self-generated pipelined on the same connection"); + + /* 10k self-generated REFUSED pipelined on the same connection */ + size_t count = 10000; + + s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done } }; + + for (size_t idx = 0; idx < count; idx++) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() + 2 }); + }; + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::closeClient, IOState::Done }); + + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + // Would be nicer to actually turn it into a response + return ProcessQueryResult::SendAnswer; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size() * count); +#endif + } + + { + TEST_INIT("=> timeout while reading the query"); + /* timeout while reading the query */ + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, query.size() - 2 - 2 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + /* should not be reached */ + BOOST_CHECK(false); + return ProcessQueryResult::SendAnswer; + }; + + /* mark the incoming FD as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(threadData.mplexer->run(&now), 0); + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredReadConns = threadData.mplexer->getTimeouts(later, false); + for (const auto& cbData : expiredReadConns) { + BOOST_CHECK_EQUAL(cbData.first, state->d_handler.getDescriptor()); + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + BOOST_CHECK_EQUAL(cbData.first, cbState->d_handler.getDescriptor()); + cbState->handleTimeout(cbState, false); + } + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } + + { + TEST_INIT("=> timeout while writing the response"); + /* timeout while writing the response */ + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, 1 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + + /* mark the incoming FD as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(threadData.mplexer->run(&now), 0); + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredWriteConns = threadData.mplexer->getTimeouts(later, true); + for (const auto& cbData : expiredWriteConns) { + BOOST_CHECK_EQUAL(cbData.first, state->d_handler.getDescriptor()); + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + BOOST_CHECK_EQUAL(cbData.first, cbState->d_handler.getDescriptor()); + cbState->handleTimeout(cbState, false); + } + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 1U); + } + + { + TEST_INIT("=> Client closes the connection while writing the response (self-answered)"); + + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } +} + +BOOST_AUTO_TEST_CASE(test_IncomingConnectionWithProxyProtocol_SelfAnswered) +{ + auto local = getBackendAddress("1", 80); + ClientState localCS(local, true, false, false, "", {}); + auto tlsCtx = std::make_shared<MockupTLSCtx>(); + localCS.tlsFrontend = std::make_shared<TLSFrontend>(tlsCtx); + + TCPClientThreadData threadData; + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); + + struct timeval now; + gettimeofday(&now, nullptr); + + PacketBuffer query; + GenericDNSPacketWriter<PacketBuffer> pwQ(query, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwQ.getHeader()->rd = 1; + + uint16_t querySize = static_cast<uint16_t>(query.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(querySize / 256), static_cast<uint8_t>(querySize % 256) }; + query.insert(query.begin(), sizeBytes, sizeBytes + 2); + + { + TEST_INIT("=> reading PP"); + + g_proxyProtocolACL.addMask("0.0.0.0/0"); + g_proxyProtocolACL.addMask("::0/0"); + + auto proxyPayload = makeProxyHeader(true, ComboAddress("192.0.2.1"), ComboAddress("192.0.2.2"), {}); + BOOST_REQUIRE_GT(proxyPayload.size(), s_proxyProtocolMinimumHeaderSize); + s_readBuffer = query; + // preprend the proxy protocol payload + s_readBuffer.insert(s_readBuffer.begin(), proxyPayload.begin(), proxyPayload.end()); + // append a second query + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, s_proxyProtocolMinimumHeaderSize }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, proxyPayload.size() - s_proxyProtocolMinimumHeaderSize }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 65537 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 65537 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + + /* mark the incoming FD as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(threadData.mplexer->run(&now), 0); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size() * 2U); + } + + { + TEST_INIT("=> Invalid PP"); + + g_proxyProtocolACL.addMask("0.0.0.0/0"); + g_proxyProtocolACL.addMask("::0/0"); + auto proxyPayload = std::vector<uint8_t>(s_proxyProtocolMinimumHeaderSize); + std::fill(proxyPayload.begin(), proxyPayload.end(), 0); + + s_readBuffer = query; + // preprend the proxy protocol payload + s_readBuffer.insert(s_readBuffer.begin(), proxyPayload.begin(), proxyPayload.end()); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, s_proxyProtocolMinimumHeaderSize }, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } + + { + TEST_INIT("=> timeout while reading PP"); + + g_proxyProtocolACL.addMask("0.0.0.0/0"); + g_proxyProtocolACL.addMask("::0/0"); + auto proxyPayload = makeProxyHeader(true, ComboAddress("192.0.2.1"), ComboAddress("192.0.2.2"), {}); + BOOST_REQUIRE_GT(proxyPayload.size(), s_proxyProtocolMinimumHeaderSize); + s_readBuffer = query; + // preprend the proxy protocol payload + s_readBuffer.insert(s_readBuffer.begin(), proxyPayload.begin(), proxyPayload.end()); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, s_proxyProtocolMinimumHeaderSize }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, proxyPayload.size() - s_proxyProtocolMinimumHeaderSize - 1}, + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + + /* mark the incoming FD as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(threadData.mplexer->run(&now), 0); + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredReadConns = threadData.mplexer->getTimeouts(later, false); + for (const auto& cbData : expiredReadConns) { + BOOST_CHECK_EQUAL(cbData.first, state->d_handler.getDescriptor()); + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + BOOST_CHECK_EQUAL(cbData.first, cbState->d_handler.getDescriptor()); + cbState->handleTimeout(cbState, false); + } + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + } +} + +BOOST_AUTO_TEST_CASE(test_IncomingConnection_BackendNoOOOR) +{ + auto local = getBackendAddress("1", 80); + ClientState localCS(local, true, false, false, "", {}); + auto tlsCtx = std::make_shared<MockupTLSCtx>(); + localCS.tlsFrontend = std::make_shared<TLSFrontend>(tlsCtx); + + TCPClientThreadData threadData; + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); + + struct timeval now; + gettimeofday(&now, nullptr); + + PacketBuffer query; + GenericDNSPacketWriter<PacketBuffer> pwQ(query, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwQ.getHeader()->rd = 1; + pwQ.getHeader()->id = 0; + + auto shortQuery = query; + shortQuery.resize(sizeof(dnsheader) - 1); + uint16_t shortQuerySize = static_cast<uint16_t>(shortQuery.size()); + const uint8_t shortSizeBytes[] = { static_cast<uint8_t>(shortQuerySize / 256), static_cast<uint8_t>(shortQuerySize % 256) }; + shortQuery.insert(shortQuery.begin(), shortSizeBytes, shortSizeBytes + 2); + + uint16_t querySize = static_cast<uint16_t>(query.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(querySize / 256), static_cast<uint8_t>(querySize % 256) }; + query.insert(query.begin(), sizeBytes, sizeBytes + 2); + + auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + backend->d_tlsCtx = tlsCtx; + + { + /* pass to backend, backend answers right away, client closes the connection */ + TEST_INIT("=> Query to backend, backend answers right away"); + s_readBuffer = query; + + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size()); + BOOST_CHECK(s_writeBuffer == query); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK(s_backendWriteBuffer == query); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* pass to backend, backend answers right away, exception while handling the response */ + TEST_INIT("=> Exception while handling the response sent by the backend"); + s_readBuffer = query; + + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + throw std::runtime_error("Unexpected error while processing the response"); + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK(s_backendWriteBuffer == query); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* pass to backend, backend answers right away, processResponse() fails */ + TEST_INIT("=> Response processing fails "); + s_readBuffer = query; + + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return false; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK(s_backendWriteBuffer == query); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* pass to backend, backend answers right away, ID matching fails */ + TEST_INIT("=> ID matching fails "); + s_readBuffer = query; + + auto responsePacket = query; + /* mess with the transaction ID */ + responsePacket.at(3) ^= 42; + + s_backendReadBuffer = responsePacket; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK(s_backendWriteBuffer == query); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> Short (too short) query"); + s_readBuffer = shortQuery; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + s_processQuery = [](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + return ProcessQueryResult::SendAnswer; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> Short (too short) response from backend"); + s_readBuffer = query; + + s_backendReadBuffer = shortQuery; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connect in progress, short write to the backend, short read from the backend, client */ + TEST_INIT("=> Short read and write to backend"); + s_readBuffer = query; + // append a second query + appendPayloadEditingID(s_readBuffer, query, 1); + + s_backendReadBuffer = query; + // append a second query + appendPayloadEditingID(s_backendReadBuffer, query, 1); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* connect to backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::NeedWrite, 0, [&threadData](int desc) { + /* set the outgoing descriptor (backend connection) as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } + }, + /* send query */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::NeedWrite, 1 }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() - 1 }, + /* read response */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 1 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 1 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, query.size() - 3 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 1 }, + /* write response to client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, query.size() - 1 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 1 }, + /* read second query */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* write second query to backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + /* read second response */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* write second response */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() }, + /* read from client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* close connection to client */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* close connection to the backend, eventually */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + /* set the incoming descriptor as ready! */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size() * 2U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * 2U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connection refused by the backend */ + TEST_INIT("=> Connection refused by the backend "); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend (5 tries by default) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* timeout from the backend (write) */ + TEST_INIT("=> Timeout from the backend (write) "); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend (retrying 5 times) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::NeedWrite }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + struct timeval later = now; + later.tv_sec += backend->d_config.tcpSendTimeout + 1; + auto expiredWriteConns = threadData.mplexer->getTimeouts(later, true); + BOOST_CHECK_EQUAL(expiredWriteConns.size(), 1U); + for (const auto& cbData : expiredWriteConns) { + if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { + auto cbState = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); + cbState->handleTimeout(later, true); + } + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* timeout from the backend (read) */ + TEST_INIT("=> Timeout from the backend (read) "); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + struct timeval later = now; + later.tv_sec += backend->d_config.tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { + auto cbState = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); + cbState->handleTimeout(later, false); + } + } + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connection closed from the backend (write) */ + TEST_INIT("=> Connection closed from the backend (write) "); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend, connection closed on first write (5 attempts) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connection closed from the backend (write) 4 times then succeeds */ + TEST_INIT("=> Connection closed from the backend (write) 4 times then succeeds"); + s_readBuffer = query; + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend, connection closed on first write (5 attempts) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + /* reading the response */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* send the response to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() }, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* then eventually the backend one */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size()); + BOOST_CHECK(s_writeBuffer == query); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> connection closed by the backend on write, then refused"); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend, connection closed on first write */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* and now reconnection fails (1) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* 2 */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* 3 */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* 4 */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int descriptor) { + throw NetworkError("Connection refused by the backend"); + } + }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connection closed from the backend (read) */ + TEST_INIT("=> Connection closed from the backend (read) "); + s_readBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend, connection closed on read, 5 attempts, last one succeeds */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_config.d_retries); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + /* connection closed from the backend (read) 4 times then succeeds */ + TEST_INIT("=> Connection closed from the backend (read) 4 times then succeeds "); + s_readBuffer = query; + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend, connection closed on read, 5 attempts, last one succeeds */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* this time it works */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* sending the response to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() }, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* the eventually the backend one */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size()); + BOOST_CHECK(s_writeBuffer == query); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size() * backend->d_config.d_retries); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> Connection closed by the client when trying to send the response received from the backend"); + s_readBuffer = query; + s_backendReadBuffer = query; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + /* sending the response to the client, the connection has been closed */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* and eventually the backend one */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), query.size()); + BOOST_CHECK(s_backendWriteBuffer == query); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { +#if 1 + /* 101 queries on the same connection, check that the maximum number of queries kicks in */ + TEST_INIT("=> 101 queries on the same connection"); + + g_maxTCPQueriesPerConn = 100; + + size_t count = 101; + + s_readBuffer = query; + + for (size_t idx = 0; idx < count; idx++) { + appendPayloadEditingID(s_readBuffer, query, idx); + appendPayloadEditingID(s_backendReadBuffer, query, idx); + } + + s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() + 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }, + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() + 2 } + }; + + for (size_t idx = 0; idx < count - 1; idx++) { + /* read a new query */ + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }); + /* pass it to the backend */ + s_steps.push_back({ ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, query.size() + 2 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }); + s_steps.push_back({ ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, query.size() - 2 }); + /* send the response */ + s_steps.push_back({ ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, query.size() + 2 }); + }; + /* close the connection with the backend */ + s_steps.push_back({ ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }); + /* close the connection with the client */ + s_steps.push_back({ ExpectedStep::ExpectedRequest::closeClient, IOState::Done }); + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(s_writeBuffer.size(), query.size() * count); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + + g_maxTCPQueriesPerConn = 0; +#endif + } + + { + /* 2 queries on the same connection, asynchronously handled, check that we only read the first one (no OOOR as maxInFlight is 0) */ + TEST_INIT("=> 2 queries on the same connection, async"); + + size_t count = 2; + + s_readBuffer = query; + + for (size_t idx = 0; idx < count; idx++) { + appendPayloadEditingID(s_readBuffer, query, idx); + appendPayloadEditingID(s_backendReadBuffer, query, idx); + } + + s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, query.size() - 2 }, + /* close the connection with the client */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done } + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + dq.asynchronous = true; + /* note that we do nothing with the query, we just tell the frontend it was dealt with */ + return ProcessQueryResult::Asynchronous; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } +} + +BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendOOOR) +{ + auto local = getBackendAddress("1", 80); + ClientState localCS(local, true, false, false, "", {}); + /* enable out-of-order on the front side */ + localCS.d_maxInFlightQueriesPerConn = 65536; + + auto tlsCtx = std::make_shared<MockupTLSCtx>(); + localCS.tlsFrontend = std::make_shared<TLSFrontend>(tlsCtx); + + ConnectionInfo connInfo(&localCS); + connInfo.remote = getBackendAddress("84", 4242); + + auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + backend->d_tlsCtx = tlsCtx; + /* enable out-of-order on the backend side as well */ + backend->d_config.d_maxInFlightQueriesPerConn = 65536; + /* shorter than the client one */ + backend->d_config.tcpRecvTimeout = 1; + + TCPClientThreadData threadData; + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); + + struct timeval now; + gettimeofday(&now, nullptr); + + std::vector<PacketBuffer> queries(5); + std::vector<PacketBuffer> responses(5); + + size_t counter = 0; + size_t totalQueriesSize = 0; + for (auto& query : queries) { + GenericDNSPacketWriter<PacketBuffer> pwQ(query, DNSName("powerdns" + std::to_string(counter) + ".com."), QType::A, QClass::IN, 0); + pwQ.getHeader()->rd = 1; + pwQ.getHeader()->id = htons(counter); + uint16_t querySize = static_cast<uint16_t>(query.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(querySize / 256), static_cast<uint8_t>(querySize % 256) }; + query.insert(query.begin(), sizeBytes, sizeBytes + 2); + totalQueriesSize += query.size(); + ++counter; + } + + counter = 0; + size_t totalResponsesSize = 0; + for (auto& response : responses) { + DNSName name("powerdns" + std::to_string(counter) + ".com."); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->rd = 1; + pwR.getHeader()->ra = 1; + pwR.getHeader()->id = htons(counter); + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + totalResponsesSize += response.size(); + ++counter; + } + + { + TEST_INIT("=> 5 OOOR queries to the backend, backend responds in reverse order"); + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + uint16_t backendCounter = 0; + for (const auto& query : queries) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + appendPayloadEditingID(expectedBackendWriteBuffer, query, backendCounter++); + } + + backendCounter = 0; + for (const auto& response : responses) { + /* reverse order */ + prependPayloadEditingID(s_backendReadBuffer, response, backendCounter++); + expectedWriteBuffer.insert(expectedWriteBuffer.begin(), response.begin(), response.end()); + } + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (4) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(3).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(3).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (5) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + /* no response ready yet, but the backend becomes ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* set the outgoing descriptor (backend connection) as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + + /* no more queries from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + + /* reading a response from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size() }, + + /* reading a response from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(3).size() }, + + /* reading a response from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(2).size() }, + + /* reading a response from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size() }, + + /* reading a response from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() }, + /* sending it to the client, the client descriptor becomes ready */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + /* set the incoming descriptor (client connection) as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + + /* client is closing the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), totalResponsesSize); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), totalQueriesSize); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> 3 queries sent to the backend, 1 self-answered, 1 new query sent to the backend which responds to the first query right away, then to the last one, then the connection to the backend times out"); + + // increase the client timeout for that test, we want the backend to timeout first + g_tcpRecvTimeout = 5; + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + for (const auto& query : queries) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + } + + uint16_t backendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(4), backendCounter++); + + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(4), 3); + + /* self-answered */ + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(3).begin(), responses.at(3).end()); + /* from backend */ + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(4).begin(), responses.at(4).end()); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (4) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(3).size() - 2 }, + /* sending the response right away (self-answered) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(3).size() }, + /* reading a query from the client (5) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2 }, + /* sending query to the backend (5) */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + /* reading a response from the backend (1) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + /* set the backend descriptor as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* sending it to the client (1) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + /* set the client descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* reading from the client (not ready) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* reading a response from the backend (5) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() }, + /* sending it to the client (5) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size() }, + + /* try to read from the backend but there is no answer ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData, &timeout](int desc) { + /* set the backend descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + timeout = true; + } }, + + /* A timeout occurs */ + + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend,&responses](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + static size_t count = 0; + if (count++ == 3) { + /* self answered */ + dq.getMutableData() = responses.at(3); + /* remove the length */ + dq.getMutableData().erase(dq.getMutableData().begin(), dq.getMutableData().begin() + 2); + + return ProcessQueryResult::SendAnswer; + } + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += backend->d_config.tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { + auto cbState = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); + cbState->handleTimeout(later, false); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + + // restore the client timeout + g_tcpRecvTimeout = 2; + } + + { + TEST_INIT("=> 1 query sent to the backend, short read from the backend, 1 new query arrives in the meantime, the first answer is sent to the client, then the second query is handled, a new one arrives but the connection to the backend dies on us, short write on a new connection, the last query arrives and both are answered"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + for (const auto& query : queries) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + } + for (const auto& response : responses) { + expectedWriteBuffer.insert(expectedWriteBuffer.end(), response.begin(), response.end()); + } + + uint16_t backendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), backendCounter); + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), backendCounter); + appendPayloadEditingID(s_backendReadBuffer, responses.at(1), backendCounter++); + + // new connection + backendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), backendCounter); + appendPayloadEditingID(s_backendReadBuffer, responses.at(2), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(3), backendCounter); + appendPayloadEditingID(s_backendReadBuffer, responses.at(3), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(4), backendCounter); + appendPayloadEditingID(s_backendReadBuffer, responses.at(4), backendCounter++); + + bool timeout = false; + int backendDesc; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* read response size and the beginning of the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 1, [&threadData](int desc) { + /* set the backend descriptor as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* trying to read an additional query, if any */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData](int desc) { + /* set the client descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* reading the remaining bytes of response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 3 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size() }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* the response (2) is already there */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + /* sending response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size(), [&threadData](int desc) { + /* set the client descriptor as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* reading a query from the client (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (3) to the backend, short write */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::NeedWrite, 1, [&threadData,&backendDesc](int desc) { + /* set the backend descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + backendDesc = desc; + /* but client is ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* reading a query from the client (4) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(3).size() - 2 }, + /* reading a query from the client (5) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2, [&threadData,&backendDesc](int desc) { + /* set the backend descriptor as ready now */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDesc); + } }, + /* nothing else to read from the client for now */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData](int desc) { + /* set the client descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* finishing sending the query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() - 1 }, + /* sending the query (4) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(3).size() }, + /* sending the query (5) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + /* reading a response from the backend (3) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() - 2 }, + /* sending it to the client (3) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(2).size() }, + /* reading a response from the backend (4) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() - 2 }, + /* sending it to the client (4) but short write */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, responses.at(3).size() - 1, [&threadData](int desc) { + /* set the client descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* reading a response from the backend (5) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2, [&threadData](int desc) { + /* set the client descriptor as ready to resume sending */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* resume sending it to the client (4) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, 1 }, + /* sending it to the client (5) */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size() }, + + /* nothing to read from the client, then timeout later */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData,&timeout](int desc) { + /* set the client descriptor as NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + timeout = true; + } }, + + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + cbState->handleTimeout(cbState, false); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> 1 query to the backend, second query from the client is dropped, backend times out"); + // useful to tests that we check that the client connection is alive in notifyAllQueriesFailed() + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + + // only the first query is passed to the backend + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* query is dropped, closing the connection to the client */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0, [&timeout](int desc) { + timeout = true; + } }, + /* closing a connection to the backend after a timeout */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + counter = 0; + s_processQuery = [backend,&counter](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + if (counter == 0) { + ++counter; + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + } + return ProcessQueryResult::Drop; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += backend->d_config.tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { + auto cbState = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); + cbState->handleTimeout(later, false); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> 1 query to the backend, second query from the client is dropped, backend sends the answer"); + // useful to tests that we check that the client connection is alive in handleResponse() + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + + // only the first query is passed to the backend + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + + s_backendReadBuffer.insert(s_backendReadBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + + int backendDescriptor = -1; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptor](int desc) { + backendDescriptor = desc; + } }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* query is dropped, closing the connection to the client */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0, [&threadData,&backendDescriptor](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptor); + } }, + /* reading the response to the first query from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + counter = 0; + s_processQuery = [backend,&counter](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + if (counter == 0) { + ++counter; + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + } + return ProcessQueryResult::Drop; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while ((threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> 2 queries to the backend, client times out, responses arrive and are delivered, we start reading from the client again"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(4).begin(), queries.at(4).end()); + + uint16_t backendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(4), backendCounter++); + + appendPayloadEditingID(s_backendReadBuffer, responses.at(1), 1); + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(4), 2); + + appendPayloadEditingID(expectedWriteBuffer, responses.at(1), 1); + appendPayloadEditingID(expectedWriteBuffer, responses.at(0), 0); + appendPayloadEditingID(expectedWriteBuffer, responses.at(4), 4); + + /* make sure that the backend's timeout is longer than the client's */ + backend->d_config.tcpRecvTimeout = 30; + + bool timeout = false; + int backendDescriptor = -1; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&timeout,&backendDescriptor](int desc) { + backendDescriptor = desc; + timeout = true; + } }, + /* nothing from the client either */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* the client times out, and we will set the backend descriptor to ready at that point */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + /* sending response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size() }, + /* reading the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + /* setting the client descriptor ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* try to read from the client again, get query (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2 }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* setting the backend descriptor NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* try to read from the client again, nothing yet */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData,&backendDescriptor](int desc) { + /* the client descriptor becomes NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + /* the backend one is ready, though */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backendDescriptor); + } }, + /* reading the response (3) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 }, + /* sending response (3) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size(), [&timeout](int desc) { + timeout = true; + } }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* client times out again, this time we close the connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + cbState->handleTimeout(cbState, false); + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptor); + } + } + timeout = false; + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + expiredConns = threadData.mplexer->getTimeouts(later, false); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + cbState->handleTimeout(cbState, false); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> 3 queries to the backend, the first 2 responses arrive and are queued (client write blocks), and the backend closes the connection before sending the last one"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(2).begin(), queries.at(2).end()); + + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), queries.at(2).begin(), queries.at(2).end()); + + s_backendReadBuffer.insert(s_backendReadBuffer.end(), responses.at(1).begin(), responses.at(1).end()); + s_backendReadBuffer.insert(s_backendReadBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + + expectedWriteBuffer = s_backendReadBuffer; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a third query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* no response ready yet but the backend descriptor becomes ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* nothing from the client either */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData](int desc) { + /* the client descriptor is NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* read the response (2) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + /* trying to send response (2) to the client but blocking */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::NeedWrite, 0 }, + /* reading the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + /* trying to read from the backend again, connection closes on us */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + /* so we close the connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* try opening a new connection to the backend, it fails (5) times */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [](int desc) { + throw NetworkError("Connection refused by the backend"); + } }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* try opening a new connection to the backend, it fails (5) times */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done,0, [](int desc) { + throw NetworkError("Connection refused by the backend"); + } }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* try opening a new connection to the backend, it fails (5) times */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done,0, [](int desc) { + throw NetworkError("Connection refused by the backend"); + } }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* try opening a new connection to the backend, it fails (5) times */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done,0, [](int desc) { + throw NetworkError("Connection refused by the backend"); + } }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* try opening a new connection to the backend, it fails (5) times */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done,0, [](int desc) { + throw NetworkError("Connection refused by the backend"); + } }, + /* closing a connection to the backend, client becomes ready */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done, 0, [&threadData](int desc) { + /* the client descriptor is ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* sending response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size() }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size() }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> AXFR"); + + PacketBuffer axfrQuery; + PacketBuffer secondQuery; + std::vector<PacketBuffer> axfrResponses(3); + PacketBuffer secondResponse; + + GenericDNSPacketWriter<PacketBuffer> pwAXFRQuery(axfrQuery, DNSName("powerdns.com."), QType::AXFR, QClass::IN, 0); + pwAXFRQuery.getHeader()->rd = 0; + pwAXFRQuery.getHeader()->id = 42; + uint16_t axfrQuerySize = static_cast<uint16_t>(axfrQuery.size()); + const uint8_t axfrQuerySizeBytes[] = { static_cast<uint8_t>(axfrQuerySize / 256), static_cast<uint8_t>(axfrQuerySize % 256) }; + axfrQuery.insert(axfrQuery.begin(), axfrQuerySizeBytes, axfrQuerySizeBytes + 2); + + const DNSName name("powerdns.com."); + { + /* first message */ + auto& response = axfrResponses.at(0); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* insert SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + { + /* second message */ + auto& response = axfrResponses.at(1); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + { + /* third message */ + auto& response = axfrResponses.at(2); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + /* final SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwSecondQuery(secondQuery, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondQuery.getHeader()->rd = 1; + pwSecondQuery.getHeader()->id = 84; + uint16_t secondQuerySize = static_cast<uint16_t>(secondQuery.size()); + const uint8_t secondQuerySizeBytes[] = { static_cast<uint8_t>(secondQuerySize / 256), static_cast<uint8_t>(secondQuerySize % 256) }; + secondQuery.insert(secondQuery.begin(), secondQuerySizeBytes, secondQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwSecondResponse(secondResponse, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondResponse.getHeader()->qr = 1; + pwSecondResponse.getHeader()->rd = 1; + pwSecondResponse.getHeader()->ra = 1; + pwSecondResponse.getHeader()->id = 84; + pwSecondResponse.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwSecondResponse.xfr32BitInt(0x01020304); + pwSecondResponse.commit(); + uint16_t responseSize = static_cast<uint16_t>(secondResponse.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + secondResponse.insert(secondResponse.begin(), sizeBytes, sizeBytes + 2); + } + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer = axfrQuery; + s_readBuffer.insert(s_readBuffer.end(), secondQuery.begin(), secondQuery.end()); + + uint16_t backendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, axfrQuery, backendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, secondQuery, backendCounter++); + + for (const auto& response : axfrResponses) { + appendPayloadEditingID(s_backendReadBuffer, response, 0); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), response.begin(), response.end()); + } + appendPayloadEditingID(s_backendReadBuffer, secondResponse, 1); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), secondResponse.begin(), secondResponse.end()); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, axfrQuery.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, axfrQuery.size() }, + /* no response ready yet, but setting the backend descriptor readable */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* no more query from the client for now */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 , [&threadData](int desc) { + /* the client descriptor becomes NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + } }, + /* read the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(0).size() - 2 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(0).size() }, + /* reading the response (2) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(1).size() - 2 }, + /* sending response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(1).size() }, + /* reading the response (3) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(2).size() - 2 }, + /* sending response (3) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(2).size(), [&threadData](int desc) { + /* the client descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* trying to read from the client, getting a second query */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, secondQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, secondQuery.size() }, + /* reading the response (4) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, secondResponse.size() - 2 }, + /* sending response (4) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, secondResponse.size() }, + /* trying to read from the client, getting EOF */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> Interrupted AXFR"); + + PacketBuffer axfrQuery; + PacketBuffer secondQuery; + std::vector<PacketBuffer> axfrResponses(3); + PacketBuffer secondResponse; + + auto proxyPayload = makeProxyHeader(true, getBackendAddress("84", 4242), local, {}); + BOOST_REQUIRE_GT(proxyPayload.size(), s_proxyProtocolMinimumHeaderSize); + + auto proxyEnabledBackend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + proxyEnabledBackend->d_tlsCtx = tlsCtx; + proxyEnabledBackend->d_config.useProxyProtocol = true; + + GenericDNSPacketWriter<PacketBuffer> pwAXFRQuery(axfrQuery, DNSName("powerdns.com."), QType::AXFR, QClass::IN, 0); + pwAXFRQuery.getHeader()->rd = 0; + pwAXFRQuery.getHeader()->id = 42; + uint16_t axfrQuerySize = static_cast<uint16_t>(axfrQuery.size()); + const uint8_t axfrQuerySizeBytes[] = { static_cast<uint8_t>(axfrQuerySize / 256), static_cast<uint8_t>(axfrQuerySize % 256) }; + axfrQuery.insert(axfrQuery.begin(), axfrQuerySizeBytes, axfrQuerySizeBytes + 2); + + const DNSName name("powerdns.com."); + { + /* first message */ + auto& response = axfrResponses.at(0); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* insert SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + { + /* second message */ + auto& response = axfrResponses.at(1); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + { + /* third message */ + auto& response = axfrResponses.at(2); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + /* final SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer = axfrQuery; + + uint16_t backendCounter = 0; + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end()); + appendPayloadEditingID(expectedBackendWriteBuffer, axfrQuery, backendCounter++); + + for (const auto& response : axfrResponses) { + appendPayloadEditingID(s_backendReadBuffer, response, 0); + } + expectedWriteBuffer.insert(expectedWriteBuffer.end(), axfrResponses.at(0).begin(), axfrResponses.at(0).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), axfrResponses.at(1).begin(), axfrResponses.at(1).end()); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, axfrQuery.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, proxyPayload.size() + axfrQuery.size() }, + /* no response ready yet, but setting the backend descriptor readable */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* no more query from the client for now */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 , [&threadData](int desc) { + /* the client descriptor becomes NOT ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(-1); + } }, + /* read the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(0).size() - 2 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(0).size() }, + /* reading the response (2) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, axfrResponses.at(1).size() - 2 }, + /* sending response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, axfrResponses.at(1).size() }, + /* reading the response (3) from the backend, get EOF!! */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = proxyEnabledBackend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> IXFR"); + + PacketBuffer firstQuery; + PacketBuffer ixfrQuery; + PacketBuffer secondQuery; + PacketBuffer firstResponse; + std::vector<PacketBuffer> ixfrResponses(1); + PacketBuffer secondResponse; + + { + GenericDNSPacketWriter<PacketBuffer> pwFirstQuery(firstQuery, DNSName("powerdns.com."), QType::SOA, QClass::IN, 0); + pwFirstQuery.getHeader()->rd = 1; + pwFirstQuery.getHeader()->id = 84; + uint16_t firstQuerySize = static_cast<uint16_t>(firstQuery.size()); + const uint8_t firstQuerySizeBytes[] = { static_cast<uint8_t>(firstQuerySize / 256), static_cast<uint8_t>(firstQuerySize % 256) }; + firstQuery.insert(firstQuery.begin(), firstQuerySizeBytes, firstQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwFirstResponse(firstResponse, DNSName("powerdns.com."), QType::SOA, QClass::IN, 0); + pwFirstResponse.getHeader()->qr = 1; + pwFirstResponse.getHeader()->rd = 1; + pwFirstResponse.getHeader()->ra = 1; + pwFirstResponse.getHeader()->id = 84; + pwFirstResponse.startRecord(DNSName("powerdns.com."), QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwFirstResponse.xfrName(g_rootdnsname, true); + pwFirstResponse.xfrName(g_rootdnsname, true); + pwFirstResponse.xfr32BitInt(3 /* serial */); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.xfr32BitInt(0); + pwFirstResponse.commit(); + + uint16_t responseSize = static_cast<uint16_t>(firstResponse.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + firstResponse.insert(firstResponse.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwIXFRQuery(ixfrQuery, DNSName("powerdns.com."), QType::IXFR, QClass::IN, 0); + pwIXFRQuery.getHeader()->rd = 0; + pwIXFRQuery.getHeader()->id = 42; + uint16_t ixfrQuerySize = static_cast<uint16_t>(ixfrQuery.size()); + const uint8_t ixfrQuerySizeBytes[] = { static_cast<uint8_t>(ixfrQuerySize / 256), static_cast<uint8_t>(ixfrQuerySize % 256) }; + ixfrQuery.insert(ixfrQuery.begin(), ixfrQuerySizeBytes, ixfrQuerySizeBytes + 2); + } + + const DNSName name("powerdns.com."); + { + /* first message */ + auto& response = ixfrResponses.at(0); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->id = 42; + + /* insert final SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* insert first SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(1 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* removals */ + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + /* additions */ + /* insert second SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(2 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + /* A record */ + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020305); + pwR.commit(); + /* done with 1 -> 2 */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(2 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* no removal */ + + /* additions */ + /* insert second SOA */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + /* actually no addition either */ + /* done */ + pwR.startRecord(name, QType::SOA, 3600, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfrName(g_rootdnsname, true); + pwR.xfrName(g_rootdnsname, true); + pwR.xfr32BitInt(3 /* serial */); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.xfr32BitInt(0); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwSecondQuery(secondQuery, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondQuery.getHeader()->rd = 1; + pwSecondQuery.getHeader()->id = 84; + uint16_t secondQuerySize = static_cast<uint16_t>(secondQuery.size()); + const uint8_t secondQuerySizeBytes[] = { static_cast<uint8_t>(secondQuerySize / 256), static_cast<uint8_t>(secondQuerySize % 256) }; + secondQuery.insert(secondQuery.begin(), secondQuerySizeBytes, secondQuerySizeBytes + 2); + } + + { + GenericDNSPacketWriter<PacketBuffer> pwSecondResponse(secondResponse, DNSName("powerdns.com."), QType::A, QClass::IN, 0); + pwSecondResponse.getHeader()->qr = 1; + pwSecondResponse.getHeader()->rd = 1; + pwSecondResponse.getHeader()->ra = 1; + pwSecondResponse.getHeader()->id = 84; + pwSecondResponse.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwSecondResponse.xfr32BitInt(0x01020304); + pwSecondResponse.commit(); + uint16_t responseSize = static_cast<uint16_t>(secondResponse.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + secondResponse.insert(secondResponse.begin(), sizeBytes, sizeBytes + 2); + } + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer = firstQuery; + s_readBuffer.insert(s_readBuffer.end(), ixfrQuery.begin(), ixfrQuery.end()); + s_readBuffer.insert(s_readBuffer.end(), secondQuery.begin(), secondQuery.end()); + + appendPayloadEditingID(expectedBackendWriteBuffer, firstQuery, 0); + appendPayloadEditingID(expectedBackendWriteBuffer, ixfrQuery, 1); + appendPayloadEditingID(expectedBackendWriteBuffer, secondQuery, 2); + + appendPayloadEditingID(s_backendReadBuffer, firstResponse, 0); + expectedWriteBuffer.insert(expectedWriteBuffer.begin(), firstResponse.begin(), firstResponse.end()); + for (const auto& response : ixfrResponses) { + appendPayloadEditingID(s_backendReadBuffer, response, 1); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), response.begin(), response.end()); + } + appendPayloadEditingID(s_backendReadBuffer, secondResponse, 2); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), secondResponse.begin(), secondResponse.end()); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, firstQuery.size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, firstQuery.size() }, + /* no response ready yet, but setting the backend descriptor readable */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* try to read a second query from the client, none yet */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* read the response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, firstResponse.size() - 2 }, + /* sending response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, firstResponse.size(), [&threadData](int desc) { + /* client descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, ixfrQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, ixfrQuery.size() }, + /* read the response (ixfr 1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, ixfrResponses.at(0).size() - 2 }, + /* sending response (ixfr 1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, ixfrResponses.at(0).size(), [&threadData](int desc) { + /* the client descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* trying to read from the client, getting a second query */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, secondQuery.size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, secondQuery.size() }, + /* reading the response (4) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, secondResponse.size() - 2 }, + /* sending response (4) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, secondResponse.size() }, + /* trying to read from the client, getting EOF */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } + + { + TEST_INIT("=> Outgoing proxy protocol, 3 queries to the backend, first response is sent, connection closed while reading the second one"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + auto proxyPayload = makeProxyHeader(true, getBackendAddress("84", 4242), local, {}); + BOOST_REQUIRE_GT(proxyPayload.size(), s_proxyProtocolMinimumHeaderSize); + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(2).begin(), queries.at(2).end()); + + auto proxyEnabledBackend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + proxyEnabledBackend->d_tlsCtx = tlsCtx; + /* enable out-of-order on the backend side as well */ + proxyEnabledBackend->d_config.d_maxInFlightQueriesPerConn = 65536; + proxyEnabledBackend->d_config.useProxyProtocol = true; + + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end()); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), 1); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), 2); + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end()); + /* we are using an unordered_map, so all bets are off here :-/ */ + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), 1); + + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 0); + /* after the reconnection */ + appendPayloadEditingID(s_backendReadBuffer, responses.at(1), 1); + appendPayloadEditingID(s_backendReadBuffer, responses.at(2), 0); + + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(1).begin(), responses.at(1).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(2).begin(), responses.at(2).end()); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, proxyPayload.size() + queries.at(0).size() }, + /* got the response */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() }, + /* sending the response (1) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size() }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* backend is not ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a third query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* backend is not ready yet, but the descriptor becomes ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + }}, + /* nothing from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* backend closes the connection on us */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 0 }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* opening a new connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, proxyPayload.size() + queries.at(1).size() }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* got the response for 2 */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() }, + /* sending the response (2) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size() }, + /* got the response for 3 */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() }, + /* sending the response (3) to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(2).size(), [&threadData](int desc) { + /* the client descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 }, + }; + + s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = proxyEnabledBackend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(proxyEnabledBackend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + /* we should have nothing to clear since the connection cannot be reused due to the Proxy Protocol payload */ + BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 0U); + } + + { + TEST_INIT("=> Outgoing proxy protocol, 3 queries to the backend, the client closes while sending the first response"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + auto proxyPayload = makeProxyHeader(true, getBackendAddress("84", 4242), local, {}); + BOOST_REQUIRE_GT(proxyPayload.size(), s_proxyProtocolMinimumHeaderSize); + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(2).begin(), queries.at(2).end()); + + auto proxyEnabledBackend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + proxyEnabledBackend->d_tlsCtx = tlsCtx; + /* enable out-of-order on the backend side as well */ + proxyEnabledBackend->d_config.d_maxInFlightQueriesPerConn = 65536; + proxyEnabledBackend->d_config.useProxyProtocol = true; + + expectedBackendWriteBuffer.insert(expectedBackendWriteBuffer.end(), proxyPayload.begin(), proxyPayload.end()); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), 1); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), 2); + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, proxyPayload.size() + queries.at(0).size() }, + /* we try to read the response, not ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* backend is not ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a third query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* backend is not ready yet, but the descriptor becomes ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* the backend descriptor becomes ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + }}, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 }, + }; + + s_processQuery = [proxyEnabledBackend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = proxyEnabledBackend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(proxyEnabledBackend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + /* we should have nothing to clear since the connection cannot be reused due to the Proxy Protocol payload */ + BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 0U); + } + + { + TEST_INIT("=> I/O error with the backend with queries not sent to the backend yet"); + + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(1).begin(), queries.at(1).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(2).begin(), queries.at(2).end()); + + /* make sure that the backend's timeout is shorter than the client's */ + backend->d_config.tcpConnectTimeout = 1; + g_tcpRecvTimeout = 5; + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* backend is not ready yet */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::NeedWrite, 0 }, + /* reading a second query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* reading a third query from the client */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2, [&timeout](int desc) { + timeout = true; + } }, + /* trying to read more from the client but nothing to read */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* closing the client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done, 0 }, + /* closing the backend connection */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done, 0 }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += backend->d_config.tcpConnectTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later, true); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { + auto cbState = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); + cbState->handleTimeout(later, true); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), 0U); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), 0U); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* restore */ + backend->d_config.tcpSendTimeout = 30; + g_tcpRecvTimeout = 2; + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + /* we have one connection to clear, no proxy protocol */ + BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 1U); + } + + { + TEST_INIT("=> 5 OOOR queries, backend only accepts two at a time"); + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + for (const auto& query : queries) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + } + + /* queries 0, 1 and 4 are sent to the first backend, 2 and 3 to the second */ + uint16_t firstBackendCounter = 0; + uint16_t secondBackendCounter = 0; + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), firstBackendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), firstBackendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), secondBackendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(3), secondBackendCounter++); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(4), firstBackendCounter++); + + firstBackendCounter = 0; + secondBackendCounter = 0; + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), firstBackendCounter++); + appendPayloadEditingID(s_backendReadBuffer, responses.at(1), firstBackendCounter++); + appendPayloadEditingID(s_backendReadBuffer, responses.at(2), secondBackendCounter++); + appendPayloadEditingID(s_backendReadBuffer, responses.at(4), firstBackendCounter++); + appendPayloadEditingID(s_backendReadBuffer, responses.at(3), secondBackendCounter++); + + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(1).begin(), responses.at(1).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(2).begin(), responses.at(2).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(4).begin(), responses.at(4).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(3).begin(), responses.at(3).end()); + + auto backend1 = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + backend1->d_tlsCtx = tlsCtx; + /* only two queries in flight! */ + backend1->d_config.d_maxInFlightQueriesPerConn = 2; + + int backend1Desc = -1; + int backend2Desc = -1; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend (1) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backend1Desc](int desc) { + backend1Desc = desc; + } }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* opening a connection to the SECOND backend (2) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backend2Desc](int desc) { + backend2Desc = desc; + } }, + /* sending query (3) to backend 2 */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (4) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(3).size() - 2 }, + /* sending query to the second backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(3).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* nothing more to read from the client at that moment */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&threadData, &backend1Desc](int desc) { + /* but the first backend becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend1Desc); + } }, + /* reading response (1) from the first backend (1) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + /* client becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* reading a query from the client (5) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2, [&threadData](int desc) { + /* client is not ready anymore */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* sending query (5) to the first backend (1) */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + /* no response ready yet, but the first backend becomes ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + /* set the outgoing descriptor (backend connection) as ready */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* trying to read from client, nothing yet */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + /* reading response (2) from the first backend (1) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size(), [&threadData,&backend1Desc,&backend2Desc](int desc) { + /* client is NOT readable, backend1 is not readable, backend 2 becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend1Desc); + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend2Desc); + } }, + /* reading response (3) from the second backend (2) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(2).size(), [&threadData,&backend1Desc,&backend2Desc](int desc) { + /* backend 2 is no longer readable, backend 1 becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend2Desc); + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend1Desc); + } }, + /* reading response (5) from the first backend (1) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size(), [&threadData,&backend1Desc,&backend2Desc](int desc) { + /* backend 1 is no longer readable, backend 2 becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend1Desc); + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backend2Desc); + } }, + /* reading response (4) from the second backend (2) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(3).size(), [&threadData,&backend2Desc](int desc) { + /* backend 2 is no longer readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(backend2Desc); + /* client becomes readable */ + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backends */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend1](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend1; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), totalResponsesSize); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), totalQueriesSize); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend1->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 2U); + } + + { + TEST_INIT("=> 2 OOOR queries to the backend with duplicated IDs"); + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + s_readBuffer.insert(s_readBuffer.end(), queries.at(0).begin(), queries.at(0).end()); + + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 1); + + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 1); + + appendPayloadEditingID(expectedWriteBuffer, responses.at(0), 0); + appendPayloadEditingID(expectedWriteBuffer, responses.at(0), 0); + + bool timeout = false; + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* sending query to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet, but mark the descriptor as ready */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(desc); + } }, + /* nothing more from the client either */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 }, + + /* reading response (1) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size()}, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size()}, + /* reading response (2) from the backend */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size(), [&threadData](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* sending it to the client. we don't have anything else to send to the client, no new query from it either, until we time out */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&timeout](int desc) { + timeout = true; + } }, + /* closing a connection to the backend */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + cbState->handleTimeout(cbState, false); + } + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), expectedWriteBuffer.size()); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), expectedBackendWriteBuffer.size()); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } +} + +BOOST_AUTO_TEST_CASE(test_IncomingConnectionOOOR_BackendNotOOOR) +{ + auto local = getBackendAddress("1", 80); + ClientState localCS(local, true, false, false, "", {}); + /* enable out-of-order on the front side */ + localCS.d_maxInFlightQueriesPerConn = 65536; + + auto tlsCtx = std::make_shared<MockupTLSCtx>(); + localCS.tlsFrontend = std::make_shared<TLSFrontend>(tlsCtx); + + auto backend = std::make_shared<DownstreamState>(getBackendAddress("42", 53)); + backend->d_tlsCtx = tlsCtx; + /* shorter than the client one */ + backend->d_config.tcpRecvTimeout = 1; + + TCPClientThreadData threadData; + threadData.mplexer = std::make_unique<MockupFDMultiplexer>(); + + struct timeval now; + gettimeofday(&now, nullptr); + + std::vector<PacketBuffer> queries(5); + std::vector<PacketBuffer> responses(5); + + size_t counter = 0; + size_t totalQueriesSize = 0; + for (auto& query : queries) { + GenericDNSPacketWriter<PacketBuffer> pwQ(query, DNSName("powerdns" + std::to_string(counter) + ".com."), QType::A, QClass::IN, 0); + pwQ.getHeader()->rd = 1; + pwQ.getHeader()->id = counter; + uint16_t querySize = static_cast<uint16_t>(query.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(querySize / 256), static_cast<uint8_t>(querySize % 256) }; + query.insert(query.begin(), sizeBytes, sizeBytes + 2); + totalQueriesSize += query.size(); + ++counter; + } + + counter = 0; + size_t totalResponsesSize = 0; + for (auto& response : responses) { + DNSName name("powerdns" + std::to_string(counter) + ".com."); + GenericDNSPacketWriter<PacketBuffer> pwR(response, name, QType::A, QClass::IN, 0); + pwR.getHeader()->qr = 1; + pwR.getHeader()->rd = 1; + pwR.getHeader()->ra = 1; + pwR.getHeader()->id = counter; + pwR.startRecord(name, QType::A, 7200, QClass::IN, DNSResourceRecord::ANSWER); + pwR.xfr32BitInt(0x01020304); + pwR.commit(); + + uint16_t responseSize = static_cast<uint16_t>(response.size()); + const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) }; + response.insert(response.begin(), sizeBytes, sizeBytes + 2); + totalResponsesSize += response.size(); + ++counter; + } + + { + TEST_INIT("=> 5 OOOR queries, we will need to open 5 backend connections"); + PacketBuffer expectedWriteBuffer; + PacketBuffer expectedBackendWriteBuffer; + + for (const auto& query : queries) { + s_readBuffer.insert(s_readBuffer.end(), query.begin(), query.end()); + } + + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(0), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(1), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(2), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(3), 0); + appendPayloadEditingID(expectedBackendWriteBuffer, queries.at(4), 0); + + appendPayloadEditingID(s_backendReadBuffer, responses.at(0), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(2), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(1), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(4), 0); + appendPayloadEditingID(s_backendReadBuffer, responses.at(3), 0); + + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(0).begin(), responses.at(0).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(2).begin(), responses.at(2).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(1).begin(), responses.at(1).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(4).begin(), responses.at(4).end()); + expectedWriteBuffer.insert(expectedWriteBuffer.end(), responses.at(3).begin(), responses.at(3).end()); + + std::vector<int> backendDescriptors = { -1, -1, -1, -1, -1 }; + + s_steps = { + { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + /* reading a query from the client (1) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + /* opening a connection to the backend (1) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptors](int desc) { + backendDescriptors.at(0) = desc; + } }, + /* sending query (1) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(0).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (2) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + /* opening a connection to the backend (2) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptors](int desc) { + backendDescriptors.at(1) = desc; + } }, + /* sending query (2) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(1).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (3) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(2).size() - 2 }, + /* opening a connection to the backend (3) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptors](int desc) { + backendDescriptors.at(2) = desc; + } }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(2).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (4) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(3).size() - 2 }, + /* opening a connection to the backend (4) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptors](int desc) { + backendDescriptors.at(3) = desc; + } }, + /* sending query (3) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(3).size() }, + /* no response ready yet */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0 }, + /* reading a query from the client (5) */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(4).size() - 2 }, + /* opening a connection to the backend (5) */ + { ExpectedStep::ExpectedRequest::connectToBackend, IOState::Done, 0, [&backendDescriptors](int desc) { + backendDescriptors.at(4) = desc; + } }, + /* sending query (5) to the backend */ + { ExpectedStep::ExpectedRequest::writeToBackend, IOState::Done, queries.at(4).size() }, + /* no response ready yet, client stops being readable, first backend has a response */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::NeedRead, 0, [&threadData,&backendDescriptors](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptors.at(0)); + } }, + /* trying to read from the client but nothing yet */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0 , [&threadData](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setNotReady(desc); + } }, + /* reading response (1) from the first backend (1) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(0).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(0).size(), [&threadData,&backendDescriptors](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptors.at(2)); + } }, + /* reading response (3) from the third backend (3) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(2).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(2).size(), [&threadData,&backendDescriptors](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptors.at(1)); + } }, + /* reading response (2) from the second backend (2) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(1).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(1).size(), [&threadData,&backendDescriptors](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptors.at(4)); + } }, + /* reading response (5) from the fifth backend (5) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(4).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(4).size(), [&threadData,&backendDescriptors](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(backendDescriptors.at(3)); + } }, + /* reading response (4) from the fourth backend (4) */ + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromBackend, IOState::Done, responses.at(3).size() - 2 }, + /* sending it to the client */ + { ExpectedStep::ExpectedRequest::writeToClient, IOState::Done, responses.at(3).size(), [&threadData](int desc) { + dynamic_cast<MockupFDMultiplexer*>(threadData.mplexer.get())->setReady(-1); + } }, + /* client closes the connection */ + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 0 }, + /* closing client connection */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done }, + /* closing a connection to the backends */ + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + { ExpectedStep::ExpectedRequest::closeBackend, IOState::Done }, + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + return ProcessQueryResult::PassToBackend; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0) { + threadData.mplexer->run(&now); + } + + BOOST_CHECK_EQUAL(s_writeBuffer.size(), totalResponsesSize); + BOOST_CHECK(s_writeBuffer == expectedWriteBuffer); + BOOST_CHECK_EQUAL(s_backendWriteBuffer.size(), totalQueriesSize); + BOOST_CHECK(s_backendWriteBuffer == expectedBackendWriteBuffer); + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + BOOST_CHECK_EQUAL(IncomingTCPConnectionState::clearAllDownstreamConnections(), 5U); + } + + { + /* 2 queries on the same connection, asynchronously handled, check that we only read all of them (OOOR as maxInFlight is 65535) */ + TEST_INIT("=> 2 queries on the same connection, async with OOOR"); + + size_t count = 2; + + s_readBuffer = queries.at(0); + + for (size_t idx = 0; idx < count; idx++) { + appendPayloadEditingID(s_readBuffer, queries.at(idx), idx); + appendPayloadEditingID(s_backendReadBuffer, queries.at(idx), idx); + } + + bool timeout = false; + s_steps = { { ExpectedStep::ExpectedRequest::handshakeClient, IOState::Done }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(0).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::Done, queries.at(1).size() - 2 }, + { ExpectedStep::ExpectedRequest::readFromClient, IOState::NeedRead, 0, [&timeout](int desc) { + timeout = true; + }}, + /* close the connection with the client */ + { ExpectedStep::ExpectedRequest::closeClient, IOState::Done } + }; + + s_processQuery = [backend](DNSQuestion& dq, std::shared_ptr<DownstreamState>& selectedBackend) -> ProcessQueryResult { + selectedBackend = backend; + dq.asynchronous = true; + /* note that we do nothing with the query, we just tell the frontend it was dealt with */ + return ProcessQueryResult::Asynchronous; + }; + s_processResponse = [](PacketBuffer& response, DNSResponse& dr, bool muted) -> bool { + return true; + }; + + auto state = std::make_shared<IncomingTCPConnectionState>(ConnectionInfo(&localCS, getBackendAddress("84", 4242)), threadData, now); + IncomingTCPConnectionState::handleIO(state, now); + while (!timeout && (threadData.mplexer->getWatchedFDCount(false) != 0 || threadData.mplexer->getWatchedFDCount(true) != 0)) { + threadData.mplexer->run(&now); + } + + struct timeval later = now; + later.tv_sec += g_tcpRecvTimeout + 1; + auto expiredConns = threadData.mplexer->getTimeouts(later); + BOOST_CHECK_EQUAL(expiredConns.size(), 1U); + for (const auto& cbData : expiredConns) { + if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { + auto cbState = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); + cbState->handleTimeout(cbState, false); + } + } + + BOOST_CHECK_EQUAL(backend->outstanding.load(), 0U); + + /* we need to clear them now, otherwise we end up with dangling pointers to the steps via the TLS context, etc */ + IncomingTCPConnectionState::clearAllDownstreamConnections(); + } +} + +BOOST_AUTO_TEST_SUITE_END(); |