summaryrefslogtreecommitdiffstats
path: root/xsk.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xsk.cc164
1 files changed, 97 insertions, 67 deletions
diff --git a/xsk.cc b/xsk.cc
index e507c0f..5851f1b 100644
--- a/xsk.cc
+++ b/xsk.cc
@@ -82,20 +82,21 @@ struct UmemEntryStatus
Status status{Status::Free};
};
-LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems;
+LockGuarded<std::map<std::pair<void*, uint64_t>, UmemEntryStatus>> s_umems;
-void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus)
+void checkUmemIntegrity(const char* function, int line, std::shared_ptr<LockGuarded<vector<uint64_t>>> vect, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus)
{
auto umems = s_umems.lock();
- if (validStatuses.count(umems->at(offset).status) == 0) {
- std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(umems->at(offset).status) << ", expected: ";
+ auto& umemState = umems->at({vect.get(), offset});
+ if (validStatuses.count(umemState.status) == 0) {
+ std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status of " << (void*)vect.get() << ", " << offset << " is " << static_cast<int>(umemState.status) << ", expected: ";
for (const auto status : validStatuses) {
std::cerr << static_cast<int>(status) << " ";
}
std::cerr << std::endl;
abort();
}
- (*umems)[offset].status = newStatus;
+ umemState.status = newStatus;
}
}
#endif /* DEBUG_UMEM */
@@ -164,7 +165,7 @@ XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, c
#ifdef DEBUG_UMEM
{
auto umems = s_umems.lock();
- (*umems)[idx * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus();
+ (*umems)[{sharedEmptyFrameOffset.get(), idx * frameSize + XDP_PACKET_HEADROOM}] = UmemEntryStatus();
}
#endif /* DEBUG_UMEM */
}
@@ -275,7 +276,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const Combo
void XskSocket::fillFq(uint32_t fillSize) noexcept
{
- {
+ if (uniqueEmptyFrameOffset.size() < fillSize) {
+ auto frames = sharedEmptyFrameOffset->lock();
+ const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size());
+ if (moveSize > 0) {
+ // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
+ uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+ frames->resize(frames->size() - moveSize);
+ }
+ }
+ else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) {
// if we have less than holdThreshold frames in the shared queue (which might be an issue
// when the XskWorker needs empty frames), move frames from the unique container into the
// shared one. This might not be optimal right now.
@@ -290,7 +300,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept
}
}
- if (uniqueEmptyFrameOffset.size() < fillSize) {
+ fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size()));
+ if (fillSize == 0) {
+ auto frames = sharedEmptyFrameOffset->lock();
return;
}
@@ -303,7 +315,7 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept
for (; processed < toFill; processed++) {
*xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back();
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue);
#endif /* DEBUG_UMEM */
uniqueEmptyFrameOffset.pop_back();
}
@@ -351,7 +363,7 @@ void XskSocket::send(std::vector<XskPacket>& packets)
.len = packet.getFrameLen(),
.options = 0};
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
#endif /* DEBUG_UMEM */
queued++;
}
@@ -381,7 +393,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,performance-no-int-to-ptr)
XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, frameOffset(packet), {UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
#endif /* DEBUG_UMEM */
if (!packet.parse(false)) {
@@ -393,11 +405,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
}
}
catch (const std::exception& exp) {
- std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+ ++failed;
+ ++processed;
break;
}
catch (...) {
- std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+ ++failed;
+ ++processed;
break;
}
}
@@ -437,7 +451,7 @@ void XskSocket::recycle(size_t size) noexcept
for (; processed < completeSize; ++processed) {
uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++));
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
}
xsk_ring_cons__release(&cq, processed);
@@ -510,9 +524,8 @@ void XskSocket::markAsFree(const XskPacket& packet)
{
auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
-
uniqueEmptyFrameOffset.push_back(offset);
}
@@ -717,7 +730,7 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept
// IPV6
auto ipv6 = getIPv6Header();
std::swap(ipv6.daddr, ipv6.saddr);
- assert(ipv6.nexthdr == IPPROTO_UDP);
+ ipv6.nexthdr = IPPROTO_UDP;
auto udp = getUDPHeader();
std::swap(udp.dest, udp.source);
@@ -726,16 +739,18 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept
/* needed to get the correct checksum */
setIPv6Header(ipv6);
setUDPHeader(udp);
- udp.check = tcp_udp_v6_checksum(&ipv6);
+ // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP
+ // implementations do the same
+ // udp.check = tcp_udp_v6_checksum(&ipv6);
rewriteIpv6Header(&ipv6, getFrameLen());
setIPv6Header(ipv6);
setUDPHeader(udp);
}
- else {
+ else if (ethHeader.h_proto == htons(ETH_P_IP)) {
// IPV4
auto ipv4 = getIPv4Header();
std::swap(ipv4.daddr, ipv4.saddr);
- assert(ipv4.protocol == IPPROTO_UDP);
+ ipv4.protocol = IPPROTO_UDP;
auto udp = getUDPHeader();
std::swap(udp.dest, udp.source);
@@ -744,7 +759,9 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept
/* needed to get the correct checksum */
setIPv4Header(ipv4);
setUDPHeader(udp);
- udp.check = tcp_udp_v4_checksum(&ipv4);
+ // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP
+ // implementations do the same
+ // udp.check = tcp_udp_v4_checksum(&ipv4);
rewriteIpv4Header(&ipv4, getFrameLen());
setIPv4Header(ipv4);
setUDPHeader(udp);
@@ -844,29 +861,24 @@ void XskWorker::notify(int desc)
}
}
-XskWorker::XskWorker() :
- workerWaker(createEventfd()), xskSocketWaker(createEventfd())
+XskWorker::XskWorker(XskWorker::Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames) :
+ d_sharedEmptyFrameOffset(frames), d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd())
{
}
void XskWorker::pushToProcessingQueue(XskPacket& packet)
{
-#if defined(__SANITIZE_THREAD__)
- if (!incomingPacketsQueue.lock()->push(packet)) {
-#else
- if (!incomingPacketsQueue.push(packet)) {
-#endif
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker");
+ }
+ if (!d_incomingPacketsQueue.push(packet)) {
markAsFree(packet);
}
}
void XskWorker::pushToSendQueue(XskPacket& packet)
{
-#if defined(__SANITIZE_THREAD__)
- if (!outgoingPacketsQueue.lock()->push(packet)) {
-#else
- if (!outgoingPacketsQueue.push(packet)) {
-#endif
+ if (!d_outgoingPacketsQueue.push(packet)) {
markAsFree(packet);
}
}
@@ -901,9 +913,9 @@ void XskPacket::rewrite() noexcept
auto ipHeader = getIPv4Header();
ipHeader.daddr = to.sin4.sin_addr.s_addr;
ipHeader.saddr = from.sin4.sin_addr.s_addr;
+ ipHeader.protocol = IPPROTO_UDP;
auto udpHeader = getUDPHeader();
- ipHeader.protocol = IPPROTO_UDP;
udpHeader.source = from.sin4.sin_port;
udpHeader.dest = to.sin4.sin_port;
udpHeader.len = htons(getDataSize() + sizeof(udpHeader));
@@ -911,7 +923,9 @@ void XskPacket::rewrite() noexcept
/* needed to get the correct checksum */
setIPv4Header(ipHeader);
setUDPHeader(udpHeader);
- udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+ // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP
+ // implementations do the same
+ // udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
rewriteIpv4Header(&ipHeader, getFrameLen());
setIPv4Header(ipHeader);
setUDPHeader(udpHeader);
@@ -922,9 +936,9 @@ void XskPacket::rewrite() noexcept
auto ipHeader = getIPv6Header();
memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr));
memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr));
+ ipHeader.nexthdr = IPPROTO_UDP;
auto udpHeader = getUDPHeader();
- ipHeader.nexthdr = IPPROTO_UDP;
udpHeader.source = from.sin6.sin6_port;
udpHeader.dest = to.sin6.sin6_port;
udpHeader.len = htons(getDataSize() + sizeof(udpHeader));
@@ -932,7 +946,9 @@ void XskPacket::rewrite() noexcept
/* needed to get the correct checksum */
setIPv6Header(ipHeader);
setUDPHeader(udpHeader);
- udpHeader.check = tcp_udp_v6_checksum(&ipHeader);
+ // do not bother setting the UDP checksum: 0 is a valid value and most AF_XDP
+ // implementations do the same
+ // udpHeader.check = tcp_udp_v6_checksum(&ipHeader);
setIPv6Header(ipHeader);
setUDPHeader(udpHeader);
}
@@ -1119,15 +1135,15 @@ void XskWorker::notifyXskSocket() const
notify(xskSocketWaker);
}
-std::shared_ptr<XskWorker> XskWorker::create()
+std::shared_ptr<XskWorker> XskWorker::create(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames)
{
- return std::make_shared<XskWorker>();
+ return std::make_shared<XskWorker>(type, frames);
}
void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
{
const auto socketWaker = worker->xskSocketWaker.getHandle();
- worker->umemBufBase = umem.bufBase;
+ worker->setUmemBufBase(umem.bufBase);
d_workers.insert({socketWaker, std::move(worker)});
fds.push_back(pollfd{
.fd = socketWaker,
@@ -1145,9 +1161,14 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest)
d_workerRoutes.lock()->erase(dest);
}
+void XskWorker::setUmemBufBase(uint8_t* base)
+{
+ d_umemBufBase = base;
+}
+
uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
{
- return packet.getFrameOffsetFrom(umemBufBase);
+ return packet.getFrameOffsetFrom(d_umemBufBase);
}
void XskWorker::notifyWorker() const
@@ -1155,6 +1176,29 @@ void XskWorker::notifyWorker() const
notify(workerWaker);
}
+bool XskWorker::hasIncomingFrames()
+{
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+ }
+
+ return d_incomingPacketsQueue.read_available() != 0U;
+}
+
+void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+ }
+
+ d_incomingPacketsQueue.consume_all(callback);
+}
+
+void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+ d_outgoingPacketsQueue.consume_all(callback);
+}
+
void XskSocket::getMACFromIfName()
{
ifreq ifr{};
@@ -1213,42 +1257,28 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
return fds;
}
-void XskWorker::fillUniqueEmptyOffset()
-{
- auto frames = sharedEmptyFrameOffset->lock();
- const auto moveSize = std::min(static_cast<size_t>(32), frames->size());
- if (moveSize > 0) {
- // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
- uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
- frames->resize(frames->size() - moveSize);
- }
-}
-
std::optional<XskPacket> XskWorker::getEmptyFrame()
{
- if (!uniqueEmptyFrameOffset.empty()) {
- auto offset = uniqueEmptyFrameOffset.back();
- uniqueEmptyFrameOffset.pop_back();
- // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
- return XskPacket(offset + umemBufBase, 0, frameSize);
+ auto frames = d_sharedEmptyFrameOffset->lock();
+ if (frames->empty()) {
+ return std::nullopt;
}
- fillUniqueEmptyOffset();
- if (!uniqueEmptyFrameOffset.empty()) {
- auto offset = uniqueEmptyFrameOffset.back();
- uniqueEmptyFrameOffset.pop_back();
- // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
- return XskPacket(offset + umemBufBase, 0, frameSize);
- }
- return std::nullopt;
+ auto offset = frames->back();
+ frames->pop_back();
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ return XskPacket(offset + d_umemBufBase, 0, d_frameSize);
}
void XskWorker::markAsFree(const XskPacket& packet)
{
auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, d_sharedEmptyFrameOffset, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
- uniqueEmptyFrameOffset.push_back(offset);
+ {
+ auto frames = d_sharedEmptyFrameOffset->lock();
+ frames->push_back(offset);
+ }
}
uint32_t XskPacket::getFlags() const noexcept