summaryrefslogtreecommitdiffstats
path: root/xsk.hh
blob: 8d2b57d2d95c2379e4b18b2edf0c48520d050035 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#pragma once
#include "config.h"

#ifdef HAVE_XSK
#include <array>
#include <bits/types/struct_timespec.h>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <cstdint>
#include <memory>
#include <poll.h>
#include <queue>
#include <stdexcept>
#include <string>
#include <unistd.h>
#include <unordered_map>
#include <vector>

#include <xdp/xsk.h>

#include "iputils.hh"
#include "lock.hh"
#include "misc.hh"
#include "noinitvector.hh"

class XskPacket;
class XskWorker;
class XskSocket;

using MACAddr = std::array<uint8_t, 6>;

// We use an XskSocket to manage an AF_XDP Socket corresponding to a NIC queue.
// The XDP program running in the kernel redirects the data to the XskSocket in userspace.
// We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
// Once we have read the descriptors from the rx queue we release them, but we own the frames.
// After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).

// XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
// The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.

class XskSocket
{
  struct XskUmem
  {
    xsk_umem* umem{nullptr};
    uint8_t* bufBase{nullptr};
    size_t size{0};
    void umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config);
    ~XskUmem();
    XskUmem() = default;
  };
  using WorkerContainer = std::unordered_map<int, std::shared_ptr<XskWorker>>;
  WorkerContainer d_workers;
  using WorkerRoutesMap = std::unordered_map<ComboAddress, std::shared_ptr<XskWorker>, ComboAddress::addressPortOnlyHash>;
  // it might be better to move to a StateHolder for performance
  LockGuarded<WorkerRoutesMap> d_workerRoutes;
  // number of frames to keep in sharedEmptyFrameOffset
  static constexpr size_t holdThreshold = 256;
  // number of frames to insert into the fill queue
  static constexpr size_t fillThreshold = 128;
  static constexpr size_t frameSize = 2048;
  // number of entries (frames) in the umem
  const size_t frameNum;
  // responses that have been delayed
  std::priority_queue<XskPacket> waitForDelay;
  MACAddr source{};
  const std::string ifName;
  // AF_XDP socket then worker waker sockets
  vector<pollfd> fds;
  // list of frames, aka (indexes of) umem entries that can be reused to fill fq,
  // collected from packets that we could not route (unknown destination),
  // could not parse, were dropped during processing (!UPDATE), or
  // simply recycled from cq after being processed by the kernel
  vector<uint64_t> uniqueEmptyFrameOffset;
  // completion ring: queue where sent packets are stored by the kernel
  xsk_ring_cons cq{};
  // rx ring: queue where the incoming packets are stored, read by XskRouter
  xsk_ring_cons rx{};
  // fill ring: queue where umem entries available to be filled (put into rx) are stored
  xsk_ring_prod fq{};
  // tx ring: queue where outgoing packets are stored
  xsk_ring_prod tx{};
  std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
  XskUmem umem;

  static constexpr uint32_t fqCapacity = XSK_RING_PROD__DEFAULT_NUM_DESCS * 4;
  static constexpr uint32_t cqCapacity = XSK_RING_CONS__DEFAULT_NUM_DESCS * 4;
  static constexpr uint32_t rxCapacity = XSK_RING_CONS__DEFAULT_NUM_DESCS * 2;
  static constexpr uint32_t txCapacity = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2;

  constexpr static bool isPowOfTwo(uint32_t value) noexcept;
  [[nodiscard]] static int timeDifference(const timespec& lhs, const timespec& rhs) noexcept;

  [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
  [[nodiscard]] int firstTimeout();
  void getMACFromIfName();

public:
  static void clearDestinationMap(const std::string& mapPath, bool isV6);
  static void addDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
  static void removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
  static constexpr size_t getFrameSize()
  {
    return frameSize;
  }
  // list of free umem entries that can be reused
  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
  XskSocket(size_t frameNum, std::string ifName, uint32_t queue_id, const std::string& xskMapPath);
  [[nodiscard]] int xskFd() const noexcept;
  // wait until one event has occurred
  [[nodiscard]] int wait(int timeout);
  // add as many packets as possible to the rx queue for sending */
  void send(std::vector<XskPacket>& packets);
  // look at incoming packets in rx, return them if parsing succeeeded
  [[nodiscard]] std::vector<XskPacket> recv(uint32_t recvSizeMax, uint32_t* failedCount);
  void addWorker(std::shared_ptr<XskWorker> worker);
  void addWorkerRoute(const std::shared_ptr<XskWorker>& worker, const ComboAddress& dest);
  void removeWorkerRoute(const ComboAddress& dest);
  [[nodiscard]] std::string getMetrics() const;
  [[nodiscard]] std::string getXDPMode() const;
  void markAsFree(const XskPacket& packet);
  [[nodiscard]] const std::shared_ptr<XskWorker>& getWorkerByDescriptor(int desc) const
  {
    return d_workers.at(desc);
  }
  [[nodiscard]] std::shared_ptr<XskWorker> getWorkerByDestination(const ComboAddress& destination)
  {
    auto routes = d_workerRoutes.lock();
    auto workerIt = routes->find(destination);
    if (workerIt == routes->end()) {
      return nullptr;
    }
    return workerIt->second;
  }
  [[nodiscard]] const std::vector<pollfd>& getDescriptors() const
  {
    return fds;
  }
  [[nodiscard]] MACAddr getSourceMACAddress() const
  {
    return source;
  }
  [[nodiscard]] const std::string& getInterfaceName() const
  {
    return ifName;
  }
  // pick ups available frames from uniqueEmptyFrameOffset
  // insert entries from uniqueEmptyFrameOffset into fq
  void fillFq(uint32_t fillSize = fillThreshold) noexcept;
  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
  void recycle(size_t size) noexcept;
  // look at delayed packets, and send the ones that are ready
  void pickUpReadyPacket(std::vector<XskPacket>& packets);
  void pushDelayed(XskPacket& packet)
  {
    waitForDelay.push(packet);
  }
};

