summaryrefslogtreecommitdiffstats
path: root/src/filter
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-03-13 07:54:12 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-03-13 07:54:12 +0000
commit4754ed45b607e82450a5e31fea1da3ba61433b04 (patch)
tree3554490bdc003e6004f605abe41929cdf98b0651 /src/filter
parentInitial commit. (diff)
downloaddnsjit-4754ed45b607e82450a5e31fea1da3ba61433b04.tar.xz
dnsjit-4754ed45b607e82450a5e31fea1da3ba61433b04.zip
Adding upstream version 1.1.0+debian.upstream/1.1.0+debian
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/filter')
-rw-r--r--src/filter/copy.c192
-rw-r--r--src/filter/copy.h30
-rw-r--r--src/filter/copy.hh40
-rw-r--r--src/filter/copy.lua74
-rw-r--r--src/filter/ipsplit.c270
-rw-r--r--src/filter/ipsplit.h29
-rw-r--r--src/filter/ipsplit.hh61
-rw-r--r--src/filter/ipsplit.lua122
-rw-r--r--src/filter/layer.c689
-rw-r--r--src/filter/layer.h44
-rw-r--r--src/filter/layer.hh70
-rw-r--r--src/filter/layer.lua93
-rw-r--r--src/filter/split.c114
-rw-r--r--src/filter/split.h29
-rw-r--r--src/filter/split.hh50
-rw-r--r--src/filter/split.lua80
-rw-r--r--src/filter/timing.c557
-rw-r--r--src/filter/timing.h30
-rw-r--r--src/filter/timing.hh52
-rw-r--r--src/filter/timing.lua123
20 files changed, 2749 insertions, 0 deletions
diff --git a/src/filter/copy.c b/src/filter/copy.c
new file mode 100644
index 0000000..04a90ef
--- /dev/null
+++ b/src/filter/copy.c
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2019, CZ.NIC, z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+
+#include "filter/copy.h"
+#include "core/assert.h"
+
+static core_log_t _log = LOG_T_INIT("filter.copy");
+static filter_copy_t _defaults = {
+ LOG_T_INIT_OBJ("filter.copy"),
+ 0, 0,
+ 0
+};
+
+core_log_t* filter_copy_log()
+{
+ return &_log;
+}
+
+void filter_copy_init(filter_copy_t* self)
+{
+ mlassert_self();
+
+ *self = _defaults;
+}
+
+void filter_copy_destroy(filter_copy_t* self)
+{
+ mlassert_self();
+}
+
+void filter_copy_set(filter_copy_t* self, int32_t obj_type)
+{
+ mlassert_self();
+
+ switch (obj_type) {
+ case CORE_OBJECT_NONE:
+ self->copy |= 0x1;
+ break;
+ case CORE_OBJECT_PCAP:
+ self->copy |= 0x2;
+ break;
+ case CORE_OBJECT_ETHER:
+ self->copy |= 0x4;
+ break;
+ case CORE_OBJECT_NULL:
+ self->copy |= 0x8;
+ break;
+ case CORE_OBJECT_LOOP:
+ self->copy |= 0x10;
+ break;
+ case CORE_OBJECT_LINUXSLL:
+ self->copy |= 0x20;
+ break;
+ case CORE_OBJECT_IEEE802:
+ self->copy |= 0x40;
+ break;
+ case CORE_OBJECT_GRE:
+ self->copy |= 0x80;
+ break;
+ case CORE_OBJECT_IP:
+ self->copy |= 0x100;
+ break;
+ case CORE_OBJECT_IP6:
+ self->copy |= 0x200;
+ break;
+ case CORE_OBJECT_ICMP:
+ self->copy |= 0x400;
+ break;
+ case CORE_OBJECT_ICMP6:
+ self->copy |= 0x800;
+ break;
+ case CORE_OBJECT_UDP:
+ self->copy |= 0x1000;
+ break;
+ case CORE_OBJECT_TCP:
+ self->copy |= 0x2000;
+ break;
+ case CORE_OBJECT_PAYLOAD:
+ self->copy |= 0x4000;
+ break;
+ case CORE_OBJECT_DNS:
+ self->copy |= 0x8000;
+ break;
+ default:
+ lfatal("unknown type %d", obj_type);
+ }
+}
+
+uint64_t filter_copy_get(filter_copy_t* self, int32_t obj_type)
+{
+ mlassert_self();
+
+ switch (obj_type) {
+ case CORE_OBJECT_NONE:
+ return self->copy & 0x1;
+ case CORE_OBJECT_PCAP:
+ return self->copy & 0x2;
+ case CORE_OBJECT_ETHER:
+ return self->copy & 0x4;
+ case CORE_OBJECT_NULL:
+ return self->copy & 0x8;
+ case CORE_OBJECT_LOOP:
+ return self->copy & 0x10;
+ case CORE_OBJECT_LINUXSLL:
+ return self->copy & 0x20;
+ case CORE_OBJECT_IEEE802:
+ return self->copy & 0x40;
+ case CORE_OBJECT_GRE:
+ return self->copy & 0x80;
+ case CORE_OBJECT_IP:
+ return self->copy & 0x100;
+ case CORE_OBJECT_IP6:
+ return self->copy & 0x200;
+ case CORE_OBJECT_ICMP:
+ return self->copy & 0x400;
+ case CORE_OBJECT_ICMP6:
+ return self->copy & 0x800;
+ case CORE_OBJECT_UDP:
+ return self->copy & 0x1000;
+ case CORE_OBJECT_TCP:
+ return self->copy & 0x2000;
+ case CORE_OBJECT_PAYLOAD:
+ return self->copy & 0x4000;
+ case CORE_OBJECT_DNS:
+ return self->copy & 0x8000;
+ default:
+ lfatal("unknown type %d", obj_type);
+ }
+ return 0;
+}
+
+static void _receive(filter_copy_t* self, const core_object_t* obj)
+{
+ mlassert_self();
+ lassert(obj, "obj is nil");
+
+ core_object_t* outobj = NULL;
+ core_object_t* next = NULL;
+ core_object_t* current = NULL;
+ const core_object_t* srcobj = obj;
+
+ do {
+ if (filter_copy_get(self, srcobj->obj_type)) {
+ next = current;
+ current = core_object_copy(srcobj);
+ if (next == NULL) {
+ next = current;
+ outobj = current;
+ } else {
+ next->obj_prev = current;
+ }
+ }
+ srcobj = srcobj->obj_prev;
+ } while (srcobj != NULL);
+
+ if (outobj == NULL) {
+ lnotice("object discarded (no types to copy)");
+ return;
+ }
+
+ self->recv(self->recv_ctx, outobj);
+}
+
+core_receiver_t filter_copy_receiver(filter_copy_t* self)
+{
+ mlassert_self();
+
+ if (!self->recv) {
+ lfatal("no receiver(s) set");
+ }
+
+ return (core_receiver_t)_receive;
+}
diff --git a/src/filter/copy.h b/src/filter/copy.h
new file mode 100644
index 0000000..96c8db0
--- /dev/null
+++ b/src/filter/copy.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2019, CZ.NIC, z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "core/log.h"
+#include "core/object.h"
+#include "core/receiver.h"
+
+#ifndef __dnsjit_filter_copy_h
+#define __dnsjit_filter_copy_h
+
+#include "filter/copy.hh"
+
+#endif
diff --git a/src/filter/copy.hh b/src/filter/copy.hh
new file mode 100644
index 0000000..edf7fc7
--- /dev/null
+++ b/src/filter/copy.hh
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2019, CZ.NIC z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+//lua:require("dnsjit.core.log")
+//lua:require("dnsjit.core.receiver_h")
+
+typedef struct filter_copy {
+ core_log_t _log;
+
+ core_receiver_t recv;
+ void* recv_ctx;
+
+ uint64_t copy;
+} filter_copy_t;
+
+core_log_t* filter_copy_log();
+
+void filter_copy_init(filter_copy_t* self);
+void filter_copy_destroy(filter_copy_t* self);
+void filter_copy_set(filter_copy_t* self, int32_t obj_type);
+uint64_t filter_copy_get(filter_copy_t* self, int32_t obj_type);
+
+core_receiver_t filter_copy_receiver(filter_copy_t* self);
diff --git a/src/filter/copy.lua b/src/filter/copy.lua
new file mode 100644
index 0000000..4be469f
--- /dev/null
+++ b/src/filter/copy.lua
@@ -0,0 +1,74 @@
+-- Copyright (c) 2019 CZ.NIC, z.s.p.o.
+-- All rights reserved.
+--
+-- This file is part of dnsjit.
+--
+-- dnsjit is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License as published by
+-- the Free Software Foundation, either version 3 of the License, or
+-- (at your option) any later version.
+--
+-- dnsjit is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+
+-- dnsjit.filter.copy
+-- Creates a copy of an object chain with selected object types.
+-- local copy = require("dnsjit.filter.copy").new()
+-- local object = require("dnsjit.core.objects")
+-- copy:obj_type(object.PAYLOAD)
+-- copy:obj_type(object.IP6)
+-- channel:receiver(copy)
+--
+-- Filter to create a copy of the object chain with selected object types.
+-- The user is responsible for manually freeing the created object chain.
+module(...,package.seeall)
+
+require("dnsjit.filter.copy_h")
+local object = require("dnsjit.core.object")
+local ffi = require("ffi")
+local C = ffi.C
+
+local t_name = "filter_copy_t"
+local filter_copy_t = ffi.typeof(t_name)
+local Copy = {}
+
+-- Create a new Copy filter.
+function Copy.new()
+ local self = {
+ obj = filter_copy_t(),
+ }
+ C.filter_copy_init(self.obj)
+ ffi.gc(self.obj, C.filter_copy_destroy)
+ return setmetatable(self, { __index = Copy })
+end
+
+-- Return the Log object to control logging of this instance or module.
+function Copy:log()
+ if self == nil then
+ return C.filter_copy_log()
+ end
+ return self.obj._log
+end
+
+-- Set the object type to be copied. Can be called multiple times to copy
+-- multiple object types from the object chain.
+function Copy:obj_type(obj_type)
+ C.filter_copy_set(self.obj, obj_type)
+end
+
+-- Return the C functions and context for receiving objects.
+function Copy:receive()
+ return C.filter_copy_receiver(self.obj), self.obj
+end
+
+-- Set the receiver to pass objects to.
+function Copy:receiver(o)
+ self.obj.recv, self.obj.recv_ctx = o:receive()
+end
+
+return Copy
diff --git a/src/filter/ipsplit.c b/src/filter/ipsplit.c
new file mode 100644
index 0000000..8632e2e
--- /dev/null
+++ b/src/filter/ipsplit.c
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2019-2020, CZ.NIC, z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+
+#include "filter/ipsplit.h"
+#include "core/assert.h"
+#include "core/object/ip.h"
+#include "core/object/ip6.h"
+#include "lib/trie.h"
+
+#include <string.h>
+
+typedef struct _filter_ipsplit {
+ filter_ipsplit_t pub;
+
+ trie_t* trie;
+ uint32_t weight_total;
+} _filter_ipsplit_t;
+
+typedef struct _client {
+ /* Receiver-specific client ID (1..N) in host byte order. */
+ /* Client ID starts at 1 to avoid issues with lua. */
+ uint8_t id[4];
+
+ filter_ipsplit_recv_t* recv;
+} _client_t;
+
+#define _self ((_filter_ipsplit_t*)self)
+
+static core_log_t _log = LOG_T_INIT("filter.ipsplit");
+static filter_ipsplit_t _defaults = {
+ LOG_T_INIT_OBJ("filter.ipsplit"),
+ IPSPLIT_MODE_SEQUENTIAL, IPSPLIT_OVERWRITE_NONE,
+ 0,
+ NULL
+};
+
+core_log_t* filter_ipsplit_log()
+{
+ return &_log;
+}
+
+filter_ipsplit_t* filter_ipsplit_new()
+{
+ filter_ipsplit_t* self;
+
+ mlfatal_oom(self = malloc(sizeof(_filter_ipsplit_t)));
+ *self = _defaults;
+ lfatal_oom(_self->trie = trie_create(NULL));
+ _self->weight_total = 0;
+
+ return self;
+}
+
+static int _free_trie_value(trie_val_t* val, void* ctx)
+{
+ free(*val);
+ return 0;
+}
+
+void filter_ipsplit_free(filter_ipsplit_t* self)
+{
+ filter_ipsplit_recv_t* first;
+ filter_ipsplit_recv_t* r;
+ mlassert_self();
+
+ trie_apply(_self->trie, _free_trie_value, NULL);
+ trie_free(_self->trie);
+
+ if (self->recv) {
+ first = self->recv;
+ do {
+ r = self->recv->next;
+ free(self->recv);
+ self->recv = r;
+ } while (self->recv != first);
+ }
+
+ free(self);
+}
+
+void filter_ipsplit_add(filter_ipsplit_t* self, core_receiver_t recv, void* ctx, uint32_t weight)
+{
+ filter_ipsplit_recv_t* r;
+ mlassert_self();
+ lassert(recv, "recv is nil");
+ lassert(weight > 0, "weight must be positive integer");
+
+ _self->weight_total += weight;
+
+ lfatal_oom(r = malloc(sizeof(filter_ipsplit_recv_t)));
+ r->recv = recv;
+ r->ctx = ctx;
+ r->n_clients = 0;
+ r->weight = weight;
+
+ if (!self->recv) {
+ r->next = r;
+ self->recv = r;
+ } else {
+ r->next = self->recv->next;
+ self->recv->next = r;
+ }
+}
+
+/*
+ * Use portable pseudo-random number generator.
+ */
+static uint32_t _rand_val = 1;
+
+static uint32_t _rand()
+{
+ _rand_val = ((_rand_val * 1103515245) + 12345) & 0x7fffffff;
+ return _rand_val;
+}
+
+void filter_ipsplit_srand(uint32_t seed)
+{
+ _rand_val = seed;
+}
+
+static void _assign_client_to_receiver(filter_ipsplit_t* self, _client_t* client)
+{
+ uint32_t id = 0;
+ filter_ipsplit_recv_t* recv = 0;
+
+ switch (self->mode) {
+ case IPSPLIT_MODE_SEQUENTIAL:
+ recv = self->recv;
+ id = ++recv->n_clients;
+ /* When *weight* clients are assigned, switch to next receiver. */
+ if (recv->n_clients % recv->weight == 0)
+ self->recv = recv->next;
+ break;
+ case IPSPLIT_MODE_RANDOM: {
+ /* Get random number from [1, weight_total], then iterate through
+ * receivers until their weights add up to at least this value. */
+ int32_t random = (int32_t)(_rand() % _self->weight_total) + 1;
+ while (random > 0) {
+ random -= self->recv->weight;
+ if (random > 0)
+ self->recv = self->recv->next;
+ }
+ recv = self->recv;
+ id = ++recv->n_clients;
+ break;
+ }
+ default:
+ lfatal("invalid ipsplit mode");
+ }
+
+ client->recv = recv;
+ memcpy(client->id, &id, sizeof(client->id));
+}
+
+/*
+ * Optionally, write client ID into byte 0-3 of src/dst IP address in the packet.
+ *
+ * Client ID is a 4-byte array in host byte order.
+ */
+static void _overwrite(filter_ipsplit_t* self, core_object_t* obj, _client_t* client)
+{
+ mlassert_self();
+ lassert(obj, "invalid object");
+ lassert(client, "invalid client");
+
+ core_object_ip_t* ip;
+ core_object_ip6_t* ip6;
+
+ switch (self->overwrite) {
+ case IPSPLIT_OVERWRITE_NONE:
+ return;
+ case IPSPLIT_OVERWRITE_SRC:
+ if (obj->obj_type == CORE_OBJECT_IP) {
+ ip = (core_object_ip_t*)obj;
+ memcpy(&ip->src, client->id, sizeof(client->id));
+ } else if (obj->obj_type == CORE_OBJECT_IP6) {
+ ip6 = (core_object_ip6_t*)obj;
+ memcpy(&ip6->src, client->id, sizeof(client->id));
+ }
+ break;
+ case IPSPLIT_OVERWRITE_DST:
+ if (obj->obj_type == CORE_OBJECT_IP) {
+ ip = (core_object_ip_t*)obj;
+ memcpy(&ip->dst, client->id, sizeof(client->id));
+ } else if (obj->obj_type == CORE_OBJECT_IP6) {
+ ip6 = (core_object_ip6_t*)obj;
+ memcpy(&ip6->dst, client->id, sizeof(client->id));
+ }
+ break;
+ default:
+ lfatal("invalid overwrite mode");
+ }
+}
+
+static void _receive(filter_ipsplit_t* self, const core_object_t* obj)
+{
+ mlassert_self();
+
+ /* Find ip/ip6 object in chain. */
+ core_object_t* pkt = (core_object_t*)obj;
+ while (pkt != NULL) {
+ if (pkt->obj_type == CORE_OBJECT_IP || pkt->obj_type == CORE_OBJECT_IP6)
+ break;
+ pkt = (core_object_t*)pkt->obj_prev;
+ }
+ if (pkt == NULL) {
+ self->discarded++;
+ lwarning("packet discarded (missing ip/ip6 object)");
+ return;
+ }
+
+ /* Lookup IPv4/IPv6 address in trie (prefix-tree). Inserts new node if not found. */
+ trie_val_t* node = 0;
+ switch (pkt->obj_type) {
+ case CORE_OBJECT_IP: {
+ core_object_ip_t* ip = (core_object_ip_t*)pkt;
+ node = trie_get_ins(_self->trie, ip->src, sizeof(ip->src));
+ break;
+ }
+ case CORE_OBJECT_IP6: {
+ core_object_ip6_t* ip6 = (core_object_ip6_t*)pkt;
+ node = trie_get_ins(_self->trie, ip6->src, sizeof(ip6->src));
+ break;
+ }
+ default:
+ lfatal("unsupported object type");
+ }
+ lassert(node, "trie failure");
+
+ _client_t* client;
+ if (*node == NULL) { /* IP address not found in tree -> create new client. */
+ lfatal_oom(client = malloc(sizeof(_client_t)));
+ *node = (void*)client;
+ _assign_client_to_receiver(self, client);
+ }
+
+ client = (_client_t*)*node;
+ _overwrite(self, pkt, client);
+ client->recv->recv(client->recv->ctx, obj);
+}
+
+core_receiver_t filter_ipsplit_receiver(filter_ipsplit_t* self)
+{
+ mlassert_self();
+
+ if (!self->recv) {
+ lfatal("no receiver(s) set");
+ }
+
+ return (core_receiver_t)_receive;
+}
diff --git a/src/filter/ipsplit.h b/src/filter/ipsplit.h
new file mode 100644
index 0000000..16c932e
--- /dev/null
+++ b/src/filter/ipsplit.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2019-2020 CZ.NIC, z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "core/log.h"
+#include "core/receiver.h"
+
+#ifndef __dnsjit_filter_ipsplit_h
+#define __dnsjit_filter_ipsplit_h
+
+#include "filter/ipsplit.hh"
+
+#endif
diff --git a/src/filter/ipsplit.hh b/src/filter/ipsplit.hh
new file mode 100644
index 0000000..c362ac3
--- /dev/null
+++ b/src/filter/ipsplit.hh
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2019-2020, CZ.NIC, z.s.p.o.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+//lua:require("dnsjit.core.log")
+//lua:require("dnsjit.core.receiver_h")
+
+typedef struct filter_ipsplit_recv filter_ipsplit_recv_t;
+struct filter_ipsplit_recv {
+ filter_ipsplit_recv_t* next;
+
+ core_receiver_t recv;
+ void* ctx;
+
+ uint32_t n_clients; /* Total number of clients assigned to this receiver. */
+
+ uint32_t weight;
+};
+
+typedef struct filter_ipsplit {
+ core_log_t _log;
+
+ enum {
+ IPSPLIT_MODE_SEQUENTIAL = 0,
+ IPSPLIT_MODE_RANDOM = 1
+ } mode;
+ enum {
+ IPSPLIT_OVERWRITE_NONE = 0,
+ IPSPLIT_OVERWRITE_SRC = 1,
+ IPSPLIT_OVERWRITE_DST = 2
+ } overwrite;
+
+ uint64_t discarded;
+
+ filter_ipsplit_recv_t* recv;
+} filter_ipsplit_t;
+
+core_log_t* filter_ipsplit_log();
+
+filter_ipsplit_t* filter_ipsplit_new();
+void filter_ipsplit_free(filter_ipsplit_t* self);
+void filter_ipsplit_add(filter_ipsplit_t* self, core_receiver_t recv, void* ctx, uint32_t weight);
+void filter_ipsplit_srand(unsigned int seed);
+
+core_receiver_t filter_ipsplit_receiver(filter_ipsplit_t* self);
diff --git a/src/filter/ipsplit.lua b/src/filter/ipsplit.lua
new file mode 100644
index 0000000..cca0249
--- /dev/null
+++ b/src/filter/ipsplit.lua
@@ -0,0 +1,122 @@
+-- Copyright (c) 2019-2020 CZ.NIC, z.s.p.o.
+-- All rights reserved.
+--
+-- This file is part of dnsjit.
+--
+-- dnsjit is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License as published by
+-- the Free Software Foundation, either version 3 of the License, or
+-- (at your option) any later version.
+--
+-- dnsjit is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+
+-- dnsjit.filter.ipsplit
+-- Pass objects to receivers based on the source IP address
+-- local ipsplit = require("dnsjit.filter.ipsplit").new()
+-- ipsplit.receiver(...)
+-- ipsplit.receiver(...)
+-- ipsplit.receiver(...)
+-- input.receiver(ipsplit)
+--
+-- The filter passes objects to other receivers.
+-- Object chains without IPv4/IPv6 packet are discarded.
+-- Packets which have the same source IP address are considered to be sent
+-- from the same "client".
+-- When the first packet from a client is processed, the client is assigned
+-- to a receiver.
+-- All objects from this client will be passed to the assigned receiver.
+-- The filter can also write a receiver-specific client ID (starting from 1)
+-- to the source or destination IP in the packet.
+module(...,package.seeall)
+
+require("dnsjit.filter.ipsplit_h")
+local bit = require("bit")
+local object = require("dnsjit.core.objects")
+local ffi = require("ffi")
+local C = ffi.C
+
+local IpSplit = {}
+
+-- Create a new IpSplit filter.
+function IpSplit.new()
+ local self = {
+ obj = C.filter_ipsplit_new(),
+ }
+ ffi.gc(self.obj, C.filter_ipsplit_free)
+ return setmetatable(self, { __index = IpSplit })
+end
+
+-- Return the Log object to control logging of this instance or module.
+function IpSplit:log()
+ if self == nil then
+ return C.filter_ipsplit_log()
+ end
+ return self.obj._log
+end
+
+-- Return the C functions and context for receiving objects.
+function IpSplit:receive()
+ local recv = C.filter_ipsplit_receiver(self.obj)
+ return recv, self.obj
+end
+
+-- Set the receiver to pass objects to, this can be called multiple times to
+-- set additional receivers.
+-- The weight parameter can be used to adjust distribution of clients among
+-- receivers.
+-- Weight must be a positive integer (default is 1).
+function IpSplit:receiver(o, weight)
+ local recv, ctx = o:receive()
+ if weight == nil then
+ weight = 1
+ end
+ C.filter_ipsplit_add(self.obj, recv, ctx, weight)
+end
+
+-- Number of input packets discarded due to various reasons.
+-- To investigate causes, run with increased logging level.
+function IpSplit:discarded()
+ return tonumber(self.obj.discarded)
+end
+
+-- Set the client assignment mode to sequential.
+-- Assigns `weight` clients to a receiver before continuing with the next
+-- receiver (default mode).
+function IpSplit:sequential()
+ self.obj.mode = "IPSPLIT_MODE_SEQUENTIAL"
+end
+
+-- Set the client assignment mode to random.
+-- Each client is randomly assigned to a receiver (weight affects the
+-- probability).
+-- The client assignment is stable (and portable) for given seed.
+function IpSplit:random(seed)
+ self.obj.mode = "IPSPLIT_MODE_RANDOM"
+ if seed then
+ C.filter_ipsplit_srand(seed)
+ end
+end
+
+-- Don't overwrite source or destination IP (default).
+function IpSplit:overwrite_none()
+ self.obj.overwrite = "IPSPLIT_OVERWRITE_NONE"
+end
+
+-- Write receiver-specific client ID to bytes 0-3 of source IP (host byte order).
+function IpSplit:overwrite_src()
+ self.obj.overwrite = "IPSPLIT_OVERWRITE_SRC"
+end
+
+-- Write receiver-specific client ID to bytes 0-3 of destination IP (host byte
+-- order).
+function IpSplit:overwrite_dst()
+ self.obj.overwrite = "IPSPLIT_OVERWRITE_DST"
+end
+
+return IpSplit
diff --git a/src/filter/layer.c b/src/filter/layer.c
new file mode 100644
index 0000000..b360ca3
--- /dev/null
+++ b/src/filter/layer.c
@@ -0,0 +1,689 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+
+#include "filter/layer.h"
+#include "core/assert.h"
+
+#include <string.h>
+#include <pcap/pcap.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#ifdef HAVE_NET_ETHERNET_H
+#include <net/ethernet.h>
+#endif
+#ifdef HAVE_NET_ETHERTYPES_H
+#include <net/ethertypes.h>
+#endif
+#ifdef HAVE_ENDIAN_H
+#include <endian.h>
+#else
+#ifdef HAVE_SYS_ENDIAN_H
+#include <sys/endian.h>
+#else
+#ifdef HAVE_MACHINE_ENDIAN_H
+#include <machine/endian.h>
+#endif
+#endif
+#endif
+#ifdef HAVE_BYTESWAP_H
+#include <byteswap.h>
+#endif
+#ifndef bswap_16
+#ifndef bswap16
+#define bswap_16(x) swap16(x)
+#define bswap_32(x) swap32(x)
+#define bswap_64(x) swap64(x)
+#else
+#define bswap_16(x) bswap16(x)
+#define bswap_32(x) bswap32(x)
+#define bswap_64(x) bswap64(x)
+#endif
+#endif
+
+#define N_IEEE802 3
+
+static core_log_t _log = LOG_T_INIT("filter.layer");
+static filter_layer_t _defaults = {
+ LOG_T_INIT_OBJ("filter.layer"),
+ 0, 0,
+ 0, 0,
+ 0,
+ CORE_OBJECT_NULL_INIT(0),
+ CORE_OBJECT_ETHER_INIT(0),
+ CORE_OBJECT_LOOP_INIT(0),
+ CORE_OBJECT_LINUXSLL_INIT(0),
+ 0, { CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0) },
+ CORE_OBJECT_IP_INIT(0),
+ CORE_OBJECT_IP6_INIT(0),
+ CORE_OBJECT_GRE_INIT(0),
+ CORE_OBJECT_ICMP_INIT(0),
+ CORE_OBJECT_ICMP6_INIT(0),
+ CORE_OBJECT_UDP_INIT(0),
+ CORE_OBJECT_TCP_INIT(0),
+ CORE_OBJECT_PAYLOAD_INIT(0)
+};
+
+core_log_t* filter_layer_log()
+{
+ return &_log;
+}
+
+void filter_layer_init(filter_layer_t* self)
+{
+ mlassert_self();
+
+ *self = _defaults;
+}
+
+void filter_layer_destroy(filter_layer_t* self)
+{
+ mlassert_self();
+}
+
+#define need4x2(v1, v2, p, l) \
+ if (l < 1) { \
+ break; \
+ } \
+ v1 = (*p) >> 4; \
+ v2 = (*p) & 0xf; \
+ p += 1; \
+ l -= 1
+
+#define need8(v, p, l) \
+ if (l < 1) { \
+ break; \
+ } \
+ v = *p; \
+ p += 1; \
+ l -= 1
+
+static inline uint16_t _need16(const void* ptr)
+{
+ uint16_t v;
+ memcpy(&v, ptr, sizeof(v));
+ return be16toh(v);
+}
+
+#define need16(v, p, l) \
+ if (l < 2) { \
+ break; \
+ } \
+ v = _need16(p); \
+ p += 2; \
+ l -= 2
+
+#define needr16(v, p, l) \
+ if (l < 2) { \
+ break; \
+ } \
+ v = bswap_16(_need16(p)); \
+ p += 2; \
+ l -= 2
+
+static inline uint32_t _need32(const void* ptr)
+{
+ uint32_t v;
+ memcpy(&v, ptr, sizeof(v));
+ return be32toh(v);
+}
+
+#define need32(v, p, l) \
+ if (l < 4) { \
+ break; \
+ } \
+ v = _need32(p); \
+ p += 4; \
+ l -= 4
+
+#define needr32(v, p, l) \
+ if (l < 4) { \
+ break; \
+ } \
+ v = bswap_32(_need32(p)); \
+ p += 4; \
+ l -= 4
+
+#define needxb(b, x, p, l) \
+ if (l < x) { \
+ break; \
+ } \
+ memcpy(b, p, x); \
+ p += x; \
+ l -= x
+
+#define advancexb(x, p, l) \
+ if (l < x) { \
+ break; \
+ } \
+ p += x; \
+ l -= x
+
+//static int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned char* pkt, size_t len);
+
+static inline int _proto(filter_layer_t* self, uint8_t proto, const core_object_t* obj, const unsigned char* pkt, size_t len)
+{
+ switch (proto) {
+ case IPPROTO_GRE: {
+ core_object_gre_t* gre = &self->gre;
+ gre->obj_prev = obj;
+
+ need16(gre->gre_flags, pkt, len);
+ need16(gre->ether_type, pkt, len);
+
+ /* TODO: Incomplete, check RFC 1701 */
+
+ self->produced = (core_object_t*)gre;
+
+ // if (gre.gre_flags & 0x1) {
+ // need16(gre.checksum, pkt, len);
+ // }
+ // if (gre.gre_flags & 0x4) {
+ // need16(gre.key, pkt, len);
+ // }
+ // if (gre.gre_flags & 0x8) {
+ // need16(gre.sequence, pkt, len);
+ // }
+ //
+ // switch (gre.ether_type) {
+ // case ETHERTYPE_IP:
+ // case ETHERTYPE_IPV6:
+ // return _ip(self, (core_object_t*)gre, pkt, len);
+ //
+ // default:
+ // break;
+ // }
+ break;
+ }
+ case IPPROTO_ICMP: {
+ core_object_icmp_t* icmp = &self->icmp;
+ icmp->obj_prev = obj;
+
+ need8(icmp->type, pkt, len);
+ need8(icmp->code, pkt, len);
+ need16(icmp->cksum, pkt, len);
+
+ self->produced = (core_object_t*)icmp;
+ break;
+ }
+ case IPPROTO_ICMPV6: {
+ core_object_icmp6_t* icmp6 = &self->icmp6;
+ icmp6->obj_prev = obj;
+
+ need8(icmp6->type, pkt, len);
+ need8(icmp6->code, pkt, len);
+ need16(icmp6->cksum, pkt, len);
+
+ self->produced = (core_object_t*)icmp6;
+ break;
+ }
+ case IPPROTO_UDP: {
+ core_object_udp_t* udp = &self->udp;
+ core_object_payload_t* payload = &self->payload;
+ udp->obj_prev = obj;
+
+ need16(udp->sport, pkt, len);
+ need16(udp->dport, pkt, len);
+ need16(udp->ulen, pkt, len);
+ need16(udp->sum, pkt, len);
+
+ payload->obj_prev = (core_object_t*)udp;
+
+ /* Check for padding */
+ if (len > udp->ulen) {
+ payload->padding = len - udp->ulen;
+ payload->len = len - payload->padding;
+ } else {
+ payload->padding = 0;
+ payload->len = len;
+ }
+ payload->payload = (uint8_t*)pkt;
+
+ self->produced = (core_object_t*)payload;
+ break;
+ }
+ case IPPROTO_TCP: {
+ core_object_tcp_t* tcp = &self->tcp;
+ core_object_payload_t* payload = &self->payload;
+ tcp->obj_prev = obj;
+
+ need16(tcp->sport, pkt, len);
+ need16(tcp->dport, pkt, len);
+ need32(tcp->seq, pkt, len);
+ need32(tcp->ack, pkt, len);
+ need4x2(tcp->off, tcp->x2, pkt, len);
+ need8(tcp->flags, pkt, len);
+ need16(tcp->win, pkt, len);
+ need16(tcp->sum, pkt, len);
+ need16(tcp->urp, pkt, len);
+ if (tcp->off > 5) {
+ tcp->opts_len = (tcp->off - 5) * 4;
+ needxb(tcp->opts, tcp->opts_len, pkt, len);
+ } else {
+ tcp->opts_len = 0;
+ }
+
+ payload->obj_prev = (core_object_t*)tcp;
+
+ /* Check for padding */
+ if (obj->obj_type == CORE_OBJECT_IP && len > (((const core_object_ip_t*)obj)->len - (((const core_object_ip_t*)obj)->hl * 4))) {
+ payload->padding = len - (((const core_object_ip_t*)obj)->len - (((const core_object_ip_t*)obj)->hl * 4));
+ payload->len = len - payload->padding;
+ } else if (obj->obj_type == CORE_OBJECT_IP6 && len > ((const core_object_ip6_t*)obj)->plen) {
+ payload->padding = len - ((const core_object_ip6_t*)obj)->plen;
+ payload->len = len - payload->padding;
+ } else {
+ payload->padding = 0;
+ payload->len = len;
+ }
+
+ payload->payload = (uint8_t*)pkt;
+
+ self->produced = (core_object_t*)payload;
+ break;
+ }
+ default:
+ self->produced = obj;
+ break;
+ }
+
+ return 0;
+}
+
+static inline int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned char* pkt, size_t len)
+{
+ if (len) {
+ switch ((*pkt >> 4)) {
+ case 4: {
+ core_object_ip_t* ip = &self->ip;
+
+ ip->obj_prev = obj;
+
+ need4x2(ip->v, ip->hl, pkt, len);
+ need8(ip->tos, pkt, len);
+ need16(ip->len, pkt, len);
+ need16(ip->id, pkt, len);
+ need16(ip->off, pkt, len);
+ need8(ip->ttl, pkt, len);
+ need8(ip->p, pkt, len);
+ need16(ip->sum, pkt, len);
+ needxb(&ip->src, 4, pkt, len);
+ needxb(&ip->dst, 4, pkt, len);
+
+ /* TODO: IPv4 options */
+
+ if (ip->hl < 5)
+ break;
+ if (ip->hl > 5) {
+ advancexb((ip->hl - 5) * 4, pkt, len);
+ }
+
+ /* Check reported length for missing payload */
+ if (ip->len < (ip->hl * 4)) {
+ break;
+ }
+ if (len < (ip->len - (ip->hl * 4))) {
+ break;
+ }
+
+ if (ip->off & 0x2000 || ip->off & 0x1fff) {
+ core_object_payload_t* payload = &self->payload;
+
+ payload->obj_prev = (core_object_t*)ip;
+
+ /* Check for padding */
+ if (len > (ip->len - (ip->hl * 4))) {
+ payload->padding = len - (ip->len - (ip->hl * 4));
+ payload->len = len - payload->padding;
+ } else {
+ payload->padding = 0;
+ payload->len = len;
+ }
+ payload->payload = (uint8_t*)pkt;
+
+ self->produced = (core_object_t*)payload;
+ return 0;
+ }
+
+ return _proto(self, ip->p, (core_object_t*)ip, pkt, len);
+ }
+ case 6: {
+ core_object_ip6_t* ip6 = &self->ip6;
+ struct ip6_ext ext;
+
+ ip6->obj_prev = obj;
+ ip6->is_frag = ip6->have_rtdst = 0;
+
+ need32(ip6->flow, pkt, len);
+ need16(ip6->plen, pkt, len);
+ need8(ip6->nxt, pkt, len);
+ need8(ip6->hlim, pkt, len);
+ needxb(&ip6->src, 16, pkt, len);
+ needxb(&ip6->dst, 16, pkt, len);
+
+ /* Check reported length for missing payload */
+ if (len < ip6->plen) {
+ break;
+ }
+
+ ext.ip6e_nxt = ip6->nxt;
+ ext.ip6e_len = 0;
+ while (ext.ip6e_nxt != IPPROTO_NONE
+ && ext.ip6e_nxt != IPPROTO_GRE
+ && ext.ip6e_nxt != IPPROTO_ICMPV6
+ && ext.ip6e_nxt != IPPROTO_UDP
+ && ext.ip6e_nxt != IPPROTO_TCP) {
+
+ /*
+ * Advance to the start of next header, this may not be needed
+ * if it's the first header or if the header is supported.
+ */
+ if (ext.ip6e_len) {
+ advancexb(ext.ip6e_len * 8, pkt, len);
+ }
+
+ /* TODO: Store IPv6 headers? */
+
+ /* Handle supported headers */
+ if (ext.ip6e_nxt == IPPROTO_FRAGMENT) {
+ if (ip6->is_frag) {
+ return 1;
+ }
+ need8(ext.ip6e_nxt, pkt, len);
+ need8(ext.ip6e_len, pkt, len);
+ if (ext.ip6e_len) {
+ return 1;
+ }
+ need16(ip6->frag_offlg, pkt, len);
+ need32(ip6->frag_ident, pkt, len);
+ ip6->is_frag = 1;
+ } else if (ext.ip6e_nxt == IPPROTO_ROUTING) {
+ struct ip6_rthdr rthdr;
+
+ if (ip6->have_rtdst) {
+ return 1;
+ }
+
+ need8(ext.ip6e_nxt, pkt, len);
+ need8(ext.ip6e_len, pkt, len);
+ need8(rthdr.ip6r_type, pkt, len);
+ need8(rthdr.ip6r_segleft, pkt, len);
+ advancexb(4, pkt, len);
+
+ if (!rthdr.ip6r_type && rthdr.ip6r_segleft) {
+ if (ext.ip6e_len & 1) {
+ return 1;
+ }
+ if (ext.ip6e_len > 2) {
+ advancexb(ext.ip6e_len - 2, pkt, len);
+ }
+ needxb(ip6->rtdst, 16, pkt, len);
+ ip6->have_rtdst = 1;
+ }
+ } else {
+ need8(ext.ip6e_nxt, pkt, len);
+ need8(ext.ip6e_len, pkt, len);
+ advancexb(6, pkt, len);
+ }
+ }
+
+ if (ext.ip6e_nxt == IPPROTO_NONE || ip6->is_frag) {
+ core_object_payload_t* payload = &self->payload;
+
+ payload->obj_prev = (core_object_t*)ip6;
+
+ /* Check for padding */
+ if (len > ip6->plen) {
+ payload->padding = len - ip6->plen;
+ payload->len = len - payload->padding;
+ } else {
+ payload->padding = 0;
+ payload->len = len;
+ }
+ payload->payload = (uint8_t*)pkt;
+
+ self->produced = (core_object_t*)payload;
+ return 0;
+ }
+
+ return _proto(self, ext.ip6e_nxt, (core_object_t*)ip6, pkt, len);
+ }
+ default:
+ break;
+ }
+ }
+
+ self->produced = obj;
+
+ return 0;
+}
+
+static inline int _ieee802(filter_layer_t* self, uint16_t tpid, const core_object_t* obj, const unsigned char* pkt, size_t len)
+{
+ core_object_ieee802_t* ieee802 = &self->ieee802[self->n_ieee802];
+ uint16_t tci;
+
+ ieee802->obj_prev = obj;
+
+ for (;;) {
+ ieee802->tpid = tpid;
+ need16(tci, pkt, len);
+ ieee802->pcp = (tci & 0xe000) >> 13;
+ ieee802->dei = (tci & 0x1000) >> 12;
+ ieee802->vid = tci & 0x0fff;
+ need16(ieee802->ether_type, pkt, len);
+
+ switch (ieee802->ether_type) {
+ case 0x88a8: /* 802.1ad */
+ case 0x9100: /* 802.1 QinQ non-standard */
+ self->n_ieee802++;
+ if (self->n_ieee802 < N_IEEE802) {
+ obj = (const core_object_t*)ieee802;
+ ieee802 = &self->ieee802[self->n_ieee802];
+ ieee802->obj_prev = obj;
+ tpid = ieee802->ether_type;
+ continue;
+ }
+ return 1;
+
+ case ETHERTYPE_IP:
+ case ETHERTYPE_IPV6:
+ return _ip(self, (core_object_t*)ieee802, pkt, len);
+
+ default:
+ break;
+ }
+ break;
+ }
+
+ self->produced = obj;
+
+ return 0;
+}
+
+static inline int _link(filter_layer_t* self, const core_object_pcap_t* pcap)
+{
+ const unsigned char* pkt;
+ size_t len;
+
+ self->n_ieee802 = 0;
+
+ pkt = pcap->bytes;
+ len = pcap->caplen;
+
+ switch (pcap->linktype) {
+ case DLT_NULL: {
+ core_object_null_t* null = &self->null;
+ null->obj_prev = (core_object_t*)pcap;
+
+ if (pcap->is_swapped) {
+ needr32(null->family, pkt, len);
+ } else {
+ need32(null->family, pkt, len);
+ }
+
+ switch (null->family) {
+ case 2:
+ case 24:
+ case 28:
+ case 30:
+ return _ip(self, (core_object_t*)null, pkt, len);
+
+ default:
+ break;
+ }
+ break;
+ }
+ case DLT_EN10MB: {
+ core_object_ether_t* ether = &self->ether;
+ ether->obj_prev = (core_object_t*)pcap;
+
+ needxb(ether->dhost, 6, pkt, len);
+ needxb(ether->shost, 6, pkt, len);
+ need16(ether->type, pkt, len);
+
+ switch (ether->type) {
+ case 0x8100: /* 802.1q */
+ case 0x88a8: /* 802.1ad */
+ case 0x9100: /* 802.1 QinQ non-standard */
+ return _ieee802(self, ether->type, (core_object_t*)ether, pkt, len);
+
+ case ETHERTYPE_IP:
+ case ETHERTYPE_IPV6:
+ return _ip(self, (core_object_t*)ether, pkt, len);
+
+ default:
+ break;
+ }
+ break;
+ }
+ case DLT_LOOP: {
+ core_object_loop_t* loop = &self->loop;
+ loop->obj_prev = (core_object_t*)pcap;
+
+ need32(loop->family, pkt, len);
+
+ switch (loop->family) {
+ case 2:
+ case 24:
+ case 28:
+ case 30:
+ return _ip(self, (core_object_t*)loop, pkt, len);
+
+ default:
+ break;
+ }
+ break;
+ }
+ case DLT_RAW:
+#ifdef DLT_IPV4
+ case DLT_IPV4:
+#endif
+#ifdef DLT_IPV6
+ case DLT_IPV6:
+#endif
+ return _ip(self, (core_object_t*)pcap, pkt, len);
+ case DLT_LINUX_SLL: {
+ core_object_linuxsll_t* linuxsll = &self->linuxsll;
+ linuxsll->obj_prev = (core_object_t*)pcap;
+
+ need16(linuxsll->packet_type, pkt, len);
+ need16(linuxsll->arp_hardware, pkt, len);
+ need16(linuxsll->link_layer_address_length, pkt, len);
+ needxb(linuxsll->link_layer_address, 8, pkt, len);
+ need16(linuxsll->ether_type, pkt, len);
+
+ switch (linuxsll->ether_type) {
+ case 0x8100: /* 802.1q */
+ case 0x88a8: /* 802.1ad */
+ case 0x9100: /* 802.1 QinQ non-standard */
+ return _ieee802(self, linuxsll->ether_type, (core_object_t*)linuxsll, pkt, len);
+
+ case ETHERTYPE_IP:
+ case ETHERTYPE_IPV6:
+ return _ip(self, (core_object_t*)linuxsll, pkt, len);
+
+ default:
+ break;
+ }
+ break;
+ }
+ /* TODO: These might be interesting to implement
+ case DLT_IPNET:
+ case DLT_PKTAP:
+ */
+ default:
+ break;
+ }
+
+ self->produced = (core_object_t*)pcap;
+
+ return 0;
+}
+
+static void _receive(filter_layer_t* self, const core_object_t* obj)
+{
+ mlassert_self();
+ lassert(obj, "obj is nil");
+
+ if (!self->recv) {
+ lfatal("no receiver set");
+ }
+ if (obj->obj_type != CORE_OBJECT_PCAP) {
+ lfatal("obj is not CORE_OBJECT_PCAP");
+ }
+
+ if (!_link(self, (core_object_pcap_t*)obj)) {
+ self->recv(self->ctx, self->produced);
+ }
+}
+
+core_receiver_t filter_layer_receiver()
+{
+ return (core_receiver_t)_receive;
+}
+
+static const core_object_t* _produce(filter_layer_t* self)
+{
+ const core_object_t* obj;
+ mlassert_self();
+
+ obj = self->prod(self->prod_ctx);
+ if (!obj || obj->obj_type != CORE_OBJECT_PCAP || _link(self, (core_object_pcap_t*)obj)) {
+ return 0;
+ }
+
+ return self->produced;
+}
+
+core_producer_t filter_layer_producer(filter_layer_t* self)
+{
+ mlassert_self();
+
+ if (!self->prod) {
+ lfatal("no producer set");
+ }
+
+ return (core_producer_t)_produce;
+}
diff --git a/src/filter/layer.h b/src/filter/layer.h
new file mode 100644
index 0000000..da8671c
--- /dev/null
+++ b/src/filter/layer.h
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "core/log.h"
+#include "core/receiver.h"
+#include "core/producer.h"
+#include "core/object/pcap.h"
+#include "core/object/null.h"
+#include "core/object/ether.h"
+#include "core/object/loop.h"
+#include "core/object/linuxsll.h"
+#include "core/object/ieee802.h"
+#include "core/object/ip.h"
+#include "core/object/ip6.h"
+#include "core/object/gre.h"
+#include "core/object/icmp.h"
+#include "core/object/icmp6.h"
+#include "core/object/udp.h"
+#include "core/object/tcp.h"
+#include "core/object/payload.h"
+
+#ifndef __dnsjit_filter_layer_h
+#define __dnsjit_filter_layer_h
+
+#include "filter/layer.hh"
+
+#endif
diff --git a/src/filter/layer.hh b/src/filter/layer.hh
new file mode 100644
index 0000000..f8f260b
--- /dev/null
+++ b/src/filter/layer.hh
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+//lua:require("dnsjit.core.log")
+//lua:require("dnsjit.core.receiver_h")
+//lua:require("dnsjit.core.producer_h")
+//lua:require("dnsjit.core.object.pcap_h")
+//lua:require("dnsjit.core.object.null_h")
+//lua:require("dnsjit.core.object.ether_h")
+//lua:require("dnsjit.core.object.loop_h")
+//lua:require("dnsjit.core.object.linuxsll_h")
+//lua:require("dnsjit.core.object.ieee802_h")
+//lua:require("dnsjit.core.object.ip_h")
+//lua:require("dnsjit.core.object.ip6_h")
+//lua:require("dnsjit.core.object.gre_h")
+//lua:require("dnsjit.core.object.icmp_h")
+//lua:require("dnsjit.core.object.icmp6_h")
+//lua:require("dnsjit.core.object.udp_h")
+//lua:require("dnsjit.core.object.tcp_h")
+//lua:require("dnsjit.core.object.payload_h")
+
+typedef struct filter_layer {
+ core_log_t _log;
+ core_receiver_t recv;
+ void* ctx;
+
+ core_producer_t prod;
+ void* prod_ctx;
+
+ const core_object_t* produced;
+ core_object_null_t null;
+ core_object_ether_t ether;
+ core_object_loop_t loop;
+ core_object_linuxsll_t linuxsll;
+ size_t n_ieee802;
+ core_object_ieee802_t ieee802[3]; // N_IEEE802
+ core_object_ip_t ip;
+ core_object_ip6_t ip6;
+ core_object_gre_t gre;
+ core_object_icmp_t icmp;
+ core_object_icmp6_t icmp6;
+ core_object_udp_t udp;
+ core_object_tcp_t tcp;
+ core_object_payload_t payload;
+} filter_layer_t;
+
+core_log_t* filter_layer_log();
+
+void filter_layer_init(filter_layer_t* self);
+void filter_layer_destroy(filter_layer_t* self);
+
+core_receiver_t filter_layer_receiver();
+core_producer_t filter_layer_producer(filter_layer_t* self);
diff --git a/src/filter/layer.lua b/src/filter/layer.lua
new file mode 100644
index 0000000..74c56ba
--- /dev/null
+++ b/src/filter/layer.lua
@@ -0,0 +1,93 @@
+-- Copyright (c) 2018-2021, OARC, Inc.
+-- All rights reserved.
+--
+-- This file is part of dnsjit.
+--
+-- dnsjit is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License as published by
+-- the Free Software Foundation, either version 3 of the License, or
+-- (at your option) any later version.
+--
+-- dnsjit is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+
+-- dnsjit.filter.layer
+-- Parse the ether/IP stack
+-- local filter = require("dnsjit.filter.layer").new()
+--
+-- Parse the ether/IP stack of the received objects and send the top most
+-- object to the receivers.
+-- Objects are chained which each layer in the stack with the top most first.
+-- Currently supports input
+-- .IR dnsjit.core.object.pcap .
+module(...,package.seeall)
+
+require("dnsjit.filter.layer_h")
+local ffi = require("ffi")
+local C = ffi.C
+
+local t_name = "filter_layer_t"
+local filter_layer_t = ffi.typeof(t_name)
+local Layer = {}
+
+-- Create a new Layer filter.
+function Layer.new()
+ local self = {
+ _receiver = nil,
+ obj = filter_layer_t(),
+ }
+ C.filter_layer_init(self.obj)
+ ffi.gc(self.obj, C.filter_layer_destroy)
+ return setmetatable(self, { __index = Layer })
+end
+
+-- Return the Log object to control logging of this instance or module.
+function Layer:log()
+ if self == nil then
+ return C.filter_layer_log()
+ end
+ return self.obj._log
+end
+
+-- Return the C functions and context for receiving objects.
+function Layer:receive()
+ return C.filter_layer_receiver(), self.obj
+end
+
+-- Set the receiver to pass objects to.
+function Layer:receiver(o)
+ self.obj.recv, self.obj.ctx = o:receive()
+ self._receiver = o
+end
+
+-- Return the C functions and context for producing objects.
+function Layer:produce()
+ return C.filter_layer_producer(self.obj), self.obj
+end
+
+-- Set the producer to get objects from.
+function Layer:producer(o)
+ self.obj.prod, self.obj.prod_ctx = o:produce()
+ self._producer = o
+end
+
+-- dnsjit.core.object.pcap (3),
+-- dnsjit.core.object.ether (3),
+-- dnsjit.core.object.null (3),
+-- dnsjit.core.object.loop (3),
+-- dnsjit.core.object.linuxsll (3),
+-- dnsjit.core.object.ieee802 (3),
+-- dnsjit.core.object.gre (3),
+-- dnsjit.core.object.ip (3),
+-- dnsjit.core.object.ip6 (3),
+-- dnsjit.core.object.icmp (3),
+-- dnsjit.core.object.icmp6 (3),
+-- dnsjit.core.object.udp (3),
+-- dnsjit.core.object.tcp (3),
+-- dnsjit.core.object.payload (3)
+return Layer
diff --git a/src/filter/split.c b/src/filter/split.c
new file mode 100644
index 0000000..dccf38a
--- /dev/null
+++ b/src/filter/split.c
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+
+#include "filter/split.h"
+#include "core/assert.h"
+
+static core_log_t _log = LOG_T_INIT("filter.split");
+static filter_split_t _defaults = {
+ LOG_T_INIT_OBJ("filter.split"),
+ FILTER_SPLIT_MODE_ROUNDROBIN, 0, 0, 0
+};
+
+core_log_t* filter_split_log()
+{
+ return &_log;
+}
+
+void filter_split_init(filter_split_t* self)
+{
+ mlassert_self();
+
+ *self = _defaults;
+}
+
+void filter_split_destroy(filter_split_t* self)
+{
+ filter_split_recv_t* r;
+ mlassert_self();
+
+ if (self->recv_last)
+ self->recv_last->next = 0;
+ while ((r = self->recv_first)) {
+ self->recv_first = r->next;
+ free(r);
+ }
+}
+
+void filter_split_add(filter_split_t* self, core_receiver_t recv, void* ctx)
+{
+ filter_split_recv_t* r;
+ mlassert_self();
+ lassert(recv, "recv is nil");
+
+ lfatal_oom(r = malloc(sizeof(filter_split_recv_t)));
+ r->recv = recv;
+ r->ctx = ctx;
+
+ if (self->recv_last) {
+ self->recv_last->next = r;
+ r->next = self->recv_first;
+ self->recv_first = r;
+ } else {
+ self->recv_first = self->recv = self->recv_last = r;
+ r->next = r;
+ }
+}
+
+static void _roundrobin(filter_split_t* self, const core_object_t* obj)
+{
+ mlassert_self();
+
+ self->recv->recv(self->recv->ctx, obj);
+ self->recv = self->recv->next;
+}
+
+static void _sendall(filter_split_t* self, const core_object_t* obj)
+{
+ filter_split_recv_t* r;
+ mlassert_self();
+
+ for (r = self->recv_first; r; r = r->next) {
+ r->recv(r->ctx, obj);
+ if (r == self->recv_last)
+ break;
+ }
+}
+
+core_receiver_t filter_split_receiver(filter_split_t* self)
+{
+ mlassert_self();
+
+ if (!self->recv) {
+ lfatal("no receiver(s) set");
+ }
+
+ switch (self->mode) {
+ case FILTER_SPLIT_MODE_ROUNDROBIN:
+ return (core_receiver_t)_roundrobin;
+ case FILTER_SPLIT_MODE_SENDALL:
+ return (core_receiver_t)_sendall;
+ default:
+ lfatal("invalid split mode");
+ }
+ return 0;
+}
diff --git a/src/filter/split.h b/src/filter/split.h
new file mode 100644
index 0000000..921b649
--- /dev/null
+++ b/src/filter/split.h
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "core/log.h"
+#include "core/receiver.h"
+
+#ifndef __dnsjit_filter_split_h
+#define __dnsjit_filter_split_h
+
+#include "filter/split.hh"
+
+#endif
diff --git a/src/filter/split.hh b/src/filter/split.hh
new file mode 100644
index 0000000..2aba2ae
--- /dev/null
+++ b/src/filter/split.hh
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+//lua:require("dnsjit.core.log")
+//lua:require("dnsjit.core.receiver_h")
+
+typedef enum filter_split_mode {
+ FILTER_SPLIT_MODE_ROUNDROBIN,
+ FILTER_SPLIT_MODE_SENDALL
+} filter_split_mode_t;
+
+typedef struct filter_split_recv filter_split_recv_t;
+struct filter_split_recv {
+ filter_split_recv_t* next;
+ core_receiver_t recv;
+ void* ctx;
+};
+
+typedef struct filter_split {
+ core_log_t _log;
+ filter_split_mode_t mode;
+ filter_split_recv_t* recv_first;
+ filter_split_recv_t* recv;
+ filter_split_recv_t* recv_last;
+} filter_split_t;
+
+core_log_t* filter_split_log();
+
+void filter_split_init(filter_split_t* self);
+void filter_split_destroy(filter_split_t* self);
+void filter_split_add(filter_split_t* self, core_receiver_t recv, void* ctx);
+
+core_receiver_t filter_split_receiver(filter_split_t* self);
diff --git a/src/filter/split.lua b/src/filter/split.lua
new file mode 100644
index 0000000..d9aec4f
--- /dev/null
+++ b/src/filter/split.lua
@@ -0,0 +1,80 @@
+-- Copyright (c) 2018-2021, OARC, Inc.
+-- All rights reserved.
+--
+-- This file is part of dnsjit.
+--
+-- dnsjit is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License as published by
+-- the Free Software Foundation, either version 3 of the License, or
+-- (at your option) any later version.
+--
+-- dnsjit is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+
+-- dnsjit.filter.split
+-- Passthrough to other receivers in various ways
+-- local filter = require("dnsjit.filter.split").new()
+-- filter.receiver(...)
+-- filter.receiver(...)
+-- filter.receiver(...)
+-- input.receiver(filter)
+--
+-- Filter to pass objects to others in various ways.
+module(...,package.seeall)
+
+require("dnsjit.filter.split_h")
+local ffi = require("ffi")
+local C = ffi.C
+
+local t_name = "filter_split_t"
+local filter_split_t = ffi.typeof(t_name)
+local Split = {}
+
+-- Create a new Split filter.
+function Split.new()
+ local self = {
+ receivers = {},
+ obj = filter_split_t(),
+ }
+ C.filter_split_init(self.obj)
+ ffi.gc(self.obj, C.filter_split_destroy)
+ return setmetatable(self, { __index = Split })
+end
+
+-- Return the Log object to control logging of this instance or module.
+function Split:log()
+ if self == nil then
+ return C.filter_split_log()
+ end
+ return self.obj._log
+end
+
+-- Set the passthrough mode to round robin (default mode).
+function Split:roundrobin()
+ self.obj.mode = "FILTER_SPLIT_MODE_ROUNDROBIN"
+end
+
+-- Set the passthrough mode to send to all receivers.
+function Split:sendall()
+ self.obj.mode = "FILTER_SPLIT_MODE_SENDALL"
+end
+
+-- Return the C functions and context for receiving objects.
+function Split:receive()
+ return C.filter_split_receiver(self.obj), self.obj
+end
+
+-- Set the receiver to pass objects to, this can be called multiple times to
+-- set addtional receivers.
+function Split:receiver(o)
+ local recv, ctx = o:receive()
+ C.filter_split_add(self.obj, recv, ctx)
+ table.insert(self.receivers, o)
+end
+
+return Split
diff --git a/src/filter/timing.c b/src/filter/timing.c
new file mode 100644
index 0000000..bf4f865
--- /dev/null
+++ b/src/filter/timing.c
@@ -0,0 +1,557 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+
+#include "filter/timing.h"
+#include "core/assert.h"
+#include "core/timespec.h"
+#include "core/object/pcap.h"
+
+#include <time.h>
+#include <sys/time.h>
+
+#define N1e9 1000000000
+
+typedef struct _filter_timing {
+ filter_timing_t pub;
+
+ struct timespec diff;
+ core_timespec_t last_pkthdr_ts;
+ struct timespec last_ts;
+ struct timespec first_ts;
+ void (*timing_callback)(filter_timing_t*, const core_object_pcap_t*);
+ struct timespec mod_ts;
+ size_t counter;
+} _filter_timing_t;
+
+static core_log_t _log = LOG_T_INIT("filter.timing");
+static filter_timing_t _defaults = {
+ LOG_T_INIT_OBJ("filter.timing"),
+ 0, 0,
+ TIMING_MODE_KEEP, 0, 0, 0, 0, 0.0, 0,
+ 0, 0
+};
+
+#define _self ((_filter_timing_t*)self)
+
+core_log_t* filter_timing_log()
+{
+ return &_log;
+}
+
+static void _keep(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+#if HAVE_CLOCK_NANOSLEEP
+ struct timespec to = {
+ _self->diff.tv_sec + pkt->ts.sec,
+ _self->diff.tv_nsec + pkt->ts.nsec
+ };
+ int ret = EINTR;
+
+ if (to.tv_nsec >= N1e9) {
+ to.tv_sec += 1;
+ to.tv_nsec -= N1e9;
+ } else if (to.tv_nsec < 0) {
+ to.tv_sec -= 1;
+ to.tv_nsec += N1e9;
+ }
+
+ while (ret) {
+ ldebug("keep mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec);
+ ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret);
+ }
+ }
+#elif HAVE_NANOSLEEP
+ struct timespec diff = {
+ pkt->ts.sec - _self->last_pkthdr_ts.sec,
+ pkt->ts.nsec - _self->last_pkthdr_ts.nsec
+ };
+ int ret = EINTR;
+
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += 1;
+ diff.tv_nsec -= N1e9;
+ } else if (diff.tv_nsec < 0) {
+ diff.tv_sec -= 1;
+ diff.tv_nsec += N1e9;
+ }
+
+ if (diff.tv_sec > -1 && diff.tv_nsec > -1) {
+ while (ret) {
+ ldebug("keep mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec);
+ if ((ret = nanosleep(&diff, &diff))) {
+ ret = errno;
+ if (ret != EINTR) {
+ lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret);
+ }
+ }
+ }
+ }
+
+ _self->last_pkthdr_ts = pkt->ts;
+#endif
+}
+
+static void _increase(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+ struct timespec diff = {
+ pkt->ts.sec - _self->last_pkthdr_ts.sec,
+ pkt->ts.nsec - _self->last_pkthdr_ts.nsec
+ };
+ int ret = EINTR;
+
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += 1;
+ diff.tv_nsec -= N1e9;
+ } else if (diff.tv_nsec < 0) {
+ diff.tv_sec -= 1;
+ diff.tv_nsec += N1e9;
+ }
+
+ diff.tv_sec += _self->mod_ts.tv_sec;
+ diff.tv_nsec += _self->mod_ts.tv_nsec;
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += 1;
+ diff.tv_nsec -= N1e9;
+ }
+
+ if (diff.tv_sec > -1 && diff.tv_nsec > -1) {
+#if HAVE_CLOCK_NANOSLEEP
+ struct timespec to = {
+ _self->last_ts.tv_sec + diff.tv_sec,
+ _self->last_ts.tv_nsec + diff.tv_nsec
+ };
+
+ if (to.tv_nsec >= N1e9) {
+ to.tv_sec += 1;
+ to.tv_nsec -= N1e9;
+ } else if (to.tv_nsec < 0) {
+ to.tv_sec -= 1;
+ to.tv_nsec += N1e9;
+ }
+
+ while (ret) {
+ ldebug("increase mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec);
+ ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret);
+ }
+ }
+#elif HAVE_NANOSLEEP
+ while (ret) {
+ ldebug("increase mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec);
+ if ((ret = nanosleep(&diff, &diff))) {
+ ret = errno;
+ if (ret != EINTR) {
+ lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret);
+ }
+ }
+ }
+#endif
+ }
+
+ _self->last_pkthdr_ts = pkt->ts;
+
+#if HAVE_CLOCK_NANOSLEEP
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+#endif
+}
+
+static void _reduce(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+ struct timespec diff = {
+ pkt->ts.sec - _self->last_pkthdr_ts.sec,
+ pkt->ts.nsec - _self->last_pkthdr_ts.nsec
+ };
+ int ret = EINTR;
+
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += 1;
+ diff.tv_nsec -= N1e9;
+ } else if (diff.tv_nsec < 0) {
+ diff.tv_sec -= 1;
+ diff.tv_nsec += N1e9;
+ }
+
+ diff.tv_sec -= _self->mod_ts.tv_sec;
+ diff.tv_nsec -= _self->mod_ts.tv_nsec;
+ if (diff.tv_nsec < 0) {
+ diff.tv_sec -= 1;
+ diff.tv_nsec += N1e9;
+ }
+
+ if (diff.tv_sec > -1 && diff.tv_nsec > -1) {
+#if HAVE_CLOCK_NANOSLEEP
+ struct timespec to = {
+ _self->last_ts.tv_sec + diff.tv_sec,
+ _self->last_ts.tv_nsec + diff.tv_nsec
+ };
+
+ if (to.tv_nsec >= N1e9) {
+ to.tv_sec += 1;
+ to.tv_nsec -= N1e9;
+ } else if (to.tv_nsec < 0) {
+ to.tv_sec -= 1;
+ to.tv_nsec += N1e9;
+ }
+
+ while (ret) {
+ ldebug("reduce mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec);
+ ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret);
+ }
+ }
+#elif HAVE_NANOSLEEP
+ while (ret) {
+ ldebug("reduce mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec);
+ if ((ret = nanosleep(&diff, &diff))) {
+ ret = errno;
+ if (ret != EINTR) {
+ lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret);
+ }
+ }
+ }
+#endif
+ }
+
+ _self->last_pkthdr_ts = pkt->ts;
+
+#if HAVE_CLOCK_NANOSLEEP
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+#endif
+}
+
+static void _multiply(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+ struct timespec diff = {
+ pkt->ts.sec - _self->last_pkthdr_ts.sec,
+ pkt->ts.nsec - _self->last_pkthdr_ts.nsec
+ };
+ int ret = EINTR;
+
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += 1;
+ diff.tv_nsec -= N1e9;
+ } else if (diff.tv_nsec < 0) {
+ diff.tv_sec -= 1;
+ diff.tv_nsec += N1e9;
+ }
+
+ diff.tv_sec = (time_t)((float)diff.tv_sec * self->mul);
+ diff.tv_nsec = (long)((float)diff.tv_nsec * self->mul);
+ if (diff.tv_nsec >= N1e9) {
+ diff.tv_sec += diff.tv_nsec / N1e9;
+ diff.tv_nsec %= N1e9;
+ }
+
+ if (diff.tv_sec > -1 && diff.tv_nsec > -1) {
+#if HAVE_CLOCK_NANOSLEEP
+ struct timespec to = {
+ _self->last_ts.tv_sec + diff.tv_sec,
+ _self->last_ts.tv_nsec + diff.tv_nsec
+ };
+
+ if (to.tv_nsec >= N1e9) {
+ to.tv_sec += 1;
+ to.tv_nsec -= N1e9;
+ } else if (to.tv_nsec < 0) {
+ to.tv_sec -= 1;
+ to.tv_nsec += N1e9;
+ }
+
+ while (ret) {
+ ldebug("multiply mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec);
+ ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret);
+ }
+ }
+#elif HAVE_NANOSLEEP
+ while (ret) {
+ ldebug("multiply mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec);
+ if ((ret = nanosleep(&diff, &diff))) {
+ ret = errno;
+ if (ret != EINTR) {
+ lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret);
+ }
+ }
+ }
+#endif
+ }
+
+ _self->last_pkthdr_ts = pkt->ts;
+
+#if HAVE_CLOCK_NANOSLEEP
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+#endif
+}
+
+static void _fixed(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+ struct timespec diff = {
+ _self->mod_ts.tv_sec,
+ _self->mod_ts.tv_nsec
+ };
+ int ret = EINTR;
+
+ if (diff.tv_sec > -1 && diff.tv_nsec > -1) {
+#if HAVE_CLOCK_NANOSLEEP
+ struct timespec to = {
+ _self->last_ts.tv_sec + diff.tv_sec,
+ _self->last_ts.tv_nsec + diff.tv_nsec
+ };
+
+ if (to.tv_nsec >= N1e9) {
+ to.tv_sec += 1;
+ to.tv_nsec -= N1e9;
+ } else if (to.tv_nsec < 0) {
+ to.tv_sec -= 1;
+ to.tv_nsec += N1e9;
+ }
+
+ while (ret) {
+ ldebug("fixed mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec);
+ ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret);
+ }
+ }
+#elif HAVE_NANOSLEEP
+ while (ret) {
+ ldebug("fixed mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec);
+ if ((ret = nanosleep(&diff, &diff))) {
+ ret = errno;
+ if (ret != EINTR) {
+ lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret);
+ }
+ }
+ }
+#endif
+ }
+
+ _self->last_pkthdr_ts = pkt->ts;
+
+#if HAVE_CLOCK_NANOSLEEP
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+#endif
+}
+
+#if HAVE_CLOCK_NANOSLEEP
+static inline void _timespec_diff(struct timespec* start, struct timespec* stop,
+ struct timespec* result)
+{
+ if ((stop->tv_nsec - start->tv_nsec) < 0) {
+ mlassert(stop->tv_sec > start->tv_sec, "stop time must be after start time");
+ result->tv_sec = stop->tv_sec - start->tv_sec - 1;
+ result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000UL;
+ } else {
+ mlassert(stop->tv_sec >= start->tv_sec, "stop time must be after start time");
+ result->tv_sec = stop->tv_sec - start->tv_sec;
+ result->tv_nsec = stop->tv_nsec - start->tv_nsec;
+ }
+}
+
+static void _realtime(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+ _self->counter++;
+ if (_self->counter >= self->rt_batch) {
+ struct timespec simulated;
+
+ _self->counter = 0;
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+
+ // calculate simulated time from packet offsets
+ simulated.tv_sec = pkt->ts.sec;
+ simulated.tv_nsec = pkt->ts.nsec;
+ _timespec_diff(&_self->mod_ts, &simulated, &simulated);
+
+ // calculate real elapsed time from monotonic clock
+ _timespec_diff(&_self->first_ts, &_self->last_ts, &_self->diff);
+
+ linfo("simulated time: %ld.%09lds; real time: %ld.%09lds",
+ simulated.tv_sec, simulated.tv_nsec, _self->diff.tv_sec, _self->diff.tv_nsec);
+
+ if (simulated.tv_sec > _self->diff.tv_sec
+ || (simulated.tv_sec == _self->diff.tv_sec && simulated.tv_nsec > _self->diff.tv_nsec)) {
+ int ret = EINTR;
+ _timespec_diff(&_self->diff, &simulated, &simulated);
+
+ ldebug("sleeping for %ld.%09lds", simulated.tv_sec, simulated.tv_nsec);
+ while (ret) {
+ ret = clock_nanosleep(CLOCK_MONOTONIC, 0, &simulated, 0);
+ if (ret && ret != EINTR) {
+ lfatal("clock_nanosleep(%ld.%09ld) %d", simulated.tv_sec, simulated.tv_nsec, ret);
+ }
+ }
+ } else {
+ // check that real time didn't drift ahead more than specified drift limit
+ _timespec_diff(&simulated, &_self->diff, &_self->diff);
+ if (_self->diff.tv_sec > (self->rt_drift / N1e9)
+ || (_self->diff.tv_sec == (self->rt_drift / N1e9) && _self->diff.tv_nsec >= (self->rt_drift % N1e9))) {
+ lfatal("aborting, real time drifted ahead of simulated time (%ld.%09lds) by %ld.%09lds",
+ simulated.tv_sec, simulated.tv_nsec, _self->diff.tv_sec, _self->diff.tv_nsec);
+ }
+ }
+ }
+}
+#endif
+
+static void _init(filter_timing_t* self, const core_object_pcap_t* pkt)
+{
+#if HAVE_CLOCK_NANOSLEEP
+ if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) {
+ lfatal("clock_gettime()");
+ }
+ _self->first_ts = _self->last_ts;
+ _self->diff = _self->last_ts;
+ _self->diff.tv_sec -= pkt->ts.sec;
+ _self->diff.tv_nsec -= pkt->ts.nsec;
+ ldebug("init with clock_nanosleep() now is %ld.%09ld, diff of first pkt %ld.%09ld",
+ _self->last_ts.tv_sec, _self->last_ts.tv_nsec,
+ _self->diff.tv_sec, _self->diff.tv_nsec);
+#elif HAVE_NANOSLEEP
+ ldebug("init with nanosleep()");
+#else
+#error "No clock_nanosleep() or nanosleep(), can not continue"
+#endif
+
+ _self->last_pkthdr_ts = pkt->ts;
+
+ switch (self->mode) {
+ case TIMING_MODE_KEEP:
+ ldebug("init mode keep");
+ _self->timing_callback = _keep;
+ break;
+ case TIMING_MODE_INCREASE:
+ _self->timing_callback = _increase;
+ _self->mod_ts.tv_sec = self->inc / N1e9;
+ _self->mod_ts.tv_nsec = self->inc % N1e9;
+ ldebug("init mode increase by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec);
+ break;
+ case TIMING_MODE_REDUCE:
+ _self->timing_callback = _reduce;
+ _self->mod_ts.tv_sec = self->red / N1e9;
+ _self->mod_ts.tv_nsec = self->red % N1e9;
+ ldebug("init mode reduce by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec);
+ break;
+ case TIMING_MODE_MULTIPLY:
+ _self->timing_callback = _multiply;
+ ldebug("init mode multiply by %f", self->mul);
+ break;
+ case TIMING_MODE_FIXED:
+ _self->timing_callback = _fixed;
+ _self->mod_ts.tv_sec = self->fixed / N1e9;
+ _self->mod_ts.tv_nsec = self->fixed % N1e9;
+ ldebug("init mode fixed by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec);
+ break;
+ case TIMING_MODE_REALTIME:
+#if HAVE_CLOCK_NANOSLEEP
+ ldebug("init mode realtime");
+ _self->timing_callback = _realtime;
+ _self->counter = 0;
+ _self->mod_ts.tv_sec = pkt->ts.sec;
+ _self->mod_ts.tv_nsec = pkt->ts.nsec;
+#else
+ lfatal("realtime mode requires clock_nanosleep()");
+#endif
+ break;
+ default:
+ lfatal("invalid timing mode %d", self->mode);
+ }
+}
+
+filter_timing_t* filter_timing_new()
+{
+ filter_timing_t* self;
+ mlfatal_oom(self = malloc(sizeof(_filter_timing_t)));
+ *self = _defaults;
+ _self->timing_callback = _init;
+
+ return self;
+}
+
+void filter_timing_free(filter_timing_t* self)
+{
+ mlassert_self();
+ free(self);
+}
+
+static void _receive(filter_timing_t* self, const core_object_t* obj)
+{
+ mlassert_self();
+ lassert(obj, "obj is nil");
+
+ if (obj->obj_type != CORE_OBJECT_PCAP) {
+ lfatal("obj is not CORE_OBJECT_PCAP");
+ }
+
+ _self->timing_callback(self, (core_object_pcap_t*)obj);
+ self->recv(self->ctx, obj);
+}
+
+core_receiver_t filter_timing_receiver(filter_timing_t* self)
+{
+ mlassert_self();
+
+ if (!self->recv) {
+ lfatal("no receiver set");
+ }
+
+ return (core_receiver_t)_receive;
+}
+
+static const core_object_t* _produce(filter_timing_t* self)
+{
+ const core_object_t* obj;
+ mlassert_self();
+
+ obj = self->prod(self->prod_ctx);
+ if (!obj || obj->obj_type != CORE_OBJECT_PCAP) {
+ return 0;
+ }
+
+ _self->timing_callback(self, (core_object_pcap_t*)obj);
+ return obj;
+}
+
+core_producer_t filter_timing_producer(filter_timing_t* self)
+{
+ mlassert_self();
+
+ if (!self->prod) {
+ lfatal("no producer set");
+ }
+
+ return (core_producer_t)_produce;
+}
diff --git a/src/filter/timing.h b/src/filter/timing.h
new file mode 100644
index 0000000..c00c43d
--- /dev/null
+++ b/src/filter/timing.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "core/log.h"
+#include "core/receiver.h"
+#include "core/producer.h"
+
+#ifndef __dnsjit_filter_timing_h
+#define __dnsjit_filter_timing_h
+
+#include "filter/timing.hh"
+
+#endif
diff --git a/src/filter/timing.hh b/src/filter/timing.hh
new file mode 100644
index 0000000..8615907
--- /dev/null
+++ b/src/filter/timing.hh
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2018-2021, OARC, Inc.
+ * All rights reserved.
+ *
+ * This file is part of dnsjit.
+ *
+ * dnsjit is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * dnsjit is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+//lua:require("dnsjit.core.log")
+//lua:require("dnsjit.core.receiver_h")
+//lua:require("dnsjit.core.producer_h")
+//lua:require("dnsjit.core.timespec_h")
+
+typedef struct filter_timing {
+ core_log_t _log;
+ core_receiver_t recv;
+ void* ctx;
+ enum {
+ TIMING_MODE_KEEP = 0,
+ TIMING_MODE_INCREASE = 1,
+ TIMING_MODE_REDUCE = 2,
+ TIMING_MODE_MULTIPLY = 3,
+ TIMING_MODE_FIXED = 4,
+ TIMING_MODE_REALTIME = 5
+ } mode;
+ size_t inc, red, fixed, rt_batch;
+ float mul;
+ uint64_t rt_drift;
+
+ core_producer_t prod;
+ void* prod_ctx;
+} filter_timing_t;
+
+core_log_t* filter_timing_log();
+
+filter_timing_t* filter_timing_new();
+void filter_timing_free(filter_timing_t* self);
+
+core_receiver_t filter_timing_receiver(filter_timing_t* self);
+core_producer_t filter_timing_producer(filter_timing_t* self);
diff --git a/src/filter/timing.lua b/src/filter/timing.lua
new file mode 100644
index 0000000..cab9574
--- /dev/null
+++ b/src/filter/timing.lua
@@ -0,0 +1,123 @@
+-- Copyright (c) 2018-2021, OARC, Inc.
+-- All rights reserved.
+--
+-- This file is part of dnsjit.
+--
+-- dnsjit is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU General Public License as published by
+-- the Free Software Foundation, either version 3 of the License, or
+-- (at your option) any later version.
+--
+-- dnsjit is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with dnsjit. If not, see <http://www.gnu.org/licenses/>.
+
+-- dnsjit.filter.timing
+-- Filter to pass objects to the next receiver based on timing between packets
+-- local filter = require("dnsjit.filter.timing").new()
+-- ...
+-- filter:receiver(...)
+--
+-- Filter to manipulate processing so it simulates the actual timing when
+-- packets arrived or to delay processing.
+module(...,package.seeall)
+
+require("dnsjit.filter.timing_h")
+local ffi = require("ffi")
+local C = ffi.C
+
+local Timing = {}
+
+-- Create a new Timing filter.
+function Timing.new()
+ local self = {
+ _receiver = nil,
+ obj = C.filter_timing_new(),
+ }
+ ffi.gc(self.obj, C.filter_timing_free)
+ return setmetatable(self, { __index = Timing })
+end
+
+-- Return the Log object to control logging of this instance or module.
+function Timing:log()
+ if self == nil then
+ return C.filter_timing_log()
+ end
+ return self.obj._log
+end
+
+-- Set the timing mode to keep the timing between packets.
+function Timing:keep()
+ self.obj.mode = "TIMING_MODE_KEEP"
+end
+
+-- Set the timing mode to increase the timing between packets by the given
+-- number of nanoseconds.
+function Timing:increase(ns)
+ self.obj.mode = "TIMING_MODE_INCREASE"
+ self.obj.inc = ns
+end
+
+-- Set the timing mode to reduce the timing between packets by the given
+-- number of nanoseconds.
+function Timing:reduce(ns)
+ self.obj.mode = "TIMING_MODE_REDUCE"
+ self.obj.red = ns
+end
+
+-- Set the timing mode to multiply the timing between packets by the given
+-- factor (float/double).
+function Timing:multiply(factor)
+ self.obj.mode = "TIMING_MODE_MULTIPLY"
+ self.obj.mul = factor
+end
+
+-- Set the timing mode to a fixed number of nanoseconds between packets.
+function Timing:fixed(ns)
+ self.obj.mode = "TIMING_MODE_FIXED"
+ self.obj.fixed = ns
+end
+
+-- Set the timing mode to simulate the timing of packets in realtime.
+-- Packets are processed in batches of given size (default 128) before
+-- adjusting time. Aborts if real time drifts ahead more than given
+-- number of seconds (default 1.0s).
+function Timing:realtime(drift, batch_size)
+ self.obj.mode = "TIMING_MODE_REALTIME"
+ if drift == nil then
+ drift = 1
+ end
+ if batch_size == nil then
+ batch_size = 128
+ end
+ self.obj.rt_batch = batch_size
+ self.obj.rt_drift = math.floor(drift * 1000000000)
+end
+
+-- Return the C functions and context for receiving objects.
+function Timing:receive()
+ return C.filter_timing_receiver(), self.obj
+end
+
+-- Set the receiver to pass objects to.
+function Timing:receiver(o)
+ self.obj.recv, self.obj.ctx = o:receive()
+ self._receiver = o
+end
+
+-- Return the C functions and context for producing objects.
+function Timing:produce()
+ return C.filter_timing_producer(self.obj), self.obj
+end
+
+-- Set the producer to get objects from.
+function Timing:producer(o)
+ self.obj.prod, self.obj.prod_ctx = o:produce()
+ self._producer = o
+end
+
+return Timing