# logstash configuration to process RADIUS detail files
#
# Matthew Newton
# April 2019
#
# This config has been tested with logstash version 6.7.0
#
# RADIUS "detail" files are textual representations of the RADIUS
# packets, and are written to disk by e.g. FreeRADIUS. They look
# something like the following, with the timestamp on the first
# line then all attributes/values tab-indented.
#
#	Tue Mar 10 15:32:24 2015
#		Packet-Type = Access-Request
#		User-Name = "test@example.com"
#		Calling-Station-Id = "01-02-03-04-05-06"
#		Called-Station-Id = "aa-bb-cc-dd-ee-ff:myssid"
#		NAS-Port = 10
#		NAS-IP-Address = 10.9.0.4
#		NAS-Identifier = "Wireless-Controller-1"
#		Service-Type = Framed-User
#		NAS-Port-Type = Wireless-802.11

# Example input - read data from a file. To read in a detail file
# with this input you could use:
#
#   # /usr/share/logstash/bin/logstash --path.settings=/etc/logstash -f logstash-radius.conf --log.level=debug

input {
	file {
		path => "/var/log/radius/radacct/*/detail-*"
		exclude => "*.gz"

		# Note when testing that logstash will remember where
		# it got to and continue from there.
		start_position => "beginning"

		# Set the type, used by the filter and output
		# stages below.
		type => radiusdetail

		# It is preferable to use a log feeder that can join
		# multiple lines together, rather than using multiline
		# here. For an example, see the log-courier
		# configuration in this directory.

		# If you didn't read the above, go back and read it
		# again.

		# If that is not possible you may be able to use the
		# following section. Note that if you are using the
		# "stdin" input, the file is chunked into 16k blobs,
		# so every 16k a detail record is likely to be chopped
		# in half. If you are using the "file" input (as in
		# this example), the blank lines between records are
		# not passed through, so the regex here has to be
		# aware of that. Basically, do multiline as early as
		# possible in your log feeder client, not here, and
		# you'll avoid most of the issues that are likely to
		# come up.

		codec => multiline {
			pattern => "^\t"
			negate => false
			what => "previous"
		}

		# If you really want to use the "stdin" input, this
		# will work better, but be aware of the comments
		# above.

		#codec => multiline {
		#	pattern => "^[A-Z\t]"
		#	negate => false
		#	what => "next"
		#}
	}
}

# Moving into production will likely need something more reliable.
# There are many input methods; an example is given here using
# log-courier (which supports client-side multiline processing and
# does not lose log events if logstash is restarted). You could
# also investigate e.g. filebeat from Elastic.

#input {
#	courier {
#		port => 5140
#		transport => "tcp"
#
#		# Don't set the type here, as it's set in the
#		# log-courier config instead.
#		#type => radiusdetail
#	}
#}

# Filter stage. Here we take the raw logs and process them into
# something structured ready to index. Each attribute is stored as
# a separate field in the output document.

filter {

	if [type] == "radiusdetail" {

		# Pull off the timestamp at the start of the
		# detail record. Note there may be additional data
		# after it that has been added by the local admin,
		# so stop at a newline OR a tab.

		grok {
			match => [ "message", "^(?<timestamp>[^\n\t]+)[\n\t]" ]
		}

		# Create the @timestamp field.

		date {
			match => [ "timestamp", "EEE MMM dd HH:mm:ss yyyy",
						"EEE MMM  d HH:mm:ss yyyy" ]
		}

		# Split the attributes and values into fields.
		# This is the bulk of processing that adds all of
		# the RADIUS attributes as elasticsearch fields.
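
		# As an illustration (values assume the example
		# record at the top of this file; this is a sketch,
		# not an exhaustive list), the kv filter below
		# should produce fields something like:
		#
		#	User-Name          => test@example.com
		#	Calling-Station-Id => 01-02-03-04-05-06
		#	NAS-IP-Address     => 10.9.0.4
		#	NAS-Port           => 10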
		# Note that issue
		# https://github.com/logstash-plugins/logstash-filter-kv/issues/10
		# currently means that all spaces will be stripped
		# from all fields. If this is a problem, adjust the
		# trim settings below.

		kv {
			field_split => "\n"
			source => "message"
			trim_value => "\" "
			trim_key => "\t "
		}

		# Now we try and add some useful additional
		# information. If certain fields can be broken
		# down into components then do that here and add
		# the data as sub-fields. For example,
		# Called-Station-Id might be able to be broken
		# down to Called-Station-Id_mac and
		# Called-Station-Id_ssid on some wireless systems,
		# or to _ip and _port with a VPN.

		# Multiple calls to grok are used, as otherwise
		# grok can stop processing once it has matched one
		# field, but we may want to pull e.g. both IP and
		# port out of the same field with two different
		# regexes.

		# Pull out some IP addresses as field_ip:

		grok {
			break_on_match => false
			tag_on_failure => []
			match => [
				"Framed-IP-Address", "^(?<Framed-IP-Address_ip>\d+\.\d+\.\d+\.\d+$)",
				"NAS-IP-Address", "^(?<NAS-IP-Address_ip>\d+\.\d+\.\d+\.\d+$)",
				"Calling-Station-Id", "^(?<Calling-Station-Id_ip>\d+\.\d+\.\d+\.\d+)",
				"Called-Station-Id", "^(?<Called-Station-Id_ip>\d+\.\d+\.\d+\.\d+)"
			]
		}

		# Split User-Name and Operator-Name into their
		# components, and pull out some IP ports if they
		# are there:

		grok {
			break_on_match => false
			tag_on_failure => []
			match => [
				"User-Name", "^(?<User-Name_username>[^@]+)?(?:@(?<User-Name_realm>[^@]+))$",
				"Operator-Name", "^(?<Operator-Name_id>.)(?<Operator-Name_value>.+)$",
				"Calling-Station-Id", "\[(?<Calling-Station-Id_port>\d+)\]$",
				"Called-Station-Id", "\[(?<Called-Station-Id_port>\d+)\]$"
			]
		}

		# Extract MAC addresses (and SSIDs if there).
		# MAC address matching here is lazy, but should be
		# good enough.

		grok {
			break_on_match => false
			tag_on_failure => []
			match => [
				"Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9:-]{17})$",
				"Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9\.]{14})$",
				"Calling-Station-Id", "^(?<Calling-Station-Id_mac>[a-fA-F0-9]{12})$",

				"Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9:-]{17})(?::(?<Called-Station-Id_ssid>.*))?$",
				"Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9\.]{14})(?::(?<Called-Station-Id_ssid>.*))?$",
				"Called-Station-Id", "^(?<Called-Station-Id_mac>[a-fA-F0-9]{12})(?::(?<Called-Station-Id_ssid>.*))?$"
			]
		}

		# With the optional sanitize_mac plugin, it's
		# possible to make sure all MAC addresses look the
		# same, which has obvious benefits.
		#
		# https://github.com/mcnewton/logstash-filter-sanitize_mac

		#sanitize_mac {
		#	match => {
		#		"Called-Station-Id_mac" => "Called-Station-Id_mac"
		#		"Calling-Station-Id_mac" => "Calling-Station-Id_mac"
		#	}
		#	separator => "-"
		#	fixcase => "lower"
		#}

		# Gigawords presents an issue because the 64-bit
		# octet counter is split across two attributes:
		# each gigaword is one wrap of the 32-bit octets
		# counter, i.e. 2^32 = 4294967296 octets (so e.g.
		# Gigawords = 1 with Octets = 500 means
		# 1 * 2^32 + 500 = 4294967796 octets in total).
		# Combine both attributes back into a single value
		# so that the full count is available to use.

		if ([Acct-Input-Octets]) {
			ruby {
				code => "event.set('Acct-Input-Octets_long',
					event.get('Acct-Input-Octets').to_i +
					(event.get('Acct-Input-Gigawords') ? (event.get('Acct-Input-Gigawords').to_i * (2**32)) : 0))"
			}
		}

		if ([Acct-Output-Octets]) {
			ruby {
				code => "event.set('Acct-Output-Octets_long',
					event.get('Acct-Output-Octets').to_i +
					(event.get('Acct-Output-Gigawords') ? (event.get('Acct-Output-Gigawords').to_i * (2**32)) : 0))"
			}
		}

		# Remove the original "message" field.

		mutate {
			remove_field => ["message"]
		}

	}
}

# Output data to the local elasticsearch cluster, into daily
# indexes named "radius-DATE".

output {
	if [type] == "radiusdetail" {
		elasticsearch {
			index => "radius-%{+YYYY.MM.dd}"
		}
	}
}
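
# When testing, it can also be useful to print each processed
# event to the console. This is a minimal sketch using the
# standard "stdout" output with the rubydebug codec (both ship
# with logstash; multiple output blocks are merged together);
# leave it commented out in production.

#output {
#	if [type] == "radiusdetail" {
#		stdout {
#			codec => rubydebug
#		}
#	}
#}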