summaryrefslogtreecommitdiffstats
path: root/xsk.hh
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 10:29:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-08-26 10:29:59 +0000
commitfead91cfb1835d16049c615e1f1c87caa606ff06 (patch)
tree0d7fbbff7ff88b0873a99ead502d6ebb12beb6d6 /xsk.hh
parentAdding debian version 1.9.5-1. (diff)
downloaddnsdist-fead91cfb1835d16049c615e1f1c87caa606ff06.tar.xz
dnsdist-fead91cfb1835d16049c615e1f1c87caa606ff06.zip
Merging upstream version 1.9.6.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--xsk.hh61
1 files changed, 31 insertions, 30 deletions
diff --git a/xsk.hh b/xsk.hh
index 8d2b57d..2f3e7c3 100644
--- a/xsk.hh
+++ b/xsk.hh
@@ -58,7 +58,7 @@ using MACAddr = std::array<uint8_t, 6>;
// We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
// Once we have read the descriptors from the rx queue we release them, but we own the frames.
// After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
-// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue).
// XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
// The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
@@ -192,8 +192,10 @@ class XskPacket
public:
enum Flags : uint32_t
{
+ /* whether the payload has been modified */
UPDATE = 1 << 0,
DELAY = 1 << 1,
+ /* whether the headers have already been updated */
REWRITE = 1 << 2
};
@@ -234,6 +236,7 @@ private:
void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
[[nodiscard]] udphdr getUDPHeader() const noexcept;
void setUDPHeader(const udphdr& udpHeader) noexcept;
+ /* exchange the source and destination addresses (ethernet and IP) */
void changeDirectAndUpdateChecksum() noexcept;
constexpr static uint8_t DefaultTTL = 64;
@@ -250,10 +253,13 @@ public:
[[nodiscard]] PacketBuffer cloneHeaderToPacketBuffer() const;
void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept;
bool setPayload(const PacketBuffer& buf);
+ /* rewrite the headers, usually after setAddr() and setPayload() have been called */
void rewrite() noexcept;
void setHeader(PacketBuffer& buf);
XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
void addDelay(int relativeMilliseconds) noexcept;
+ /* if the payload have been updated, and the headers have not been rewritten, exchange the source
+ and destination addresses (ethernet and IP) and rewrite the headers */
void updatePacket() noexcept;
// parse IP and UDP payloads
bool parse(bool fromSetHeader);
@@ -269,47 +275,45 @@ public:
};
bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;
-/* g++ defines __SANITIZE_THREAD__
- clang++ supports the nice __has_feature(thread_sanitizer),
- let's merge them */
-#if defined(__has_feature)
-#if __has_feature(thread_sanitizer)
-#define __SANITIZE_THREAD__ 1
-#endif
-#endif
-
// XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
// After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
// XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
class XskWorker
{
-#if defined(__SANITIZE_THREAD__)
- using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
-#else
- using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
-#endif
-
public:
+ enum class Type : uint8_t
+ {
+ OutgoingOnly,
+ Bidirectional
+ };
+
+private:
+ using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
// queue of packets to be processed by this worker
- XskPacketRing incomingPacketsQueue;
+ XskPacketRing d_incomingPacketsQueue;
// queue of packets processed by this worker (to be sent, or discarded)
- XskPacketRing outgoingPacketsQueue;
-
- uint8_t* umemBufBase{nullptr};
+ XskPacketRing d_outgoingPacketsQueue;
// list of frames that are shared with the XskRouter
- std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
- // list of frames that we own, used to generate new packets (health-check)
- vector<uint64_t> uniqueEmptyFrameOffset;
- const size_t frameSize{XskSocket::getFrameSize()};
+ std::shared_ptr<LockGuarded<vector<uint64_t>>> d_sharedEmptyFrameOffset;
+ uint8_t* d_umemBufBase{nullptr};
+ const size_t d_frameSize{XskSocket::getFrameSize()};
+ Type d_type;
+
+public:
FDWrapper workerWaker;
FDWrapper xskSocketWaker;
- XskWorker();
static int createEventfd();
static void notify(int desc);
- static std::shared_ptr<XskWorker> create();
+ static std::shared_ptr<XskWorker> create(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames);
+
+ XskWorker(Type type, const std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames);
+ void setUmemBufBase(uint8_t* base);
void pushToProcessingQueue(XskPacket& packet);
void pushToSendQueue(XskPacket& packet);
+ bool hasIncomingFrames();
+ void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback);
+ void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback);
void markAsFree(const XskPacket& packet);
// notify worker that at least one packet is available for processing
void notifyWorker() const;
@@ -319,10 +323,7 @@ public:
void cleanWorkerNotification() const noexcept;
void cleanSocketNotification() const noexcept;
[[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
- // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
- void fillUniqueEmptyOffset();
- // look for an empty umem entry in uniqueEmptyFrameOffset
- // then sharedEmptyFrameOffset if needed
+ // get an empty umem entry from sharedEmptyFrameOffset
std::optional<XskPacket> getEmptyFrame();
};
std::vector<pollfd> getPollFdsForWorker(XskWorker& info);