struct ethhdr;
struct iphdr;
struct ipv6hdr;
struct udphdr;

class XskPacket
{
public:
  enum Flags : uint32_t
  {
    UPDATE = 1 << 0,
    DELAY = 1 << 1,
    REWRITE = 1 << 2
  };

private:
  ComboAddress from;
  ComboAddress to;
  timespec sendTime{};
  uint8_t* frame{nullptr};
  size_t frameLength{0};
  size_t frameSize{0};
  uint32_t flags{0};
  bool v6{false};

  // You must set ipHeader.check = 0 before calling this method
  [[nodiscard]] static __be16 ipv4Checksum(const struct iphdr*) noexcept;
  [[nodiscard]] static uint64_t ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept;
  [[nodiscard]] static __be16 ip_checksum_fold(uint64_t sum) noexcept;
  [[nodiscard]] static uint64_t tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept;
  [[nodiscard]] static uint64_t tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept;
  static void rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept;
  static void rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept;

  // You must set l4Header.check = 0 before calling this method
  // ip options is not supported
  [[nodiscard]] __be16 tcp_udp_v4_checksum(const struct iphdr*) const noexcept;
  // You must set l4Header.check = 0 before calling this method
  [[nodiscard]] __be16 tcp_udp_v6_checksum(const struct ipv6hdr*) const noexcept;
  /* offset of the L4 (udphdr) header (after ethhdr and iphdr/ipv6hdr) */
  [[nodiscard]] size_t getL4HeaderOffset() const noexcept;
  /* offset of the data after the UDP header */
  [[nodiscard]] size_t getDataOffset() const noexcept;
  [[nodiscard]] size_t getDataSize() const noexcept;
  [[nodiscard]] ethhdr getEthernetHeader() const noexcept;
  void setEthernetHeader(const ethhdr& ethHeader) noexcept;
  [[nodiscard]] iphdr getIPv4Header() const noexcept;
  void setIPv4Header(const iphdr& ipv4Header) noexcept;
  [[nodiscard]] ipv6hdr getIPv6Header() const noexcept;
  void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
  [[nodiscard]] udphdr getUDPHeader() const noexcept;
  void setUDPHeader(const udphdr& udpHeader) noexcept;
  void changeDirectAndUpdateChecksum() noexcept;

