diff options
Diffstat (limited to '')
-rw-r--r-- | xsk.cc | 164 |
1 files changed, 97 insertions, 67 deletions
@@ -82,20 +82,21 @@ struct UmemEntryStatus Status status{Status::Free}; }; -LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems; +LockGuarded<std::map<std::pair<void*, uint64_t>, UmemEntryStatus>> s_umems; -void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus) +void checkUmemIntegrity(const char* function, int line, std::shared_ptr<LockGuarded<vector<uint64_t>>> vect, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus) { auto umems = s_umems.lock(); - if (validStatuses.count(umems->at(offset).status) == 0) { - std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(umems->at(offset).status) << ", expected: "; + auto& umemState = umems->at({vect.get(), offset}); + if (validStatuses.count(umemState.status) == 0) { + std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status of " << (void*)vect.get() << ", " << offset << " is " << static_cast<int>(umemState.status) << ", expected: "; for (const auto status : validStatuses) { std::cerr << static_cast<int>(status) << " "; } std::cerr << std::endl; abort(); } - (*umems)[offset].status = newStatus; + umemState.status = newStatus; } } #endif /* DEBUG_UMEM */ @@ -164,7 +165,7 @@ XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, c #ifdef DEBUG_UMEM { auto umems = s_umems.lock(); - (*umems)[idx * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus(); + (*umems)[{sharedEmptyFrameOffset.get(), idx * frameSize + XDP_PACKET_HEADROOM}] = UmemEntryStatus(); } #endif /* DEBUG_UMEM */ } @@ -275,7 +276,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const Combo void XskSocket::fillFq(uint32_t fillSize) noexcept { - { + if (uniqueEmptyFrameOffset.size() < fillSize) { + auto frames = sharedEmptyFrameOffset->lock(); + const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size()); + if (moveSize > 0) { + // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) + uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end())); + frames->resize(frames->size() - moveSize); + } + } + else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) { // if we have less than holdThreshold frames in the shared queue (which might be an issue // when the XskWorker needs empty frames), move frames from the unique container into the // shared one. This might not be optimal right now. @@ -290,7 +300,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept } } - if (uniqueEmptyFrameOffset.size() < fillSize) { + fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size())); + if (fillSize == 0) { + auto frames = sharedEmptyFrameOffset->lock(); return; } @@ -303,7 +315,7 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept for (; processed < toFill; processed++) { *xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back(); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue); #endif /* DEBUG_UMEM */ uniqueEmptyFrameOffset.pop_back(); } @@ -351,7 +363,7 @@ void XskSocket::send(std::vector<XskPacket>& packets) .len = packet.getFrameLen(), .options = 0}; #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); #endif /* DEBUG_UMEM */ queued++; } @@ -381,7 +393,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,performance-no-int-to-ptr) XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); #endif /* DEBUG_UMEM */ if (!packet.parse(false)) { @@ -393,11 +405,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou } } catch (const std::exception& exp) { - std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl; + ++failed; + ++processed; break; } catch (...) { - std::cerr << "Exception while processing the XSK RX queue" << std::endl; + ++failed; + ++processed; break; } } @@ -437,7 +451,7 @@ void XskSocket::recycle(size_t size) noexcept for (; processed < completeSize; ++processed) { uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++)); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ } xsk_ring_cons__release(&cq, processed); @@ -510,9 +524,8 @@ void XskSocket::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ - uniqueEmptyFrameOffset.push_back(offset); } @@ -717,7 +730,7 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept // IPV6 auto ipv6 = getIPv6Header(); std::swap(ipv6.daddr, ipv6.saddr); - assert(ipv6.nexthdr == IPPROTO_UDP); + ipv6.nexthdr = IPPROTO_UDP; auto udp = getUDPHeader(); std::swap(udp.dest, udp.source); @@ -726,16 +739,18 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept /* needed to get the correct checksum */ setIPv6Header(ipv6); setUDPHeader(udp); - udp.check = tcp_udp_v6_checksum(&ipv6); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udp.check = tcp_udp_v6_checksum(&ipv6); rewriteIpv6Header(&ipv6, getFrameLen()); setIPv6Header(ipv6); setUDPHeader(udp); } - else { + else if (ethHeader.h_proto == htons(ETH_P_IP)) { // IPV4 auto ipv4 = getIPv4Header(); std::swap(ipv4.daddr, ipv4.saddr); - assert(ipv4.protocol == IPPROTO_UDP); + ipv4.protocol = IPPROTO_UDP; auto udp = getUDPHeader(); std::swap(udp.dest, udp.source); @@ -744,7 +759,9 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept /* needed to get the correct checksum */ setIPv4Header(ipv4); setUDPHeader(udp); - udp.check = tcp_udp_v4_checksum(&ipv4); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udp.check = tcp_udp_v4_checksum(&ipv4); rewriteIpv4Header(&ipv4, getFrameLen()); setIPv4Header(ipv4); setUDPHeader(udp); @@ -844,29 +861,24 @@ void XskWorker::notify(int desc) } } -XskWorker::XskWorker() : - workerWaker(createEventfd()), xskSocketWaker(createEventfd()) +XskWorker::XskWorker(XskWorker::Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames) : + d_sharedEmptyFrameOffset(frames), d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd()) { } void XskWorker::pushToProcessingQueue(XskPacket& packet) { -#if defined(__SANITIZE_THREAD__) - if (!incomingPacketsQueue.lock()->push(packet)) { -#else - if (!incomingPacketsQueue.push(packet)) { -#endif + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker"); + } + if (!d_incomingPacketsQueue.push(packet)) { markAsFree(packet); } } void XskWorker::pushToSendQueue(XskPacket& packet) { -#if defined(__SANITIZE_THREAD__) - if (!outgoingPacketsQueue.lock()->push(packet)) { -#else - if (!outgoingPacketsQueue.push(packet)) { -#endif + if (!d_outgoingPacketsQueue.push(packet)) { markAsFree(packet); } } @@ -901,9 +913,9 @@ void XskPacket::rewrite() noexcept auto ipHeader = getIPv4Header(); ipHeader.daddr = to.sin4.sin_addr.s_addr; ipHeader.saddr = from.sin4.sin_addr.s_addr; + ipHeader.protocol = IPPROTO_UDP; auto udpHeader = getUDPHeader(); - ipHeader.protocol = IPPROTO_UDP; udpHeader.source = from.sin4.sin_port; udpHeader.dest = to.sin4.sin_port; udpHeader.len = htons(getDataSize() + sizeof(udpHeader)); @@ -911,7 +923,9 @@ void XskPacket::rewrite() noexcept /* needed to get the correct checksum */ setIPv4Header(ipHeader); setUDPHeader(udpHeader); - udpHeader.check = tcp_udp_v4_checksum(&ipHeader); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udpHeader.check = tcp_udp_v4_checksum(&ipHeader); rewriteIpv4Header(&ipHeader, getFrameLen()); setIPv4Header(ipHeader); setUDPHeader(udpHeader); @@ -922,9 +936,9 @@ void XskPacket::rewrite() noexcept auto ipHeader = getIPv6Header(); memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr)); memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr)); + ipHeader.nexthdr = IPPROTO_UDP; auto udpHeader = getUDPHeader(); - ipHeader.nexthdr = IPPROTO_UDP; udpHeader.source = from.sin6.sin6_port; udpHeader.dest = to.sin6.sin6_port; udpHeader.len = htons(getDataSize() + sizeof(udpHeader)); @@ -932,7 +946,9 @@ void XskPacket::rewrite() noexcept /* needed to get the correct checksum */ setIPv6Header(ipHeader); setUDPHeader(udpHeader); - udpHeader.check = tcp_udp_v6_checksum(&ipHeader); + // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP + // implementations do the same + // udpHeader.check = tcp_udp_v6_checksum(&ipHeader); setIPv6Header(ipHeader); setUDPHeader(udpHeader); } @@ -1119,15 +1135,15 @@ void XskWorker::notifyXskSocket() const notify(xskSocketWaker); } -std::shared_ptr<XskWorker> XskWorker::create() +std::shared_ptr<XskWorker> XskWorker::create(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames) { - return std::make_shared<XskWorker>(); + return std::make_shared<XskWorker>(type, frames); } void XskSocket::addWorker(std::shared_ptr<XskWorker> worker) { const auto socketWaker = worker->xskSocketWaker.getHandle(); - worker->umemBufBase = umem.bufBase; + worker->setUmemBufBase(umem.bufBase); d_workers.insert({socketWaker, std::move(worker)}); fds.push_back(pollfd{ .fd = socketWaker, @@ -1145,9 +1161,14 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest) d_workerRoutes.lock()->erase(dest); } +void XskWorker::setUmemBufBase(uint8_t* base) +{ + d_umemBufBase = base; +} + uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept { - return packet.getFrameOffsetFrom(umemBufBase); + return packet.getFrameOffsetFrom(d_umemBufBase); } void XskWorker::notifyWorker() const @@ -1155,6 +1176,29 @@ void XskWorker::notifyWorker() const notify(workerWaker); } +bool XskWorker::hasIncomingFrames() +{ + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker"); + } + + return d_incomingPacketsQueue.read_available() != 0U; +} + +void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback) +{ + if (d_type == Type::OutgoingOnly) { + throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker"); + } + + d_incomingPacketsQueue.consume_all(callback); +} + +void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback) +{ + d_outgoingPacketsQueue.consume_all(callback); +} + void XskSocket::getMACFromIfName() { ifreq ifr{}; @@ -1213,42 +1257,28 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info) return fds; } -void XskWorker::fillUniqueEmptyOffset() -{ - auto frames = sharedEmptyFrameOffset->lock(); - const auto moveSize = std::min(static_cast<size_t>(32), frames->size()); - if (moveSize > 0) { - // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) - uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end())); - frames->resize(frames->size() - moveSize); - } -} - std::optional<XskPacket> XskWorker::getEmptyFrame() { - if (!uniqueEmptyFrameOffset.empty()) { - auto offset = uniqueEmptyFrameOffset.back(); - uniqueEmptyFrameOffset.pop_back(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - return XskPacket(offset + umemBufBase, 0, frameSize); + auto frames = d_sharedEmptyFrameOffset->lock(); + if (frames->empty()) { + return std::nullopt; } - fillUniqueEmptyOffset(); - if (!uniqueEmptyFrameOffset.empty()) { - auto offset = uniqueEmptyFrameOffset.back(); - uniqueEmptyFrameOffset.pop_back(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - return XskPacket(offset + umemBufBase, 0, frameSize); - } - return std::nullopt; + auto offset = frames->back(); + frames->pop_back(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + return XskPacket(offset + d_umemBufBase, 0, d_frameSize); } void XskWorker::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, d_sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ - uniqueEmptyFrameOffset.push_back(offset); + { + auto frames = d_sharedEmptyFrameOffset->lock(); + frames->push_back(offset); + } } uint32_t XskPacket::getFlags() const noexcept |