# logstash configuration to process RADIUS detail files
#
# Matthew Newton
# April 2019
#
# This config has been tested with logstash version 6.7.0
#
# RADIUS "detail" files are textual representations of RADIUS
# packets, written to disk by e.g. FreeRADIUS. They look
# something like the following, with the timestamp on the first
# line and all attributes/values tab-indented below it.
#
# Tue Mar 10 15:32:24 2015
#	Packet-Type = Access-Request
#	User-Name = "test@example.com"
#	Calling-Station-Id = "01-02-03-04-05-06"
#	Called-Station-Id = "aa-bb-cc-dd-ee-ff:myssid"
#	NAS-Port = 10
#	NAS-IP-Address = 10.9.0.4
#	NAS-Identifier = "Wireless-Controller-1"
#	Service-Type = Framed-User
#	NAS-Port-Type = Wireless-802.11
#
# Example input: read data from a file. To read in a detail
# file with this input, you could run logstash as:
#
# # /usr/share/logstash/bin/logstash --path.settings=/etc/logstash -f logstash-radius.conf --log.level=debug
#
input {
  file {
    path => "/var/log/radius/radacct/*/detail-*"
    exclude => "*.gz"

    # Note when testing that logstash remembers where it got
    # to in each file and will continue from there on the next
    # run.
    start_position => "beginning"

    # Set the type, used by the filter and output sections below.
    type => radiusdetail

    # It is preferable to use a log feeder that can join
    # multiple lines together, rather than using multiline
    # here. For an example, see the log-courier configuration
    # in this directory.
    # If you didn't read the above, go back and read it again.

    # If that is not possible, you may be able to use the
    # following section. Note that if you are using the
    # "stdin" input, the file is chunked into 16k blobs, so
    # every 16k a detail record is likely to be chopped in
    # half. If you are using the "file" input (as in this
    # example), the blank lines between records are not passed
    # through, so the regex here has to allow for that.
    # Basically, do multiline processing as early as possible,
    # in your log feeder client rather than here, and you'll
    # avoid most of the issues that are likely to come up.
    codec => multiline {
      pattern => "^\t"
      negate => false
      what => "previous"
    }

    # If you really want to use the "stdin" input, this will
    # work better, but be aware of the comments above.
    #codec => multiline {
    #  pattern => "^[A-Z\t]"
    #  negate => false
    #  what => "next"
    #}
  }
}
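
# With the multiline codec above, each detail record becomes a
# single event whose "message" field holds the timestamp line
# followed by the tab-indented attribute lines, joined with
# newline characters. The filter section below relies on that
# layout.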

# Moving into production will likely need something more
# reliable. There are many input methods; an example here uses
# log-courier (which supports client-side multiline processing
# and does not lose log events if logstash is restarted). You
# could also investigate e.g. filebeat from Elastic.
#
# input {
#   courier {
#     port => 5140
#     transport => "tcp"
#
#     # Don't set the type here, as it's set in the
#     # log-courier config instead.
#     #type => radiusdetail
#   }
# }

# Filter stage. Here we take the raw logs and process them into
# something structured, ready to index. Each attribute is stored
# as a separate field in the output document.
filter {
  if [type] == "radiusdetail" {

    # Pull off the timestamp at the start of the detail
    # record. Note there may be additional data after it that
    # has been added by the local admin, so stop at a newline
    # OR a tab.
    grok {
      match => [ "message", "^(?<timestamp>[^\n\t]+)[\n\t]" ]
    }
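
    # For the example record above this captures
    # timestamp => "Tue Mar 10 15:32:24 2015".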

    # Create the @timestamp field.
    date {
      match => [ "timestamp", "EEE MMM dd HH:mm:ss yyyy",
                 "EEE MMM d HH:mm:ss yyyy" ]
    }
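
    # Note that timestamps without zone information are read
    # in logstash's local timezone by default; the date filter
    # also takes a "timezone" option if the detail files are
    # written in a different zone.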

    # Split the attributes and values into fields. This is the
    # bulk of the processing that adds all of the RADIUS
    # attributes as elasticsearch fields. Note that issue
    # https://github.com/logstash-plugins/logstash-filter-kv/issues/10
    # currently means that all spaces will be stripped from
    # all fields. If this is a problem, adjust the trim
    # settings below.
    kv {
      field_split => "\n"
      source => "message"
      trim_value => "\" "
      trim_key => "\t "
    }
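
    # For the example record the event now has one field per
    # attribute, e.g. Packet-Type => "Access-Request",
    # User-Name => "test@example.com", NAS-Port => "10", with
    # surrounding quotes and tabs trimmed away.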

    # Now we try to add some useful extra information. If
    # certain fields can be broken down into components then
    # do that here and add the data as sub-fields. For
    # example, Called-Station-Id might be broken down into
    # Called-Station-Id_mac and Called-Station-Id_ssid on some
    # wireless systems, or into _ip and _port with a VPN.

    # We use several separate grok filters, because a single
    # grok can stop processing a field once one of its
    # patterns has matched, whereas we sometimes want to pull
    # two values (e.g. both an IP and a port) out of the same
    # field with different regexes.

    # Pull out some IP addresses as field_ip:
    grok {
      break_on_match => false
      tag_on_failure => []
      match => [
        "Framed-IP-Address", "^(?<Framed-IP-Address_ip>\d+\.\d+\.\d+\.\d+$)",
        "NAS-IP-Address", "^(?<NAS-IP-Address_ip>\d+\.\d+\.\d+\.\d+$)",
        "Calling-Station-Id", "^(?<Calling-Station-Id_ip>\d+\.\d+\.\d+\.\d+)",
        "Called-Station-Id", "^(?<Called-Station-Id_ip>\d+\.\d+\.\d+\.\d+)"
      ]
    }
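
    # For the example record above this adds
    # NAS-IP-Address_ip => "10.9.0.4". The Station-Id patterns
    # are not anchored at the end, so a hypothetical VPN-style
    # value such as "192.0.2.1[4500]" would still have its IP
    # extracted here and its port picked out below.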

    # Split User-Name into username and (optional) realm,
    # split Operator-Name, and pull out some IP ports if they
    # are there:
    grok {
      break_on_match => false
      tag_on_failure => []
      match => [
        "User-Name", "^(?<User-Name_username>[^@]+)?(?:@(?<User-Name_realm>[^@]+))?$",
        "Operator-Name", "^(?<Operator-Name_id>.)(?<Operator-Name_value>.+)$",
        "Calling-Station-Id", "\[(?<Calling-Station-Id_port>\d+)\]$",
        "Called-Station-Id", "\[(?<Called-Station-Id_port>\d+)\]$"
      ]
    }
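
    # For the example record this gives
    # User-Name_username => "test" and
    # User-Name_realm => "example.com". A hypothetical
    # Operator-Name of "1example.org" (the leading character
    # is the namespace identifier) would be split into
    # Operator-Name_id => "1" and
    # Operator-Name_value => "example.org".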

    # Extract MAC addresses (and SSIDs if present). The MAC
    # address matching here is lazy, but should be good
    # enough.
    grok {
      break_on_match => false
      tag_on_failure => []
      match => [
        "Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9:-]{17})$",
        "Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9\.]{14})$",
        "Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9]{12})$",
        "Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9:-]{17})(?::(?<Called-Station-Id_ssid>.*))?$",
        "Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9\.]{14})(?::(?<Called-Station-Id_ssid>.*))?$",
        "Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9]{12})(?::(?<Called-Station-Id_ssid>.*))?$"
      ]
    }
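
    # For the example record this yields
    # Calling-Station-Id_mac => "01-02-03-04-05-06",
    # Called-Station-Id_mac => "aa-bb-cc-dd-ee-ff" and
    # Called-Station-Id_ssid => "myssid". The three patterns
    # per field cover colon/hyphen-separated, Cisco dotted and
    # bare-hex MAC formats.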

    # With the optional sanitize_mac plugin it's possible to
    # make sure all MAC addresses look the same, which has
    # obvious benefits:
    #
    # https://github.com/mcnewton/logstash-filter-sanitize_mac
    #
    # sanitize_mac {
    #   match => {
    #     "Called-Station-Id_mac" => "Called-Station-Id_mac"
    #     "Calling-Station-Id_mac" => "Calling-Station-Id_mac"
    #   }
    #   separator => "-"
    #   fixcase => "lower"
    # }
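
    # With the settings above, all of the MAC formats matched
    # earlier should come out in one canonical form, e.g. a
    # value of "AABB.CC11.2233" would be rewritten as
    # "aa-bb-cc-11-22-33".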

    # Gigawords presents an issue because the 64-bit value is
    # split across two attributes. Combine them both back into
    # a single attribute so that the full value is available
    # to use.
    if ([Acct-Input-Octets]) {
      ruby {
        code => "event.set('Acct-Input-Octets_long', event.get('Acct-Input-Octets').to_i +
          (event.get('Acct-Input-Gigawords') ? (event.get('Acct-Input-Gigawords').to_i * (2**32)) : 0))"
      }
    }
    if ([Acct-Output-Octets]) {
      ruby {
        code => "event.set('Acct-Output-Octets_long', event.get('Acct-Output-Octets').to_i +
          (event.get('Acct-Output-Gigawords') ? (event.get('Acct-Output-Gigawords').to_i * (2**32)) : 0))"
      }
    }
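
    # Each Gigawords attribute counts complete 2^32 wraps of
    # the corresponding 32-bit octets counter, so e.g.
    # Acct-Input-Gigawords = 1 with Acct-Input-Octets = 100
    # gives Acct-Input-Octets_long = 1 * 4294967296 + 100 =
    # 4294967396.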

    # Remove the original "message" field.
    mutate {
      remove_field => ["message"]
    }
  }
}

# Output data to the local elasticsearch cluster, indexing
# events of type "radiusdetail" into a daily index named
# "radius-DATE".
output {
  if [type] == "radiusdetail" {
    elasticsearch {
      index => "radius-%{+YYYY.MM.dd}"
    }
  }
}
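
# The elasticsearch output defaults to a cluster on localhost.
# To send to a remote cluster instead, set the "hosts" option
# (the hostname below is a placeholder), for example:
#
# output {
#   if [type] == "radiusdetail" {
#     elasticsearch {
#       hosts => ["http://es.example.com:9200"]
#       index => "radius-%{+YYYY.MM.dd}"
#     }
#   }
# }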