1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
# Copyright (C) 2017 Open Information Security Foundation
# Copyright (c) 2011-2013 Jason Ish
#
# You can copy, redistribute or modify this Program under the terms of
# the GNU General Public License version 2 as published by the Free
# Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
from __future__ import print_function
import sys
import unittest
import io
import tempfile
import suricata.update.rule
import suricata.update.main
class RuleTestCase(unittest.TestCase):
def test_parse1(self):
# Some mods have been made to this rule (flowbits) for the
# purpose of testing.
rule = suricata.update.rule.parse("""alert tcp $HOME_NET any -> $EXTERNAL_NET $HTTP_PORTS (msg:"ET CURRENT_EVENTS Request to .in FakeAV Campaign June 19 2012 exe or zip"; flow:established,to_server; content:"setup."; fast_pattern:only; http_uri; content:".in|0d 0a|"; flowbits:isset,somebit; flowbits:unset,otherbit; http_header; pcre:"/\/[a-f0-9]{16}\/([a-z0-9]{1,3}\/)?setup\.(exe|zip)$/U"; pcre:"/^Host\x3a\s.+\.in\r?$/Hmi"; metadata:stage,hostile_download; reference:url,isc.sans.edu/diary/+Vulnerabilityqueerprocessbrittleness/13501; classtype:trojan-activity; sid:2014929; rev:1;)""")
self.assertEqual(rule.enabled, True)
self.assertEqual(rule.action, "alert")
self.assertEqual(rule.direction, "->")
self.assertEqual(rule.sid, 2014929)
self.assertEqual(rule.rev, 1)
self.assertEqual(rule.msg, "ET CURRENT_EVENTS Request to .in FakeAV Campaign June 19 2012 exe or zip")
self.assertEqual(len(rule.metadata), 2)
self.assertEqual(rule.metadata[0], "stage")
self.assertEqual(rule.metadata[1], "hostile_download")
self.assertEqual(len(rule.flowbits), 2)
self.assertEqual(rule.flowbits[0], "isset,somebit")
self.assertEqual(rule.flowbits[1], "unset,otherbit")
self.assertEqual(rule.classtype, "trojan-activity")
def test_disable_rule(self):
rule_buf = """# alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)"""
rule = suricata.update.rule.parse(rule_buf)
self.assertFalse(rule.enabled)
self.assertEqual(rule.raw, """alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)""")
self.assertEqual(str(rule), rule_buf)
def test_parse_rule_double_commented(self):
rule_buf = """## alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)"""
rule = suricata.update.rule.parse(rule_buf)
self.assertFalse(rule.enabled)
self.assertEqual(rule.raw, """alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)""")
def test_parse_rule_comments_and_spaces(self):
rule_buf = """## #alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)"""
rule = suricata.update.rule.parse(rule_buf)
self.assertFalse(rule.enabled)
self.assertEqual(rule.raw, """alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)""")
def test_toggle_rule(self):
rule_buf = """# alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)"""
rule = suricata.update.rule.parse(rule_buf)
self.assertFalse(rule.enabled)
rule.enabled = True
self.assertEqual(str(rule), """alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message"; sid:1;)""")
def test_parse_fileobj(self):
rule_buf = ("""alert ( msg:"DECODE_NOT_IPV4_DGRAM" sid:3; gid:116; rev:1; metadata:rule-type decode; classtype:protocol-command-decode;) \n"""
"""# alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message";) \n"""
"""alert ( msg:"DECODE_NOT_IPV4_DGRAM"; sid:2; gid:116; rev:1; metadata:rule-type decode; classtype:protocol-command-decode;)""")
fileobj = io.StringIO()
for i in range(2):
fileobj.write(u"%s\n" % rule_buf)
fileobj.seek(0)
rules = suricata.update.rule.parse_fileobj(fileobj)
self.assertEqual(2, len(rules))
def test_parse_file(self):
rule_buf = ("""# alert tcp $HOME_NET any -> $EXTERNAL_NET any (msg:"some message";) \n"""
"""alert ( msg:"DECODE_NOT_IPV4_DGRAM"; sid:1; gid:116; rev:1; metadata:rule-type decode; classtype:protocol-command-decode;) \n"""
"""alert ( msg:"DECODE_NOT_IPV4_DGRAM" sid:1; gid:116; rev:1; metadata:rule-type decode; classtype:protocol-command-decode;) \n""")
tmp = tempfile.NamedTemporaryFile()
for i in range(2):
tmp.write(("%s\n" % rule_buf).encode())
tmp.flush()
rules = suricata.update.rule.parse_file(tmp.name)
self.assertEqual(2, len(rules))
def test_parse_file_with_unicode(self):
rules = suricata.update.rule.parse_file("./tests/rule-with-unicode.rules")
def test_parse_decoder_rule(self):
rule_string = """alert ( msg:"DECODE_NOT_IPV4_DGRAM"; sid:1; gid:116; rev:1; metadata:rule-type decode; classtype:protocol-command-decode;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertEqual(rule["direction"], None)
def test_multiline_rule(self):
rule_string = u"""
alert dnp3 any any -> any any (msg:"SURICATA DNP3 Request flood detected"; \
app-layer-event:dnp3.flooded; sid:2200104; rev:1;)
"""
rules = suricata.update.rule.parse_fileobj(io.StringIO(rule_string))
self.assertEqual(len(rules), 1)
def test_parse_nomsg(self):
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertEqual("", rule["msg"])
def test_noalert(self):
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertFalse(rule["noalert"])
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:noalert; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertTrue(rule["noalert"])
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; noalert; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertTrue(rule["noalert"])
def test_set_noalert(self):
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertFalse(rule["noalert"])
self.assertTrue(rule.enabled)
rule["noalert"] = True
self.assertEqual(str(rule), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; noalert; sid:10000000; rev:1;)""")
self.assertTrue(rule["noalert"])
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:noalert; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertTrue(rule["noalert"])
self.assertTrue(rule.enabled)
self.assertEqual(str(rule), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:noalert; sid:10000000; rev:1;)""")
def test_resolve_flowbits(self):
rule_string_1 = u"""#alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:set,bit1; flowbits:noalert; sid:10000001; rev:1;)"""
rule_string_2 = u"""#alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit1; flowbits:set,bit2; flowbits:noalert; sid:10000002; rev:1;)"""
rule_string_3 = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit2; sid:10000003; rev:1;)"""
rule1 = suricata.update.rule.parse(rule_string_1)
rule2 = suricata.update.rule.parse(rule_string_2)
rule3 = suricata.update.rule.parse(rule_string_3)
rulemap = {}
rulemap[rule1.id] = rule1
rulemap[rule2.id] = rule2
rulemap[rule3.id] = rule3
disabled_rules = [rule1, rule2]
suricata.update.main.resolve_flowbits(rulemap, disabled_rules)
self.assertEqual(str(rule1), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:set,bit1; flowbits:noalert; sid:10000001; rev:1;)""")
self.assertEqual(str(rule2), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit1; flowbits:set,bit2; flowbits:noalert; sid:10000002; rev:1;)""")
self.assertEqual(str(rule3), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit2; sid:10000003; rev:1;)""")
rule_string_1 = u"""#alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:set,bit1; sid:10000001; rev:1;)"""
rule_string_2 = u"""#alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit1; flowbits:set,bit2; sid:10000002; rev:1;)"""
rule_string_3 = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit2; sid:10000003; rev:1;)"""
rule1 = suricata.update.rule.parse(rule_string_1)
rule2 = suricata.update.rule.parse(rule_string_2)
rule3 = suricata.update.rule.parse(rule_string_3)
rulemap = {}
rulemap[rule1.id] = rule1
rulemap[rule2.id] = rule2
rulemap[rule3.id] = rule3
disabled_rules = [rule1, rule2]
suricata.update.main.resolve_flowbits(rulemap, disabled_rules)
self.assertEqual(str(rule1), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:set,bit1; noalert; sid:10000001; rev:1;)""")
self.assertEqual(str(rule2), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit1; flowbits:set,bit2; noalert; sid:10000002; rev:1;)""")
self.assertEqual(str(rule3), """alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; flowbits:isset,bit2; sid:10000003; rev:1;)""")
def test_parse_message_with_semicolon(self):
rule_string = u"""alert ip any any -> any any (msg:"TEST RULE\; and some"; content:"uid=0|28|root|29|"; tag:session,5,packets; classtype:bad-unknown; sid:10000000; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertIsNotNone(rule)
self.assertEqual(rule.msg, "TEST RULE\; and some")
# Look for the expected content.
self.assertEqual("TEST RULE\; and some", rule["msg"])
def test_parse_message_with_colon(self):
rule_string = u"""alert tcp 93.174.88.0/21 any -> $HOME_NET any (msg:"SN: Inbound TCP traffic from suspect network (AS29073 - NL)"; flags:S; reference:url,https://suspect-networks.io/networks/cidr/13/; threshold: type limit, track by_dst, seconds 30, count 1; classtype:misc-attack; sid:71918985; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertIsNotNone(rule)
self.assertEqual(
rule.msg,
"SN: Inbound TCP traffic from suspect network (AS29073 - NL)")
def test_parse_multiple_metadata(self):
# metadata: former_category TROJAN;
# metadata:affected_product Windows_XP_Vista_7_8_10_Server_32_64_Bit, attack_target Client_Endpoint, deployment Perimeter, tag Ransomware_Onion_Domain, tag Ransomware, signature_severity Major, created_at 2017_08_08, malware_family Crypton, malware_family Nemesis, performance_impact Low, updated_at 2017_08_08;
rule_string = u"""alert udp $HOME_NET any -> any 53 (msg:"ET TROJAN CryptON/Nemesis/X3M Ransomware Onion Domain"; content:"|01 00 00 01 00 00 00 00 00 00|"; depth:10; offset:2; content:"|10|yvvu3fqglfceuzfu"; fast_pattern; distance:0; nocase; metadata: former_category TROJAN; reference:url,blog.emsisoft.com/2017/05/01/remove-cry128-ransomware-with-emsisofts-free-decrypter/; reference:url,www.cyber.nj.gov/threat-profiles/ransomware-variants/crypt-on; classtype:trojan-activity; sid:2024525; rev:2; metadata:affected_product Windows_XP_Vista_7_8_10_Server_32_64_Bit, attack_target Client_Endpoint, deployment Perimeter, tag Ransomware_Onion_Domain, tag Ransomware, signature_severity Major, created_at 2017_08_08, malware_family Crypton, malware_family Nemesis, performance_impact Low, updated_at 2017_08_08;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertIsNotNone(rule)
self.assertTrue("former_category TROJAN" in rule.metadata)
self.assertTrue("updated_at 2017_08_08" in rule.metadata)
def test_parse_option_missing_end(self):
"""Test parsing a rule where the last option is missing a
semicolon. This was responsible for an infinite loop. """
rule_buf = u"""alert icmp any any -> $HOME_NET any (msg:"ICMP test detected"; gid:0; sid:10000001; rev:1; classtype: icmp-event; metadata:policy balanced-ips drop, policy connectivity-ips drop, policy security-ips drop)"""
self.assertRaises(
suricata.update.rule.NoEndOfOptionError,
suricata.update.rule.parse, rule_buf)
def test_parse_addr_list(self):
"""Test parsing rules where the addresses and parts are lists with
spaces."""
rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] any -> any any (msg:"TEST"; sid:1; rev:1;)""")
self.assertIsNotNone(rule)
self.assertEqual(rule["source_addr"], "[$HOME_NET, $OTHER_NET]")
rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1, 2, 3] -> any any (msg:"TEST"; sid:1; rev:1;)""")
self.assertIsNotNone(rule)
self.assertEqual(rule["source_port"], "[1, 2, 3]")
rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] any (msg:"TEST"; sid:1; rev:1;)""")
self.assertIsNotNone(rule)
self.assertEqual(rule["dest_addr"], "[!$XNET, $YNET]")
rule = suricata.update.rule.parse("""alert any [$HOME_NET, $OTHER_NET] [1,2,3] -> [!$XNET, $YNET] [!2200, 5500] (msg:"TEST"; sid:1; rev:1;)""")
self.assertIsNotNone(rule)
self.assertEqual(rule["dest_port"], "[!2200, 5500]")
def test_parse_no_rev(self):
"""Test that a rule with no revision gets assigned the default
revision of 0."""
rule_string = u"""alert ip any any -> any any (content:"uid=0|28|root|29|"; classtype:bad-unknown; sid:10000000;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertEqual(0, rule["rev"])
def test_parse_no_sid(self):
"""Test parsing a rule where the sid is not parsed correctly. """
rule_buf = u"""alert icmp any any -> $HOME_NET any (msg:"ICMP test detected"; gid:0; rev:1; classtype: icmp-event;)"""
self.assertRaises(
suricata.update.rule.BadSidError,
suricata.update.rule.parse, rule_buf)
def test_parse_feature_ja3(self):
"""Test parsing rules that should set the ja3 feature."""
rule_string = u"""alert tls any any -> any any (msg:"REQUIRES JA3"; ja3_hash; content:"61d50e7771aee7f2f4b89a7200b4d45"; sid:1; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertIsNotNone(rule)
self.assertTrue("ja3" in rule["features"])
rule_string = u"""alert tls any any -> any any (msg:"REQUIRES JA3"; ja3.hash; content:"61d50e7771aee7f2f4b89a7200b4d45"; sid:1; rev:1;)"""
rule = suricata.update.rule.parse(rule_string)
self.assertIsNotNone(rule)
self.assertTrue("ja3" in rule["features"])
|