  constexpr static uint8_t DefaultTTL = 64;

public:
  [[nodiscard]] const ComboAddress& getFromAddr() const noexcept;
  [[nodiscard]] const ComboAddress& getToAddr() const noexcept;
  [[nodiscard]] const void* getPayloadData() const;
  [[nodiscard]] bool isIPV6() const noexcept;
  [[nodiscard]] size_t getCapacity() const noexcept;
  [[nodiscard]] uint32_t getDataLen() const noexcept;
  [[nodiscard]] uint32_t getFrameLen() const noexcept;
  [[nodiscard]] PacketBuffer clonePacketBuffer() const;
  [[nodiscard]] PacketBuffer cloneHeaderToPacketBuffer() const;
  void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept;
  bool setPayload(const PacketBuffer& buf);
  void rewrite() noexcept;
  void setHeader(PacketBuffer& buf);
  XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
  void addDelay(int relativeMilliseconds) noexcept;
  void updatePacket() noexcept;
  // parse IP and UDP payloads
  bool parse(bool fromSetHeader);
  [[nodiscard]] uint32_t getFlags() const noexcept;
  [[nodiscard]] timespec getSendTime() const noexcept
  {
    return sendTime;
  }
  [[nodiscard]] uint64_t getFrameOffsetFrom(const uint8_t* base) const noexcept
  {
    return frame - base;
  }
};
bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;

/* g++ defines __SANITIZE_THREAD__
   clang++ supports the nice __has_feature(thread_sanitizer),
   let's merge them */
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define __SANITIZE_THREAD__ 1
#endif
#endif

// XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
// After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
// XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
class XskWorker
{
#if defined(__SANITIZE_THREAD__)
  using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
#else
  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
#endif

public:
  // queue of packets to be processed by this worker
  XskPacketRing incomingPacketsQueue;
  // queue of packets processed by this worker (to be sent, or discarded)
  XskPacketRing outgoingPacketsQueue;

  uint8_t* umemBufBase{nullptr};
  // list of frames that are shared with the XskRouter
  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
  // list of frames that we own, used to generate new packets (health-check)
  vector<uint64_t> uniqueEmptyFrameOffset;
  const size_t frameSize{XskSocket::getFrameSize()};
  FDWrapper workerWaker;
  FDWrapper xskSocketWaker;

  XskWorker();
  static int createEventfd();
  static void notify(int desc);
  static std::shared_ptr<XskWorker> create();
  void pushToProcessingQueue(XskPacket& packet);
  void pushToSendQueue(XskPacket& packet);
  void markAsFree(const XskPacket& packet);
  // notify worker that at least one packet is available for processing
  void notifyWorker() const;
  // notify the router that packets are ready to be sent
  void notifyXskSocket() const;
  void waitForXskSocket() const noexcept;
  void cleanWorkerNotification() const noexcept;
  void cleanSocketNotification() const noexcept;
  [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
  // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
  void fillUniqueEmptyOffset();
  // look for an empty umem entry in uniqueEmptyFrameOffset
  // then sharedEmptyFrameOffset if needed
  std::optional<XskPacket> getEmptyFrame();
};
std::vector<pollfd> getPollFdsForWorker(XskWorker& info);
#else
class XskSocket
{
};
class XskPacket
{
};
class XskWorker
{
};

#endif /* HAVE_XSK */