From 4754ed45b607e82450a5e31fea1da3ba61433b04 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Mar 2021 08:54:12 +0100 Subject: Adding upstream version 1.1.0+debian. Signed-off-by: Daniel Baumann --- src/filter/copy.c | 192 ++++++++++++++ src/filter/copy.h | 30 +++ src/filter/copy.hh | 40 +++ src/filter/copy.lua | 74 ++++++ src/filter/ipsplit.c | 270 +++++++++++++++++++ src/filter/ipsplit.h | 29 +++ src/filter/ipsplit.hh | 61 +++++ src/filter/ipsplit.lua | 122 +++++++++ src/filter/layer.c | 689 +++++++++++++++++++++++++++++++++++++++++++++++++ src/filter/layer.h | 44 ++++ src/filter/layer.hh | 70 +++++ src/filter/layer.lua | 93 +++++++ src/filter/split.c | 114 ++++++++ src/filter/split.h | 29 +++ src/filter/split.hh | 50 ++++ src/filter/split.lua | 80 ++++++ src/filter/timing.c | 557 +++++++++++++++++++++++++++++++++++++++ src/filter/timing.h | 30 +++ src/filter/timing.hh | 52 ++++ src/filter/timing.lua | 123 +++++++++ 20 files changed, 2749 insertions(+) create mode 100644 src/filter/copy.c create mode 100644 src/filter/copy.h create mode 100644 src/filter/copy.hh create mode 100644 src/filter/copy.lua create mode 100644 src/filter/ipsplit.c create mode 100644 src/filter/ipsplit.h create mode 100644 src/filter/ipsplit.hh create mode 100644 src/filter/ipsplit.lua create mode 100644 src/filter/layer.c create mode 100644 src/filter/layer.h create mode 100644 src/filter/layer.hh create mode 100644 src/filter/layer.lua create mode 100644 src/filter/split.c create mode 100644 src/filter/split.h create mode 100644 src/filter/split.hh create mode 100644 src/filter/split.lua create mode 100644 src/filter/timing.c create mode 100644 src/filter/timing.h create mode 100644 src/filter/timing.hh create mode 100644 src/filter/timing.lua (limited to 'src/filter') diff --git a/src/filter/copy.c b/src/filter/copy.c new file mode 100644 index 0000000..04a90ef --- /dev/null +++ b/src/filter/copy.c @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2019, CZ.NIC, z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "config.h" + +#include "filter/copy.h" +#include "core/assert.h" + +static core_log_t _log = LOG_T_INIT("filter.copy"); +static filter_copy_t _defaults = { + LOG_T_INIT_OBJ("filter.copy"), + 0, 0, + 0 +}; + +core_log_t* filter_copy_log() +{ + return &_log; +} + +void filter_copy_init(filter_copy_t* self) +{ + mlassert_self(); + + *self = _defaults; +} + +void filter_copy_destroy(filter_copy_t* self) +{ + mlassert_self(); +} + +void filter_copy_set(filter_copy_t* self, int32_t obj_type) +{ + mlassert_self(); + + switch (obj_type) { + case CORE_OBJECT_NONE: + self->copy |= 0x1; + break; + case CORE_OBJECT_PCAP: + self->copy |= 0x2; + break; + case CORE_OBJECT_ETHER: + self->copy |= 0x4; + break; + case CORE_OBJECT_NULL: + self->copy |= 0x8; + break; + case CORE_OBJECT_LOOP: + self->copy |= 0x10; + break; + case CORE_OBJECT_LINUXSLL: + self->copy |= 0x20; + break; + case CORE_OBJECT_IEEE802: + self->copy |= 0x40; + break; + case CORE_OBJECT_GRE: + self->copy |= 0x80; + break; + case CORE_OBJECT_IP: + self->copy |= 0x100; + break; + case CORE_OBJECT_IP6: + self->copy |= 0x200; + break; + case CORE_OBJECT_ICMP: + self->copy |= 0x400; + break; + case CORE_OBJECT_ICMP6: + self->copy |= 0x800; + break; + case CORE_OBJECT_UDP: + self->copy |= 0x1000; + break; + case CORE_OBJECT_TCP: + self->copy |= 0x2000; + break; + case CORE_OBJECT_PAYLOAD: + self->copy |= 0x4000; + break; + case CORE_OBJECT_DNS: + self->copy |= 0x8000; + break; + default: + lfatal("unknown type %d", obj_type); + } +} + +uint64_t filter_copy_get(filter_copy_t* self, int32_t obj_type) +{ + mlassert_self(); + + switch (obj_type) { + case CORE_OBJECT_NONE: + return self->copy & 0x1; + case CORE_OBJECT_PCAP: + return self->copy & 0x2; + case CORE_OBJECT_ETHER: + return self->copy & 0x4; + case CORE_OBJECT_NULL: + return self->copy & 0x8; + case CORE_OBJECT_LOOP: + return self->copy & 0x10; + case CORE_OBJECT_LINUXSLL: + return self->copy & 0x20; + case CORE_OBJECT_IEEE802: + return self->copy & 0x40; + case CORE_OBJECT_GRE: + return self->copy & 0x80; + case CORE_OBJECT_IP: + return self->copy & 0x100; + case CORE_OBJECT_IP6: + return self->copy & 0x200; + case CORE_OBJECT_ICMP: + return self->copy & 0x400; + case CORE_OBJECT_ICMP6: + return self->copy & 0x800; + case CORE_OBJECT_UDP: + return self->copy & 0x1000; + case CORE_OBJECT_TCP: + return self->copy & 0x2000; + case CORE_OBJECT_PAYLOAD: + return self->copy & 0x4000; + case CORE_OBJECT_DNS: + return self->copy & 0x8000; + default: + lfatal("unknown type %d", obj_type); + } + return 0; +} + +static void _receive(filter_copy_t* self, const core_object_t* obj) +{ + mlassert_self(); + lassert(obj, "obj is nil"); + + core_object_t* outobj = NULL; + core_object_t* next = NULL; + core_object_t* current = NULL; + const core_object_t* srcobj = obj; + + do { + if (filter_copy_get(self, srcobj->obj_type)) { + next = current; + current = core_object_copy(srcobj); + if (next == NULL) { + next = current; + outobj = current; + } else { + next->obj_prev = current; + } + } + srcobj = srcobj->obj_prev; + } while (srcobj != NULL); + + if (outobj == NULL) { + lnotice("object discarded (no types to copy)"); + return; + } + + self->recv(self->recv_ctx, outobj); +} + +core_receiver_t filter_copy_receiver(filter_copy_t* self) +{ + mlassert_self(); + + if (!self->recv) { + lfatal("no receiver(s) set"); + } + + return (core_receiver_t)_receive; +} diff --git a/src/filter/copy.h b/src/filter/copy.h new file mode 100644 index 0000000..96c8db0 --- /dev/null +++ b/src/filter/copy.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019, CZ.NIC, z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "core/log.h" +#include "core/object.h" +#include "core/receiver.h" + +#ifndef __dnsjit_filter_copy_h +#define __dnsjit_filter_copy_h + +#include "filter/copy.hh" + +#endif diff --git a/src/filter/copy.hh b/src/filter/copy.hh new file mode 100644 index 0000000..edf7fc7 --- /dev/null +++ b/src/filter/copy.hh @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019, CZ.NIC z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +//lua:require("dnsjit.core.log") +//lua:require("dnsjit.core.receiver_h") + +typedef struct filter_copy { + core_log_t _log; + + core_receiver_t recv; + void* recv_ctx; + + uint64_t copy; +} filter_copy_t; + +core_log_t* filter_copy_log(); + +void filter_copy_init(filter_copy_t* self); +void filter_copy_destroy(filter_copy_t* self); +void filter_copy_set(filter_copy_t* self, int32_t obj_type); +uint64_t filter_copy_get(filter_copy_t* self, int32_t obj_type); + +core_receiver_t filter_copy_receiver(filter_copy_t* self); diff --git a/src/filter/copy.lua b/src/filter/copy.lua new file mode 100644 index 0000000..4be469f --- /dev/null +++ b/src/filter/copy.lua @@ -0,0 +1,74 @@ +-- Copyright (c) 2019 CZ.NIC, z.s.p.o. +-- All rights reserved. +-- +-- This file is part of dnsjit. +-- +-- dnsjit is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- dnsjit is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with dnsjit. If not, see . + +-- dnsjit.filter.copy +-- Creates a copy of an object chain with selected object types. +-- local copy = require("dnsjit.filter.copy").new() +-- local object = require("dnsjit.core.objects") +-- copy:obj_type(object.PAYLOAD) +-- copy:obj_type(object.IP6) +-- channel:receiver(copy) +-- +-- Filter to create a copy of the object chain with selected object types. +-- The user is responsible for manually freeing the created object chain. +module(...,package.seeall) + +require("dnsjit.filter.copy_h") +local object = require("dnsjit.core.object") +local ffi = require("ffi") +local C = ffi.C + +local t_name = "filter_copy_t" +local filter_copy_t = ffi.typeof(t_name) +local Copy = {} + +-- Create a new Copy filter. +function Copy.new() + local self = { + obj = filter_copy_t(), + } + C.filter_copy_init(self.obj) + ffi.gc(self.obj, C.filter_copy_destroy) + return setmetatable(self, { __index = Copy }) +end + +-- Return the Log object to control logging of this instance or module. +function Copy:log() + if self == nil then + return C.filter_copy_log() + end + return self.obj._log +end + +-- Set the object type to be copied. Can be called multiple times to copy +-- multiple object types from the object chain. +function Copy:obj_type(obj_type) + C.filter_copy_set(self.obj, obj_type) +end + +-- Return the C functions and context for receiving objects. +function Copy:receive() + return C.filter_copy_receiver(self.obj), self.obj +end + +-- Set the receiver to pass objects to. +function Copy:receiver(o) + self.obj.recv, self.obj.recv_ctx = o:receive() +end + +return Copy diff --git a/src/filter/ipsplit.c b/src/filter/ipsplit.c new file mode 100644 index 0000000..8632e2e --- /dev/null +++ b/src/filter/ipsplit.c @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2019-2020, CZ.NIC, z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "config.h" + +#include "filter/ipsplit.h" +#include "core/assert.h" +#include "core/object/ip.h" +#include "core/object/ip6.h" +#include "lib/trie.h" + +#include + +typedef struct _filter_ipsplit { + filter_ipsplit_t pub; + + trie_t* trie; + uint32_t weight_total; +} _filter_ipsplit_t; + +typedef struct _client { + /* Receiver-specific client ID (1..N) in host byte order. */ + /* Client ID starts at 1 to avoid issues with lua. */ + uint8_t id[4]; + + filter_ipsplit_recv_t* recv; +} _client_t; + +#define _self ((_filter_ipsplit_t*)self) + +static core_log_t _log = LOG_T_INIT("filter.ipsplit"); +static filter_ipsplit_t _defaults = { + LOG_T_INIT_OBJ("filter.ipsplit"), + IPSPLIT_MODE_SEQUENTIAL, IPSPLIT_OVERWRITE_NONE, + 0, + NULL +}; + +core_log_t* filter_ipsplit_log() +{ + return &_log; +} + +filter_ipsplit_t* filter_ipsplit_new() +{ + filter_ipsplit_t* self; + + mlfatal_oom(self = malloc(sizeof(_filter_ipsplit_t))); + *self = _defaults; + lfatal_oom(_self->trie = trie_create(NULL)); + _self->weight_total = 0; + + return self; +} + +static int _free_trie_value(trie_val_t* val, void* ctx) +{ + free(*val); + return 0; +} + +void filter_ipsplit_free(filter_ipsplit_t* self) +{ + filter_ipsplit_recv_t* first; + filter_ipsplit_recv_t* r; + mlassert_self(); + + trie_apply(_self->trie, _free_trie_value, NULL); + trie_free(_self->trie); + + if (self->recv) { + first = self->recv; + do { + r = self->recv->next; + free(self->recv); + self->recv = r; + } while (self->recv != first); + } + + free(self); +} + +void filter_ipsplit_add(filter_ipsplit_t* self, core_receiver_t recv, void* ctx, uint32_t weight) +{ + filter_ipsplit_recv_t* r; + mlassert_self(); + lassert(recv, "recv is nil"); + lassert(weight > 0, "weight must be positive integer"); + + _self->weight_total += weight; + + lfatal_oom(r = malloc(sizeof(filter_ipsplit_recv_t))); + r->recv = recv; + r->ctx = ctx; + r->n_clients = 0; + r->weight = weight; + + if (!self->recv) { + r->next = r; + self->recv = r; + } else { + r->next = self->recv->next; + self->recv->next = r; + } +} + +/* + * Use portable pseudo-random number generator. + */ +static uint32_t _rand_val = 1; + +static uint32_t _rand() +{ + _rand_val = ((_rand_val * 1103515245) + 12345) & 0x7fffffff; + return _rand_val; +} + +void filter_ipsplit_srand(uint32_t seed) +{ + _rand_val = seed; +} + +static void _assign_client_to_receiver(filter_ipsplit_t* self, _client_t* client) +{ + uint32_t id = 0; + filter_ipsplit_recv_t* recv = 0; + + switch (self->mode) { + case IPSPLIT_MODE_SEQUENTIAL: + recv = self->recv; + id = ++recv->n_clients; + /* When *weight* clients are assigned, switch to next receiver. */ + if (recv->n_clients % recv->weight == 0) + self->recv = recv->next; + break; + case IPSPLIT_MODE_RANDOM: { + /* Get random number from [1, weight_total], then iterate through + * receivers until their weights add up to at least this value. */ + int32_t random = (int32_t)(_rand() % _self->weight_total) + 1; + while (random > 0) { + random -= self->recv->weight; + if (random > 0) + self->recv = self->recv->next; + } + recv = self->recv; + id = ++recv->n_clients; + break; + } + default: + lfatal("invalid ipsplit mode"); + } + + client->recv = recv; + memcpy(client->id, &id, sizeof(client->id)); +} + +/* + * Optionally, write client ID into byte 0-3 of src/dst IP address in the packet. + * + * Client ID is a 4-byte array in host byte order. + */ +static void _overwrite(filter_ipsplit_t* self, core_object_t* obj, _client_t* client) +{ + mlassert_self(); + lassert(obj, "invalid object"); + lassert(client, "invalid client"); + + core_object_ip_t* ip; + core_object_ip6_t* ip6; + + switch (self->overwrite) { + case IPSPLIT_OVERWRITE_NONE: + return; + case IPSPLIT_OVERWRITE_SRC: + if (obj->obj_type == CORE_OBJECT_IP) { + ip = (core_object_ip_t*)obj; + memcpy(&ip->src, client->id, sizeof(client->id)); + } else if (obj->obj_type == CORE_OBJECT_IP6) { + ip6 = (core_object_ip6_t*)obj; + memcpy(&ip6->src, client->id, sizeof(client->id)); + } + break; + case IPSPLIT_OVERWRITE_DST: + if (obj->obj_type == CORE_OBJECT_IP) { + ip = (core_object_ip_t*)obj; + memcpy(&ip->dst, client->id, sizeof(client->id)); + } else if (obj->obj_type == CORE_OBJECT_IP6) { + ip6 = (core_object_ip6_t*)obj; + memcpy(&ip6->dst, client->id, sizeof(client->id)); + } + break; + default: + lfatal("invalid overwrite mode"); + } +} + +static void _receive(filter_ipsplit_t* self, const core_object_t* obj) +{ + mlassert_self(); + + /* Find ip/ip6 object in chain. */ + core_object_t* pkt = (core_object_t*)obj; + while (pkt != NULL) { + if (pkt->obj_type == CORE_OBJECT_IP || pkt->obj_type == CORE_OBJECT_IP6) + break; + pkt = (core_object_t*)pkt->obj_prev; + } + if (pkt == NULL) { + self->discarded++; + lwarning("packet discarded (missing ip/ip6 object)"); + return; + } + + /* Lookup IPv4/IPv6 address in trie (prefix-tree). Inserts new node if not found. */ + trie_val_t* node = 0; + switch (pkt->obj_type) { + case CORE_OBJECT_IP: { + core_object_ip_t* ip = (core_object_ip_t*)pkt; + node = trie_get_ins(_self->trie, ip->src, sizeof(ip->src)); + break; + } + case CORE_OBJECT_IP6: { + core_object_ip6_t* ip6 = (core_object_ip6_t*)pkt; + node = trie_get_ins(_self->trie, ip6->src, sizeof(ip6->src)); + break; + } + default: + lfatal("unsupported object type"); + } + lassert(node, "trie failure"); + + _client_t* client; + if (*node == NULL) { /* IP address not found in tree -> create new client. */ + lfatal_oom(client = malloc(sizeof(_client_t))); + *node = (void*)client; + _assign_client_to_receiver(self, client); + } + + client = (_client_t*)*node; + _overwrite(self, pkt, client); + client->recv->recv(client->recv->ctx, obj); +} + +core_receiver_t filter_ipsplit_receiver(filter_ipsplit_t* self) +{ + mlassert_self(); + + if (!self->recv) { + lfatal("no receiver(s) set"); + } + + return (core_receiver_t)_receive; +} diff --git a/src/filter/ipsplit.h b/src/filter/ipsplit.h new file mode 100644 index 0000000..16c932e --- /dev/null +++ b/src/filter/ipsplit.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019-2020 CZ.NIC, z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "core/log.h" +#include "core/receiver.h" + +#ifndef __dnsjit_filter_ipsplit_h +#define __dnsjit_filter_ipsplit_h + +#include "filter/ipsplit.hh" + +#endif diff --git a/src/filter/ipsplit.hh b/src/filter/ipsplit.hh new file mode 100644 index 0000000..c362ac3 --- /dev/null +++ b/src/filter/ipsplit.hh @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019-2020, CZ.NIC, z.s.p.o. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +//lua:require("dnsjit.core.log") +//lua:require("dnsjit.core.receiver_h") + +typedef struct filter_ipsplit_recv filter_ipsplit_recv_t; +struct filter_ipsplit_recv { + filter_ipsplit_recv_t* next; + + core_receiver_t recv; + void* ctx; + + uint32_t n_clients; /* Total number of clients assigned to this receiver. */ + + uint32_t weight; +}; + +typedef struct filter_ipsplit { + core_log_t _log; + + enum { + IPSPLIT_MODE_SEQUENTIAL = 0, + IPSPLIT_MODE_RANDOM = 1 + } mode; + enum { + IPSPLIT_OVERWRITE_NONE = 0, + IPSPLIT_OVERWRITE_SRC = 1, + IPSPLIT_OVERWRITE_DST = 2 + } overwrite; + + uint64_t discarded; + + filter_ipsplit_recv_t* recv; +} filter_ipsplit_t; + +core_log_t* filter_ipsplit_log(); + +filter_ipsplit_t* filter_ipsplit_new(); +void filter_ipsplit_free(filter_ipsplit_t* self); +void filter_ipsplit_add(filter_ipsplit_t* self, core_receiver_t recv, void* ctx, uint32_t weight); +void filter_ipsplit_srand(unsigned int seed); + +core_receiver_t filter_ipsplit_receiver(filter_ipsplit_t* self); diff --git a/src/filter/ipsplit.lua b/src/filter/ipsplit.lua new file mode 100644 index 0000000..cca0249 --- /dev/null +++ b/src/filter/ipsplit.lua @@ -0,0 +1,122 @@ +-- Copyright (c) 2019-2020 CZ.NIC, z.s.p.o. +-- All rights reserved. +-- +-- This file is part of dnsjit. +-- +-- dnsjit is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- dnsjit is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with dnsjit. If not, see . + +-- dnsjit.filter.ipsplit +-- Pass objects to receivers based on the source IP address +-- local ipsplit = require("dnsjit.filter.ipsplit").new() +-- ipsplit.receiver(...) +-- ipsplit.receiver(...) +-- ipsplit.receiver(...) +-- input.receiver(ipsplit) +-- +-- The filter passes objects to other receivers. +-- Object chains without IPv4/IPv6 packet are discarded. +-- Packets which have the same source IP address are considered to be sent +-- from the same "client". +-- When the first packet from a client is processed, the client is assigned +-- to a receiver. +-- All objects from this client will be passed to the assigned receiver. +-- The filter can also write a receiver-specific client ID (starting from 1) +-- to the source or destination IP in the packet. +module(...,package.seeall) + +require("dnsjit.filter.ipsplit_h") +local bit = require("bit") +local object = require("dnsjit.core.objects") +local ffi = require("ffi") +local C = ffi.C + +local IpSplit = {} + +-- Create a new IpSplit filter. +function IpSplit.new() + local self = { + obj = C.filter_ipsplit_new(), + } + ffi.gc(self.obj, C.filter_ipsplit_free) + return setmetatable(self, { __index = IpSplit }) +end + +-- Return the Log object to control logging of this instance or module. +function IpSplit:log() + if self == nil then + return C.filter_ipsplit_log() + end + return self.obj._log +end + +-- Return the C functions and context for receiving objects. +function IpSplit:receive() + local recv = C.filter_ipsplit_receiver(self.obj) + return recv, self.obj +end + +-- Set the receiver to pass objects to, this can be called multiple times to +-- set additional receivers. +-- The weight parameter can be used to adjust distribution of clients among +-- receivers. +-- Weight must be a positive integer (default is 1). +function IpSplit:receiver(o, weight) + local recv, ctx = o:receive() + if weight == nil then + weight = 1 + end + C.filter_ipsplit_add(self.obj, recv, ctx, weight) +end + +-- Number of input packets discarded due to various reasons. +-- To investigate causes, run with increased logging level. +function IpSplit:discarded() + return tonumber(self.obj.discarded) +end + +-- Set the client assignment mode to sequential. +-- Assigns `weight` clients to a receiver before continuing with the next +-- receiver (default mode). +function IpSplit:sequential() + self.obj.mode = "IPSPLIT_MODE_SEQUENTIAL" +end + +-- Set the client assignment mode to random. +-- Each client is randomly assigned to a receiver (weight affects the +-- probability). +-- The client assignment is stable (and portable) for given seed. +function IpSplit:random(seed) + self.obj.mode = "IPSPLIT_MODE_RANDOM" + if seed then + C.filter_ipsplit_srand(seed) + end +end + +-- Don't overwrite source or destination IP (default). +function IpSplit:overwrite_none() + self.obj.overwrite = "IPSPLIT_OVERWRITE_NONE" +end + +-- Write receiver-specific client ID to bytes 0-3 of source IP (host byte order). +function IpSplit:overwrite_src() + self.obj.overwrite = "IPSPLIT_OVERWRITE_SRC" +end + +-- Write receiver-specific client ID to bytes 0-3 of destination IP (host byte +-- order). +function IpSplit:overwrite_dst() + self.obj.overwrite = "IPSPLIT_OVERWRITE_DST" +end + +return IpSplit diff --git a/src/filter/layer.c b/src/filter/layer.c new file mode 100644 index 0000000..b360ca3 --- /dev/null +++ b/src/filter/layer.c @@ -0,0 +1,689 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "config.h" + +#include "filter/layer.h" +#include "core/assert.h" + +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_NET_ETHERNET_H +#include +#endif +#ifdef HAVE_NET_ETHERTYPES_H +#include +#endif +#ifdef HAVE_ENDIAN_H +#include +#else +#ifdef HAVE_SYS_ENDIAN_H +#include +#else +#ifdef HAVE_MACHINE_ENDIAN_H +#include +#endif +#endif +#endif +#ifdef HAVE_BYTESWAP_H +#include +#endif +#ifndef bswap_16 +#ifndef bswap16 +#define bswap_16(x) swap16(x) +#define bswap_32(x) swap32(x) +#define bswap_64(x) swap64(x) +#else +#define bswap_16(x) bswap16(x) +#define bswap_32(x) bswap32(x) +#define bswap_64(x) bswap64(x) +#endif +#endif + +#define N_IEEE802 3 + +static core_log_t _log = LOG_T_INIT("filter.layer"); +static filter_layer_t _defaults = { + LOG_T_INIT_OBJ("filter.layer"), + 0, 0, + 0, 0, + 0, + CORE_OBJECT_NULL_INIT(0), + CORE_OBJECT_ETHER_INIT(0), + CORE_OBJECT_LOOP_INIT(0), + CORE_OBJECT_LINUXSLL_INIT(0), + 0, { CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0) }, + CORE_OBJECT_IP_INIT(0), + CORE_OBJECT_IP6_INIT(0), + CORE_OBJECT_GRE_INIT(0), + CORE_OBJECT_ICMP_INIT(0), + CORE_OBJECT_ICMP6_INIT(0), + CORE_OBJECT_UDP_INIT(0), + CORE_OBJECT_TCP_INIT(0), + CORE_OBJECT_PAYLOAD_INIT(0) +}; + +core_log_t* filter_layer_log() +{ + return &_log; +} + +void filter_layer_init(filter_layer_t* self) +{ + mlassert_self(); + + *self = _defaults; +} + +void filter_layer_destroy(filter_layer_t* self) +{ + mlassert_self(); +} + +#define need4x2(v1, v2, p, l) \ + if (l < 1) { \ + break; \ + } \ + v1 = (*p) >> 4; \ + v2 = (*p) & 0xf; \ + p += 1; \ + l -= 1 + +#define need8(v, p, l) \ + if (l < 1) { \ + break; \ + } \ + v = *p; \ + p += 1; \ + l -= 1 + +static inline uint16_t _need16(const void* ptr) +{ + uint16_t v; + memcpy(&v, ptr, sizeof(v)); + return be16toh(v); +} + +#define need16(v, p, l) \ + if (l < 2) { \ + break; \ + } \ + v = _need16(p); \ + p += 2; \ + l -= 2 + +#define needr16(v, p, l) \ + if (l < 2) { \ + break; \ + } \ + v = bswap_16(_need16(p)); \ + p += 2; \ + l -= 2 + +static inline uint32_t _need32(const void* ptr) +{ + uint32_t v; + memcpy(&v, ptr, sizeof(v)); + return be32toh(v); +} + +#define need32(v, p, l) \ + if (l < 4) { \ + break; \ + } \ + v = _need32(p); \ + p += 4; \ + l -= 4 + +#define needr32(v, p, l) \ + if (l < 4) { \ + break; \ + } \ + v = bswap_32(_need32(p)); \ + p += 4; \ + l -= 4 + +#define needxb(b, x, p, l) \ + if (l < x) { \ + break; \ + } \ + memcpy(b, p, x); \ + p += x; \ + l -= x + +#define advancexb(x, p, l) \ + if (l < x) { \ + break; \ + } \ + p += x; \ + l -= x + +//static int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned char* pkt, size_t len); + +static inline int _proto(filter_layer_t* self, uint8_t proto, const core_object_t* obj, const unsigned char* pkt, size_t len) +{ + switch (proto) { + case IPPROTO_GRE: { + core_object_gre_t* gre = &self->gre; + gre->obj_prev = obj; + + need16(gre->gre_flags, pkt, len); + need16(gre->ether_type, pkt, len); + + /* TODO: Incomplete, check RFC 1701 */ + + self->produced = (core_object_t*)gre; + + // if (gre.gre_flags & 0x1) { + // need16(gre.checksum, pkt, len); + // } + // if (gre.gre_flags & 0x4) { + // need16(gre.key, pkt, len); + // } + // if (gre.gre_flags & 0x8) { + // need16(gre.sequence, pkt, len); + // } + // + // switch (gre.ether_type) { + // case ETHERTYPE_IP: + // case ETHERTYPE_IPV6: + // return _ip(self, (core_object_t*)gre, pkt, len); + // + // default: + // break; + // } + break; + } + case IPPROTO_ICMP: { + core_object_icmp_t* icmp = &self->icmp; + icmp->obj_prev = obj; + + need8(icmp->type, pkt, len); + need8(icmp->code, pkt, len); + need16(icmp->cksum, pkt, len); + + self->produced = (core_object_t*)icmp; + break; + } + case IPPROTO_ICMPV6: { + core_object_icmp6_t* icmp6 = &self->icmp6; + icmp6->obj_prev = obj; + + need8(icmp6->type, pkt, len); + need8(icmp6->code, pkt, len); + need16(icmp6->cksum, pkt, len); + + self->produced = (core_object_t*)icmp6; + break; + } + case IPPROTO_UDP: { + core_object_udp_t* udp = &self->udp; + core_object_payload_t* payload = &self->payload; + udp->obj_prev = obj; + + need16(udp->sport, pkt, len); + need16(udp->dport, pkt, len); + need16(udp->ulen, pkt, len); + need16(udp->sum, pkt, len); + + payload->obj_prev = (core_object_t*)udp; + + /* Check for padding */ + if (len > udp->ulen) { + payload->padding = len - udp->ulen; + payload->len = len - payload->padding; + } else { + payload->padding = 0; + payload->len = len; + } + payload->payload = (uint8_t*)pkt; + + self->produced = (core_object_t*)payload; + break; + } + case IPPROTO_TCP: { + core_object_tcp_t* tcp = &self->tcp; + core_object_payload_t* payload = &self->payload; + tcp->obj_prev = obj; + + need16(tcp->sport, pkt, len); + need16(tcp->dport, pkt, len); + need32(tcp->seq, pkt, len); + need32(tcp->ack, pkt, len); + need4x2(tcp->off, tcp->x2, pkt, len); + need8(tcp->flags, pkt, len); + need16(tcp->win, pkt, len); + need16(tcp->sum, pkt, len); + need16(tcp->urp, pkt, len); + if (tcp->off > 5) { + tcp->opts_len = (tcp->off - 5) * 4; + needxb(tcp->opts, tcp->opts_len, pkt, len); + } else { + tcp->opts_len = 0; + } + + payload->obj_prev = (core_object_t*)tcp; + + /* Check for padding */ + if (obj->obj_type == CORE_OBJECT_IP && len > (((const core_object_ip_t*)obj)->len - (((const core_object_ip_t*)obj)->hl * 4))) { + payload->padding = len - (((const core_object_ip_t*)obj)->len - (((const core_object_ip_t*)obj)->hl * 4)); + payload->len = len - payload->padding; + } else if (obj->obj_type == CORE_OBJECT_IP6 && len > ((const core_object_ip6_t*)obj)->plen) { + payload->padding = len - ((const core_object_ip6_t*)obj)->plen; + payload->len = len - payload->padding; + } else { + payload->padding = 0; + payload->len = len; + } + + payload->payload = (uint8_t*)pkt; + + self->produced = (core_object_t*)payload; + break; + } + default: + self->produced = obj; + break; + } + + return 0; +} + +static inline int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned char* pkt, size_t len) +{ + if (len) { + switch ((*pkt >> 4)) { + case 4: { + core_object_ip_t* ip = &self->ip; + + ip->obj_prev = obj; + + need4x2(ip->v, ip->hl, pkt, len); + need8(ip->tos, pkt, len); + need16(ip->len, pkt, len); + need16(ip->id, pkt, len); + need16(ip->off, pkt, len); + need8(ip->ttl, pkt, len); + need8(ip->p, pkt, len); + need16(ip->sum, pkt, len); + needxb(&ip->src, 4, pkt, len); + needxb(&ip->dst, 4, pkt, len); + + /* TODO: IPv4 options */ + + if (ip->hl < 5) + break; + if (ip->hl > 5) { + advancexb((ip->hl - 5) * 4, pkt, len); + } + + /* Check reported length for missing payload */ + if (ip->len < (ip->hl * 4)) { + break; + } + if (len < (ip->len - (ip->hl * 4))) { + break; + } + + if (ip->off & 0x2000 || ip->off & 0x1fff) { + core_object_payload_t* payload = &self->payload; + + payload->obj_prev = (core_object_t*)ip; + + /* Check for padding */ + if (len > (ip->len - (ip->hl * 4))) { + payload->padding = len - (ip->len - (ip->hl * 4)); + payload->len = len - payload->padding; + } else { + payload->padding = 0; + payload->len = len; + } + payload->payload = (uint8_t*)pkt; + + self->produced = (core_object_t*)payload; + return 0; + } + + return _proto(self, ip->p, (core_object_t*)ip, pkt, len); + } + case 6: { + core_object_ip6_t* ip6 = &self->ip6; + struct ip6_ext ext; + + ip6->obj_prev = obj; + ip6->is_frag = ip6->have_rtdst = 0; + + need32(ip6->flow, pkt, len); + need16(ip6->plen, pkt, len); + need8(ip6->nxt, pkt, len); + need8(ip6->hlim, pkt, len); + needxb(&ip6->src, 16, pkt, len); + needxb(&ip6->dst, 16, pkt, len); + + /* Check reported length for missing payload */ + if (len < ip6->plen) { + break; + } + + ext.ip6e_nxt = ip6->nxt; + ext.ip6e_len = 0; + while (ext.ip6e_nxt != IPPROTO_NONE + && ext.ip6e_nxt != IPPROTO_GRE + && ext.ip6e_nxt != IPPROTO_ICMPV6 + && ext.ip6e_nxt != IPPROTO_UDP + && ext.ip6e_nxt != IPPROTO_TCP) { + + /* + * Advance to the start of next header, this may not be needed + * if it's the first header or if the header is supported. + */ + if (ext.ip6e_len) { + advancexb(ext.ip6e_len * 8, pkt, len); + } + + /* TODO: Store IPv6 headers? */ + + /* Handle supported headers */ + if (ext.ip6e_nxt == IPPROTO_FRAGMENT) { + if (ip6->is_frag) { + return 1; + } + need8(ext.ip6e_nxt, pkt, len); + need8(ext.ip6e_len, pkt, len); + if (ext.ip6e_len) { + return 1; + } + need16(ip6->frag_offlg, pkt, len); + need32(ip6->frag_ident, pkt, len); + ip6->is_frag = 1; + } else if (ext.ip6e_nxt == IPPROTO_ROUTING) { + struct ip6_rthdr rthdr; + + if (ip6->have_rtdst) { + return 1; + } + + need8(ext.ip6e_nxt, pkt, len); + need8(ext.ip6e_len, pkt, len); + need8(rthdr.ip6r_type, pkt, len); + need8(rthdr.ip6r_segleft, pkt, len); + advancexb(4, pkt, len); + + if (!rthdr.ip6r_type && rthdr.ip6r_segleft) { + if (ext.ip6e_len & 1) { + return 1; + } + if (ext.ip6e_len > 2) { + advancexb(ext.ip6e_len - 2, pkt, len); + } + needxb(ip6->rtdst, 16, pkt, len); + ip6->have_rtdst = 1; + } + } else { + need8(ext.ip6e_nxt, pkt, len); + need8(ext.ip6e_len, pkt, len); + advancexb(6, pkt, len); + } + } + + if (ext.ip6e_nxt == IPPROTO_NONE || ip6->is_frag) { + core_object_payload_t* payload = &self->payload; + + payload->obj_prev = (core_object_t*)ip6; + + /* Check for padding */ + if (len > ip6->plen) { + payload->padding = len - ip6->plen; + payload->len = len - payload->padding; + } else { + payload->padding = 0; + payload->len = len; + } + payload->payload = (uint8_t*)pkt; + + self->produced = (core_object_t*)payload; + return 0; + } + + return _proto(self, ext.ip6e_nxt, (core_object_t*)ip6, pkt, len); + } + default: + break; + } + } + + self->produced = obj; + + return 0; +} + +static inline int _ieee802(filter_layer_t* self, uint16_t tpid, const core_object_t* obj, const unsigned char* pkt, size_t len) +{ + core_object_ieee802_t* ieee802 = &self->ieee802[self->n_ieee802]; + uint16_t tci; + + ieee802->obj_prev = obj; + + for (;;) { + ieee802->tpid = tpid; + need16(tci, pkt, len); + ieee802->pcp = (tci & 0xe000) >> 13; + ieee802->dei = (tci & 0x1000) >> 12; + ieee802->vid = tci & 0x0fff; + need16(ieee802->ether_type, pkt, len); + + switch (ieee802->ether_type) { + case 0x88a8: /* 802.1ad */ + case 0x9100: /* 802.1 QinQ non-standard */ + self->n_ieee802++; + if (self->n_ieee802 < N_IEEE802) { + obj = (const core_object_t*)ieee802; + ieee802 = &self->ieee802[self->n_ieee802]; + ieee802->obj_prev = obj; + tpid = ieee802->ether_type; + continue; + } + return 1; + + case ETHERTYPE_IP: + case ETHERTYPE_IPV6: + return _ip(self, (core_object_t*)ieee802, pkt, len); + + default: + break; + } + break; + } + + self->produced = obj; + + return 0; +} + +static inline int _link(filter_layer_t* self, const core_object_pcap_t* pcap) +{ + const unsigned char* pkt; + size_t len; + + self->n_ieee802 = 0; + + pkt = pcap->bytes; + len = pcap->caplen; + + switch (pcap->linktype) { + case DLT_NULL: { + core_object_null_t* null = &self->null; + null->obj_prev = (core_object_t*)pcap; + + if (pcap->is_swapped) { + needr32(null->family, pkt, len); + } else { + need32(null->family, pkt, len); + } + + switch (null->family) { + case 2: + case 24: + case 28: + case 30: + return _ip(self, (core_object_t*)null, pkt, len); + + default: + break; + } + break; + } + case DLT_EN10MB: { + core_object_ether_t* ether = &self->ether; + ether->obj_prev = (core_object_t*)pcap; + + needxb(ether->dhost, 6, pkt, len); + needxb(ether->shost, 6, pkt, len); + need16(ether->type, pkt, len); + + switch (ether->type) { + case 0x8100: /* 802.1q */ + case 0x88a8: /* 802.1ad */ + case 0x9100: /* 802.1 QinQ non-standard */ + return _ieee802(self, ether->type, (core_object_t*)ether, pkt, len); + + case ETHERTYPE_IP: + case ETHERTYPE_IPV6: + return _ip(self, (core_object_t*)ether, pkt, len); + + default: + break; + } + break; + } + case DLT_LOOP: { + core_object_loop_t* loop = &self->loop; + loop->obj_prev = (core_object_t*)pcap; + + need32(loop->family, pkt, len); + + switch (loop->family) { + case 2: + case 24: + case 28: + case 30: + return _ip(self, (core_object_t*)loop, pkt, len); + + default: + break; + } + break; + } + case DLT_RAW: +#ifdef DLT_IPV4 + case DLT_IPV4: +#endif +#ifdef DLT_IPV6 + case DLT_IPV6: +#endif + return _ip(self, (core_object_t*)pcap, pkt, len); + case DLT_LINUX_SLL: { + core_object_linuxsll_t* linuxsll = &self->linuxsll; + linuxsll->obj_prev = (core_object_t*)pcap; + + need16(linuxsll->packet_type, pkt, len); + need16(linuxsll->arp_hardware, pkt, len); + need16(linuxsll->link_layer_address_length, pkt, len); + needxb(linuxsll->link_layer_address, 8, pkt, len); + need16(linuxsll->ether_type, pkt, len); + + switch (linuxsll->ether_type) { + case 0x8100: /* 802.1q */ + case 0x88a8: /* 802.1ad */ + case 0x9100: /* 802.1 QinQ non-standard */ + return _ieee802(self, linuxsll->ether_type, (core_object_t*)linuxsll, pkt, len); + + case ETHERTYPE_IP: + case ETHERTYPE_IPV6: + return _ip(self, (core_object_t*)linuxsll, pkt, len); + + default: + break; + } + break; + } + /* TODO: These might be interesting to implement + case DLT_IPNET: + case DLT_PKTAP: + */ + default: + break; + } + + self->produced = (core_object_t*)pcap; + + return 0; +} + +static void _receive(filter_layer_t* self, const core_object_t* obj) +{ + mlassert_self(); + lassert(obj, "obj is nil"); + + if (!self->recv) { + lfatal("no receiver set"); + } + if (obj->obj_type != CORE_OBJECT_PCAP) { + lfatal("obj is not CORE_OBJECT_PCAP"); + } + + if (!_link(self, (core_object_pcap_t*)obj)) { + self->recv(self->ctx, self->produced); + } +} + +core_receiver_t filter_layer_receiver() +{ + return (core_receiver_t)_receive; +} + +static const core_object_t* _produce(filter_layer_t* self) +{ + const core_object_t* obj; + mlassert_self(); + + obj = self->prod(self->prod_ctx); + if (!obj || obj->obj_type != CORE_OBJECT_PCAP || _link(self, (core_object_pcap_t*)obj)) { + return 0; + } + + return self->produced; +} + +core_producer_t filter_layer_producer(filter_layer_t* self) +{ + mlassert_self(); + + if (!self->prod) { + lfatal("no producer set"); + } + + return (core_producer_t)_produce; +} diff --git a/src/filter/layer.h b/src/filter/layer.h new file mode 100644 index 0000000..da8671c --- /dev/null +++ b/src/filter/layer.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "core/log.h" +#include "core/receiver.h" +#include "core/producer.h" +#include "core/object/pcap.h" +#include "core/object/null.h" +#include "core/object/ether.h" +#include "core/object/loop.h" +#include "core/object/linuxsll.h" +#include "core/object/ieee802.h" +#include "core/object/ip.h" +#include "core/object/ip6.h" +#include "core/object/gre.h" +#include "core/object/icmp.h" +#include "core/object/icmp6.h" +#include "core/object/udp.h" +#include "core/object/tcp.h" +#include "core/object/payload.h" + +#ifndef __dnsjit_filter_layer_h +#define __dnsjit_filter_layer_h + +#include "filter/layer.hh" + +#endif diff --git a/src/filter/layer.hh b/src/filter/layer.hh new file mode 100644 index 0000000..f8f260b --- /dev/null +++ b/src/filter/layer.hh @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +//lua:require("dnsjit.core.log") +//lua:require("dnsjit.core.receiver_h") +//lua:require("dnsjit.core.producer_h") +//lua:require("dnsjit.core.object.pcap_h") +//lua:require("dnsjit.core.object.null_h") +//lua:require("dnsjit.core.object.ether_h") +//lua:require("dnsjit.core.object.loop_h") +//lua:require("dnsjit.core.object.linuxsll_h") +//lua:require("dnsjit.core.object.ieee802_h") +//lua:require("dnsjit.core.object.ip_h") +//lua:require("dnsjit.core.object.ip6_h") +//lua:require("dnsjit.core.object.gre_h") +//lua:require("dnsjit.core.object.icmp_h") +//lua:require("dnsjit.core.object.icmp6_h") +//lua:require("dnsjit.core.object.udp_h") +//lua:require("dnsjit.core.object.tcp_h") +//lua:require("dnsjit.core.object.payload_h") + +typedef struct filter_layer { + core_log_t _log; + core_receiver_t recv; + void* ctx; + + core_producer_t prod; + void* prod_ctx; + + const core_object_t* produced; + core_object_null_t null; + core_object_ether_t ether; + core_object_loop_t loop; + core_object_linuxsll_t linuxsll; + size_t n_ieee802; + core_object_ieee802_t ieee802[3]; // N_IEEE802 + core_object_ip_t ip; + core_object_ip6_t ip6; + core_object_gre_t gre; + core_object_icmp_t icmp; + core_object_icmp6_t icmp6; + core_object_udp_t udp; + core_object_tcp_t tcp; + core_object_payload_t payload; +} filter_layer_t; + +core_log_t* filter_layer_log(); + +void filter_layer_init(filter_layer_t* self); +void filter_layer_destroy(filter_layer_t* self); + +core_receiver_t filter_layer_receiver(); +core_producer_t filter_layer_producer(filter_layer_t* self); diff --git a/src/filter/layer.lua b/src/filter/layer.lua new file mode 100644 index 0000000..74c56ba --- /dev/null +++ b/src/filter/layer.lua @@ -0,0 +1,93 @@ +-- Copyright (c) 2018-2021, OARC, Inc. +-- All rights reserved. +-- +-- This file is part of dnsjit. +-- +-- dnsjit is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- dnsjit is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with dnsjit. If not, see . + +-- dnsjit.filter.layer +-- Parse the ether/IP stack +-- local filter = require("dnsjit.filter.layer").new() +-- +-- Parse the ether/IP stack of the received objects and send the top most +-- object to the receivers. +-- Objects are chained which each layer in the stack with the top most first. +-- Currently supports input +-- .IR dnsjit.core.object.pcap . +module(...,package.seeall) + +require("dnsjit.filter.layer_h") +local ffi = require("ffi") +local C = ffi.C + +local t_name = "filter_layer_t" +local filter_layer_t = ffi.typeof(t_name) +local Layer = {} + +-- Create a new Layer filter. +function Layer.new() + local self = { + _receiver = nil, + obj = filter_layer_t(), + } + C.filter_layer_init(self.obj) + ffi.gc(self.obj, C.filter_layer_destroy) + return setmetatable(self, { __index = Layer }) +end + +-- Return the Log object to control logging of this instance or module. +function Layer:log() + if self == nil then + return C.filter_layer_log() + end + return self.obj._log +end + +-- Return the C functions and context for receiving objects. +function Layer:receive() + return C.filter_layer_receiver(), self.obj +end + +-- Set the receiver to pass objects to. +function Layer:receiver(o) + self.obj.recv, self.obj.ctx = o:receive() + self._receiver = o +end + +-- Return the C functions and context for producing objects. +function Layer:produce() + return C.filter_layer_producer(self.obj), self.obj +end + +-- Set the producer to get objects from. +function Layer:producer(o) + self.obj.prod, self.obj.prod_ctx = o:produce() + self._producer = o +end + +-- dnsjit.core.object.pcap (3), +-- dnsjit.core.object.ether (3), +-- dnsjit.core.object.null (3), +-- dnsjit.core.object.loop (3), +-- dnsjit.core.object.linuxsll (3), +-- dnsjit.core.object.ieee802 (3), +-- dnsjit.core.object.gre (3), +-- dnsjit.core.object.ip (3), +-- dnsjit.core.object.ip6 (3), +-- dnsjit.core.object.icmp (3), +-- dnsjit.core.object.icmp6 (3), +-- dnsjit.core.object.udp (3), +-- dnsjit.core.object.tcp (3), +-- dnsjit.core.object.payload (3) +return Layer diff --git a/src/filter/split.c b/src/filter/split.c new file mode 100644 index 0000000..dccf38a --- /dev/null +++ b/src/filter/split.c @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "config.h" + +#include "filter/split.h" +#include "core/assert.h" + +static core_log_t _log = LOG_T_INIT("filter.split"); +static filter_split_t _defaults = { + LOG_T_INIT_OBJ("filter.split"), + FILTER_SPLIT_MODE_ROUNDROBIN, 0, 0, 0 +}; + +core_log_t* filter_split_log() +{ + return &_log; +} + +void filter_split_init(filter_split_t* self) +{ + mlassert_self(); + + *self = _defaults; +} + +void filter_split_destroy(filter_split_t* self) +{ + filter_split_recv_t* r; + mlassert_self(); + + if (self->recv_last) + self->recv_last->next = 0; + while ((r = self->recv_first)) { + self->recv_first = r->next; + free(r); + } +} + +void filter_split_add(filter_split_t* self, core_receiver_t recv, void* ctx) +{ + filter_split_recv_t* r; + mlassert_self(); + lassert(recv, "recv is nil"); + + lfatal_oom(r = malloc(sizeof(filter_split_recv_t))); + r->recv = recv; + r->ctx = ctx; + + if (self->recv_last) { + self->recv_last->next = r; + r->next = self->recv_first; + self->recv_first = r; + } else { + self->recv_first = self->recv = self->recv_last = r; + r->next = r; + } +} + +static void _roundrobin(filter_split_t* self, const core_object_t* obj) +{ + mlassert_self(); + + self->recv->recv(self->recv->ctx, obj); + self->recv = self->recv->next; +} + +static void _sendall(filter_split_t* self, const core_object_t* obj) +{ + filter_split_recv_t* r; + mlassert_self(); + + for (r = self->recv_first; r; r = r->next) { + r->recv(r->ctx, obj); + if (r == self->recv_last) + break; + } +} + +core_receiver_t filter_split_receiver(filter_split_t* self) +{ + mlassert_self(); + + if (!self->recv) { + lfatal("no receiver(s) set"); + } + + switch (self->mode) { + case FILTER_SPLIT_MODE_ROUNDROBIN: + return (core_receiver_t)_roundrobin; + case FILTER_SPLIT_MODE_SENDALL: + return (core_receiver_t)_sendall; + default: + lfatal("invalid split mode"); + } + return 0; +} diff --git a/src/filter/split.h b/src/filter/split.h new file mode 100644 index 0000000..921b649 --- /dev/null +++ b/src/filter/split.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "core/log.h" +#include "core/receiver.h" + +#ifndef __dnsjit_filter_split_h +#define __dnsjit_filter_split_h + +#include "filter/split.hh" + +#endif diff --git a/src/filter/split.hh b/src/filter/split.hh new file mode 100644 index 0000000..2aba2ae --- /dev/null +++ b/src/filter/split.hh @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +//lua:require("dnsjit.core.log") +//lua:require("dnsjit.core.receiver_h") + +typedef enum filter_split_mode { + FILTER_SPLIT_MODE_ROUNDROBIN, + FILTER_SPLIT_MODE_SENDALL +} filter_split_mode_t; + +typedef struct filter_split_recv filter_split_recv_t; +struct filter_split_recv { + filter_split_recv_t* next; + core_receiver_t recv; + void* ctx; +}; + +typedef struct filter_split { + core_log_t _log; + filter_split_mode_t mode; + filter_split_recv_t* recv_first; + filter_split_recv_t* recv; + filter_split_recv_t* recv_last; +} filter_split_t; + +core_log_t* filter_split_log(); + +void filter_split_init(filter_split_t* self); +void filter_split_destroy(filter_split_t* self); +void filter_split_add(filter_split_t* self, core_receiver_t recv, void* ctx); + +core_receiver_t filter_split_receiver(filter_split_t* self); diff --git a/src/filter/split.lua b/src/filter/split.lua new file mode 100644 index 0000000..d9aec4f --- /dev/null +++ b/src/filter/split.lua @@ -0,0 +1,80 @@ +-- Copyright (c) 2018-2021, OARC, Inc. +-- All rights reserved. +-- +-- This file is part of dnsjit. +-- +-- dnsjit is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- dnsjit is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with dnsjit. If not, see . + +-- dnsjit.filter.split +-- Passthrough to other receivers in various ways +-- local filter = require("dnsjit.filter.split").new() +-- filter.receiver(...) +-- filter.receiver(...) +-- filter.receiver(...) +-- input.receiver(filter) +-- +-- Filter to pass objects to others in various ways. +module(...,package.seeall) + +require("dnsjit.filter.split_h") +local ffi = require("ffi") +local C = ffi.C + +local t_name = "filter_split_t" +local filter_split_t = ffi.typeof(t_name) +local Split = {} + +-- Create a new Split filter. +function Split.new() + local self = { + receivers = {}, + obj = filter_split_t(), + } + C.filter_split_init(self.obj) + ffi.gc(self.obj, C.filter_split_destroy) + return setmetatable(self, { __index = Split }) +end + +-- Return the Log object to control logging of this instance or module. +function Split:log() + if self == nil then + return C.filter_split_log() + end + return self.obj._log +end + +-- Set the passthrough mode to round robin (default mode). +function Split:roundrobin() + self.obj.mode = "FILTER_SPLIT_MODE_ROUNDROBIN" +end + +-- Set the passthrough mode to send to all receivers. +function Split:sendall() + self.obj.mode = "FILTER_SPLIT_MODE_SENDALL" +end + +-- Return the C functions and context for receiving objects. +function Split:receive() + return C.filter_split_receiver(self.obj), self.obj +end + +-- Set the receiver to pass objects to, this can be called multiple times to +-- set addtional receivers. +function Split:receiver(o) + local recv, ctx = o:receive() + C.filter_split_add(self.obj, recv, ctx) + table.insert(self.receivers, o) +end + +return Split diff --git a/src/filter/timing.c b/src/filter/timing.c new file mode 100644 index 0000000..bf4f865 --- /dev/null +++ b/src/filter/timing.c @@ -0,0 +1,557 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "config.h" + +#include "filter/timing.h" +#include "core/assert.h" +#include "core/timespec.h" +#include "core/object/pcap.h" + +#include +#include + +#define N1e9 1000000000 + +typedef struct _filter_timing { + filter_timing_t pub; + + struct timespec diff; + core_timespec_t last_pkthdr_ts; + struct timespec last_ts; + struct timespec first_ts; + void (*timing_callback)(filter_timing_t*, const core_object_pcap_t*); + struct timespec mod_ts; + size_t counter; +} _filter_timing_t; + +static core_log_t _log = LOG_T_INIT("filter.timing"); +static filter_timing_t _defaults = { + LOG_T_INIT_OBJ("filter.timing"), + 0, 0, + TIMING_MODE_KEEP, 0, 0, 0, 0, 0.0, 0, + 0, 0 +}; + +#define _self ((_filter_timing_t*)self) + +core_log_t* filter_timing_log() +{ + return &_log; +} + +static void _keep(filter_timing_t* self, const core_object_pcap_t* pkt) +{ +#if HAVE_CLOCK_NANOSLEEP + struct timespec to = { + _self->diff.tv_sec + pkt->ts.sec, + _self->diff.tv_nsec + pkt->ts.nsec + }; + int ret = EINTR; + + if (to.tv_nsec >= N1e9) { + to.tv_sec += 1; + to.tv_nsec -= N1e9; + } else if (to.tv_nsec < 0) { + to.tv_sec -= 1; + to.tv_nsec += N1e9; + } + + while (ret) { + ldebug("keep mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec); + ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret); + } + } +#elif HAVE_NANOSLEEP + struct timespec diff = { + pkt->ts.sec - _self->last_pkthdr_ts.sec, + pkt->ts.nsec - _self->last_pkthdr_ts.nsec + }; + int ret = EINTR; + + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += 1; + diff.tv_nsec -= N1e9; + } else if (diff.tv_nsec < 0) { + diff.tv_sec -= 1; + diff.tv_nsec += N1e9; + } + + if (diff.tv_sec > -1 && diff.tv_nsec > -1) { + while (ret) { + ldebug("keep mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec); + if ((ret = nanosleep(&diff, &diff))) { + ret = errno; + if (ret != EINTR) { + lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret); + } + } + } + } + + _self->last_pkthdr_ts = pkt->ts; +#endif +} + +static void _increase(filter_timing_t* self, const core_object_pcap_t* pkt) +{ + struct timespec diff = { + pkt->ts.sec - _self->last_pkthdr_ts.sec, + pkt->ts.nsec - _self->last_pkthdr_ts.nsec + }; + int ret = EINTR; + + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += 1; + diff.tv_nsec -= N1e9; + } else if (diff.tv_nsec < 0) { + diff.tv_sec -= 1; + diff.tv_nsec += N1e9; + } + + diff.tv_sec += _self->mod_ts.tv_sec; + diff.tv_nsec += _self->mod_ts.tv_nsec; + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += 1; + diff.tv_nsec -= N1e9; + } + + if (diff.tv_sec > -1 && diff.tv_nsec > -1) { +#if HAVE_CLOCK_NANOSLEEP + struct timespec to = { + _self->last_ts.tv_sec + diff.tv_sec, + _self->last_ts.tv_nsec + diff.tv_nsec + }; + + if (to.tv_nsec >= N1e9) { + to.tv_sec += 1; + to.tv_nsec -= N1e9; + } else if (to.tv_nsec < 0) { + to.tv_sec -= 1; + to.tv_nsec += N1e9; + } + + while (ret) { + ldebug("increase mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec); + ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret); + } + } +#elif HAVE_NANOSLEEP + while (ret) { + ldebug("increase mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec); + if ((ret = nanosleep(&diff, &diff))) { + ret = errno; + if (ret != EINTR) { + lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret); + } + } + } +#endif + } + + _self->last_pkthdr_ts = pkt->ts; + +#if HAVE_CLOCK_NANOSLEEP + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } +#endif +} + +static void _reduce(filter_timing_t* self, const core_object_pcap_t* pkt) +{ + struct timespec diff = { + pkt->ts.sec - _self->last_pkthdr_ts.sec, + pkt->ts.nsec - _self->last_pkthdr_ts.nsec + }; + int ret = EINTR; + + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += 1; + diff.tv_nsec -= N1e9; + } else if (diff.tv_nsec < 0) { + diff.tv_sec -= 1; + diff.tv_nsec += N1e9; + } + + diff.tv_sec -= _self->mod_ts.tv_sec; + diff.tv_nsec -= _self->mod_ts.tv_nsec; + if (diff.tv_nsec < 0) { + diff.tv_sec -= 1; + diff.tv_nsec += N1e9; + } + + if (diff.tv_sec > -1 && diff.tv_nsec > -1) { +#if HAVE_CLOCK_NANOSLEEP + struct timespec to = { + _self->last_ts.tv_sec + diff.tv_sec, + _self->last_ts.tv_nsec + diff.tv_nsec + }; + + if (to.tv_nsec >= N1e9) { + to.tv_sec += 1; + to.tv_nsec -= N1e9; + } else if (to.tv_nsec < 0) { + to.tv_sec -= 1; + to.tv_nsec += N1e9; + } + + while (ret) { + ldebug("reduce mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec); + ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret); + } + } +#elif HAVE_NANOSLEEP + while (ret) { + ldebug("reduce mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec); + if ((ret = nanosleep(&diff, &diff))) { + ret = errno; + if (ret != EINTR) { + lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret); + } + } + } +#endif + } + + _self->last_pkthdr_ts = pkt->ts; + +#if HAVE_CLOCK_NANOSLEEP + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } +#endif +} + +static void _multiply(filter_timing_t* self, const core_object_pcap_t* pkt) +{ + struct timespec diff = { + pkt->ts.sec - _self->last_pkthdr_ts.sec, + pkt->ts.nsec - _self->last_pkthdr_ts.nsec + }; + int ret = EINTR; + + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += 1; + diff.tv_nsec -= N1e9; + } else if (diff.tv_nsec < 0) { + diff.tv_sec -= 1; + diff.tv_nsec += N1e9; + } + + diff.tv_sec = (time_t)((float)diff.tv_sec * self->mul); + diff.tv_nsec = (long)((float)diff.tv_nsec * self->mul); + if (diff.tv_nsec >= N1e9) { + diff.tv_sec += diff.tv_nsec / N1e9; + diff.tv_nsec %= N1e9; + } + + if (diff.tv_sec > -1 && diff.tv_nsec > -1) { +#if HAVE_CLOCK_NANOSLEEP + struct timespec to = { + _self->last_ts.tv_sec + diff.tv_sec, + _self->last_ts.tv_nsec + diff.tv_nsec + }; + + if (to.tv_nsec >= N1e9) { + to.tv_sec += 1; + to.tv_nsec -= N1e9; + } else if (to.tv_nsec < 0) { + to.tv_sec -= 1; + to.tv_nsec += N1e9; + } + + while (ret) { + ldebug("multiply mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec); + ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret); + } + } +#elif HAVE_NANOSLEEP + while (ret) { + ldebug("multiply mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec); + if ((ret = nanosleep(&diff, &diff))) { + ret = errno; + if (ret != EINTR) { + lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret); + } + } + } +#endif + } + + _self->last_pkthdr_ts = pkt->ts; + +#if HAVE_CLOCK_NANOSLEEP + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } +#endif +} + +static void _fixed(filter_timing_t* self, const core_object_pcap_t* pkt) +{ + struct timespec diff = { + _self->mod_ts.tv_sec, + _self->mod_ts.tv_nsec + }; + int ret = EINTR; + + if (diff.tv_sec > -1 && diff.tv_nsec > -1) { +#if HAVE_CLOCK_NANOSLEEP + struct timespec to = { + _self->last_ts.tv_sec + diff.tv_sec, + _self->last_ts.tv_nsec + diff.tv_nsec + }; + + if (to.tv_nsec >= N1e9) { + to.tv_sec += 1; + to.tv_nsec -= N1e9; + } else if (to.tv_nsec < 0) { + to.tv_sec -= 1; + to.tv_nsec += N1e9; + } + + while (ret) { + ldebug("fixed mode, sleep to %ld.%09ld", to.tv_sec, to.tv_nsec); + ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &to, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", to.tv_sec, to.tv_nsec, ret); + } + } +#elif HAVE_NANOSLEEP + while (ret) { + ldebug("fixed mode, sleep for %ld.%09ld", diff.tv_sec, diff.tv_nsec); + if ((ret = nanosleep(&diff, &diff))) { + ret = errno; + if (ret != EINTR) { + lfatal("nanosleep(%ld.%09ld) %d", diff.tv_sec, diff.tv_nsec, ret); + } + } + } +#endif + } + + _self->last_pkthdr_ts = pkt->ts; + +#if HAVE_CLOCK_NANOSLEEP + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } +#endif +} + +#if HAVE_CLOCK_NANOSLEEP +static inline void _timespec_diff(struct timespec* start, struct timespec* stop, + struct timespec* result) +{ + if ((stop->tv_nsec - start->tv_nsec) < 0) { + mlassert(stop->tv_sec > start->tv_sec, "stop time must be after start time"); + result->tv_sec = stop->tv_sec - start->tv_sec - 1; + result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000UL; + } else { + mlassert(stop->tv_sec >= start->tv_sec, "stop time must be after start time"); + result->tv_sec = stop->tv_sec - start->tv_sec; + result->tv_nsec = stop->tv_nsec - start->tv_nsec; + } +} + +static void _realtime(filter_timing_t* self, const core_object_pcap_t* pkt) +{ + _self->counter++; + if (_self->counter >= self->rt_batch) { + struct timespec simulated; + + _self->counter = 0; + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } + + // calculate simulated time from packet offsets + simulated.tv_sec = pkt->ts.sec; + simulated.tv_nsec = pkt->ts.nsec; + _timespec_diff(&_self->mod_ts, &simulated, &simulated); + + // calculate real elapsed time from monotonic clock + _timespec_diff(&_self->first_ts, &_self->last_ts, &_self->diff); + + linfo("simulated time: %ld.%09lds; real time: %ld.%09lds", + simulated.tv_sec, simulated.tv_nsec, _self->diff.tv_sec, _self->diff.tv_nsec); + + if (simulated.tv_sec > _self->diff.tv_sec + || (simulated.tv_sec == _self->diff.tv_sec && simulated.tv_nsec > _self->diff.tv_nsec)) { + int ret = EINTR; + _timespec_diff(&_self->diff, &simulated, &simulated); + + ldebug("sleeping for %ld.%09lds", simulated.tv_sec, simulated.tv_nsec); + while (ret) { + ret = clock_nanosleep(CLOCK_MONOTONIC, 0, &simulated, 0); + if (ret && ret != EINTR) { + lfatal("clock_nanosleep(%ld.%09ld) %d", simulated.tv_sec, simulated.tv_nsec, ret); + } + } + } else { + // check that real time didn't drift ahead more than specified drift limit + _timespec_diff(&simulated, &_self->diff, &_self->diff); + if (_self->diff.tv_sec > (self->rt_drift / N1e9) + || (_self->diff.tv_sec == (self->rt_drift / N1e9) && _self->diff.tv_nsec >= (self->rt_drift % N1e9))) { + lfatal("aborting, real time drifted ahead of simulated time (%ld.%09lds) by %ld.%09lds", + simulated.tv_sec, simulated.tv_nsec, _self->diff.tv_sec, _self->diff.tv_nsec); + } + } + } +} +#endif + +static void _init(filter_timing_t* self, const core_object_pcap_t* pkt) +{ +#if HAVE_CLOCK_NANOSLEEP + if (clock_gettime(CLOCK_MONOTONIC, &_self->last_ts)) { + lfatal("clock_gettime()"); + } + _self->first_ts = _self->last_ts; + _self->diff = _self->last_ts; + _self->diff.tv_sec -= pkt->ts.sec; + _self->diff.tv_nsec -= pkt->ts.nsec; + ldebug("init with clock_nanosleep() now is %ld.%09ld, diff of first pkt %ld.%09ld", + _self->last_ts.tv_sec, _self->last_ts.tv_nsec, + _self->diff.tv_sec, _self->diff.tv_nsec); +#elif HAVE_NANOSLEEP + ldebug("init with nanosleep()"); +#else +#error "No clock_nanosleep() or nanosleep(), can not continue" +#endif + + _self->last_pkthdr_ts = pkt->ts; + + switch (self->mode) { + case TIMING_MODE_KEEP: + ldebug("init mode keep"); + _self->timing_callback = _keep; + break; + case TIMING_MODE_INCREASE: + _self->timing_callback = _increase; + _self->mod_ts.tv_sec = self->inc / N1e9; + _self->mod_ts.tv_nsec = self->inc % N1e9; + ldebug("init mode increase by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec); + break; + case TIMING_MODE_REDUCE: + _self->timing_callback = _reduce; + _self->mod_ts.tv_sec = self->red / N1e9; + _self->mod_ts.tv_nsec = self->red % N1e9; + ldebug("init mode reduce by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec); + break; + case TIMING_MODE_MULTIPLY: + _self->timing_callback = _multiply; + ldebug("init mode multiply by %f", self->mul); + break; + case TIMING_MODE_FIXED: + _self->timing_callback = _fixed; + _self->mod_ts.tv_sec = self->fixed / N1e9; + _self->mod_ts.tv_nsec = self->fixed % N1e9; + ldebug("init mode fixed by %ld.%09ld", _self->mod_ts.tv_sec, _self->mod_ts.tv_nsec); + break; + case TIMING_MODE_REALTIME: +#if HAVE_CLOCK_NANOSLEEP + ldebug("init mode realtime"); + _self->timing_callback = _realtime; + _self->counter = 0; + _self->mod_ts.tv_sec = pkt->ts.sec; + _self->mod_ts.tv_nsec = pkt->ts.nsec; +#else + lfatal("realtime mode requires clock_nanosleep()"); +#endif + break; + default: + lfatal("invalid timing mode %d", self->mode); + } +} + +filter_timing_t* filter_timing_new() +{ + filter_timing_t* self; + mlfatal_oom(self = malloc(sizeof(_filter_timing_t))); + *self = _defaults; + _self->timing_callback = _init; + + return self; +} + +void filter_timing_free(filter_timing_t* self) +{ + mlassert_self(); + free(self); +} + +static void _receive(filter_timing_t* self, const core_object_t* obj) +{ + mlassert_self(); + lassert(obj, "obj is nil"); + + if (obj->obj_type != CORE_OBJECT_PCAP) { + lfatal("obj is not CORE_OBJECT_PCAP"); + } + + _self->timing_callback(self, (core_object_pcap_t*)obj); + self->recv(self->ctx, obj); +} + +core_receiver_t filter_timing_receiver(filter_timing_t* self) +{ + mlassert_self(); + + if (!self->recv) { + lfatal("no receiver set"); + } + + return (core_receiver_t)_receive; +} + +static const core_object_t* _produce(filter_timing_t* self) +{ + const core_object_t* obj; + mlassert_self(); + + obj = self->prod(self->prod_ctx); + if (!obj || obj->obj_type != CORE_OBJECT_PCAP) { + return 0; + } + + _self->timing_callback(self, (core_object_pcap_t*)obj); + return obj; +} + +core_producer_t filter_timing_producer(filter_timing_t* self) +{ + mlassert_self(); + + if (!self->prod) { + lfatal("no producer set"); + } + + return (core_producer_t)_produce; +} diff --git a/src/filter/timing.h b/src/filter/timing.h new file mode 100644 index 0000000..c00c43d --- /dev/null +++ b/src/filter/timing.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +#include "core/log.h" +#include "core/receiver.h" +#include "core/producer.h" + +#ifndef __dnsjit_filter_timing_h +#define __dnsjit_filter_timing_h + +#include "filter/timing.hh" + +#endif diff --git a/src/filter/timing.hh b/src/filter/timing.hh new file mode 100644 index 0000000..8615907 --- /dev/null +++ b/src/filter/timing.hh @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2018-2021, OARC, Inc. + * All rights reserved. + * + * This file is part of dnsjit. + * + * dnsjit is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * dnsjit is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with dnsjit. If not, see . + */ + +//lua:require("dnsjit.core.log") +//lua:require("dnsjit.core.receiver_h") +//lua:require("dnsjit.core.producer_h") +//lua:require("dnsjit.core.timespec_h") + +typedef struct filter_timing { + core_log_t _log; + core_receiver_t recv; + void* ctx; + enum { + TIMING_MODE_KEEP = 0, + TIMING_MODE_INCREASE = 1, + TIMING_MODE_REDUCE = 2, + TIMING_MODE_MULTIPLY = 3, + TIMING_MODE_FIXED = 4, + TIMING_MODE_REALTIME = 5 + } mode; + size_t inc, red, fixed, rt_batch; + float mul; + uint64_t rt_drift; + + core_producer_t prod; + void* prod_ctx; +} filter_timing_t; + +core_log_t* filter_timing_log(); + +filter_timing_t* filter_timing_new(); +void filter_timing_free(filter_timing_t* self); + +core_receiver_t filter_timing_receiver(filter_timing_t* self); +core_producer_t filter_timing_producer(filter_timing_t* self); diff --git a/src/filter/timing.lua b/src/filter/timing.lua new file mode 100644 index 0000000..cab9574 --- /dev/null +++ b/src/filter/timing.lua @@ -0,0 +1,123 @@ +-- Copyright (c) 2018-2021, OARC, Inc. +-- All rights reserved. +-- +-- This file is part of dnsjit. +-- +-- dnsjit is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- dnsjit is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with dnsjit. If not, see . + +-- dnsjit.filter.timing +-- Filter to pass objects to the next receiver based on timing between packets +-- local filter = require("dnsjit.filter.timing").new() +-- ... +-- filter:receiver(...) +-- +-- Filter to manipulate processing so it simulates the actual timing when +-- packets arrived or to delay processing. +module(...,package.seeall) + +require("dnsjit.filter.timing_h") +local ffi = require("ffi") +local C = ffi.C + +local Timing = {} + +-- Create a new Timing filter. +function Timing.new() + local self = { + _receiver = nil, + obj = C.filter_timing_new(), + } + ffi.gc(self.obj, C.filter_timing_free) + return setmetatable(self, { __index = Timing }) +end + +-- Return the Log object to control logging of this instance or module. +function Timing:log() + if self == nil then + return C.filter_timing_log() + end + return self.obj._log +end + +-- Set the timing mode to keep the timing between packets. +function Timing:keep() + self.obj.mode = "TIMING_MODE_KEEP" +end + +-- Set the timing mode to increase the timing between packets by the given +-- number of nanoseconds. +function Timing:increase(ns) + self.obj.mode = "TIMING_MODE_INCREASE" + self.obj.inc = ns +end + +-- Set the timing mode to reduce the timing between packets by the given +-- number of nanoseconds. +function Timing:reduce(ns) + self.obj.mode = "TIMING_MODE_REDUCE" + self.obj.red = ns +end + +-- Set the timing mode to multiply the timing between packets by the given +-- factor (float/double). +function Timing:multiply(factor) + self.obj.mode = "TIMING_MODE_MULTIPLY" + self.obj.mul = factor +end + +-- Set the timing mode to a fixed number of nanoseconds between packets. +function Timing:fixed(ns) + self.obj.mode = "TIMING_MODE_FIXED" + self.obj.fixed = ns +end + +-- Set the timing mode to simulate the timing of packets in realtime. +-- Packets are processed in batches of given size (default 128) before +-- adjusting time. Aborts if real time drifts ahead more than given +-- number of seconds (default 1.0s). +function Timing:realtime(drift, batch_size) + self.obj.mode = "TIMING_MODE_REALTIME" + if drift == nil then + drift = 1 + end + if batch_size == nil then + batch_size = 128 + end + self.obj.rt_batch = batch_size + self.obj.rt_drift = math.floor(drift * 1000000000) +end + +-- Return the C functions and context for receiving objects. +function Timing:receive() + return C.filter_timing_receiver(), self.obj +end + +-- Set the receiver to pass objects to. +function Timing:receiver(o) + self.obj.recv, self.obj.ctx = o:receive() + self._receiver = o +end + +-- Return the C functions and context for producing objects. +function Timing:produce() + return C.filter_timing_producer(self.obj), self.obj +end + +-- Set the producer to get objects from. +function Timing:producer(o) + self.obj.prod, self.obj.prod_ctx = o:produce() + self._producer = o +end + +return Timing -- cgit v1.2.3