#!/usr/bin/env python

# Copyright (C) 2010, Eduardo Silva
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Compare the last line of the access/error logs against a rules file.

The rules file is an INI file with an optional ``[access]`` section and an
optional ``[error]`` section.  Every option present in a section is an
expected value for the matching field of the last line of ACCESS_FILE or
ERROR_FILE; options that are absent are not checked.

Exit status: 0 when every rule matches, 1 on a mismatch, an unparsable log
line or a usage error, 2 when the rules file is missing or holds no rules.
"""

import os
import sys
import re
import getopt

try:
    import ConfigParser  # Python 2
except ImportError:
    import configparser as ConfigParser  # Python 3 renamed the module

RULES_PATH = 'log_rules'
ACCESS_FILE = '../logs/access.log'
ERROR_FILE = '../logs/error.log'

# Option names read from each rules section, in the order they are checked.
# They double as the constructor keyword names of the rule classes.
_ACCESS_KEYS = ('ip', 'time', 'method', 'uri', 'protocol', 'status', 'size')
_ERROR_KEYS = ('ip', 'time', 'error', 'message')

# Parse access log format, anyone is invited to fix this nasty regex.
# Raw strings keep the regex escapes away from the Python string parser.
_ACCESS_RE = re.compile(
    r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})( - )(\[.*\])( .* ){1,4}"
    r"(/.* )(.*/.* )(\d.* )(.*)\n$")
_ERROR_RE = re.compile(
    r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})( - )(\[.*\])( \[.*\])(.*)$")

# NOTE: str.strip() treats this as a *set of characters* to remove from both
# ends, not as a pattern.  It turns ' [error 1004]' into '1004'.
_ERROR_TAG_CHARS = r'[error (\d)]'


class AccessRule(object):
    """Expected values for the fields of one access log line.

    Every argument is optional; a field left as None is not checked.

    ``size`` may be a literal value, or the word ``FILESIZE`` followed by a
    path, in which case the expected size is the size in bytes of that file
    (os.path.getsize raises OSError if the file does not exist).
    """

    def __init__(self, ip=None, time=None, method=None, uri=None,
                 protocol=None, status=None, size=None):
        self.type = 'access'
        self.ip = ip
        self.time = time
        self.method = method
        self.uri = uri
        self.protocol = protocol
        self.status = status
        self.size = self._resolve_size(size)

    @staticmethod
    def _resolve_size(size):
        """Return ``size`` unchanged, or the byte size of a FILESIZE target."""
        if size is None:
            return None
        text = str(size)
        if text.startswith('FILESIZE'):
            return os.path.getsize(text[len('FILESIZE'):].strip())
        return size


class ErrorRule(object):
    """Expected values for the fields of one error log line.

    Every argument is optional; a field left as None is not checked.
    """

    def __init__(self, ip=None, time=None, error=None, message=None):
        self.type = 'error'
        self.ip = ip
        self.time = time
        self.error = error
        self.message = message


class Config(ConfigParser.ConfigParser):
    """INI reader that turns a rules file into rule objects."""

    def __init__(self):
        try:
            # Python 3 rejects duplicated sections/options unless strict is
            # disabled; Python 2 always accepted them.
            ConfigParser.ConfigParser.__init__(self, strict=False)
        except TypeError:
            # Python 2: the constructor has no 'strict' argument.
            ConfigParser.ConfigParser.__init__(self)

    def _get_value(self, section, key):
        """Return the option value, or None when it cannot be read."""
        try:
            return self.get(section, key)
        except ConfigParser.Error:
            # Missing section/option (or an interpolation problem): the
            # field carries no expectation.
            return None

    def _section_values(self, section, keys):
        """Return a dict mapping each key to its value (or None)."""
        return dict((key, self._get_value(section, key)) for key in keys)

    def get_rules(self, path):
        """Read ``path`` and return the list of rules it defines.

        Only the ``access`` and ``error`` sections produce a rule; any other
        section is ignored.  An unreadable or missing file yields an empty
        list.
        """
        self.read(path)

        rules = []
        for section in self.sections():
            if section == 'access':
                values = self._section_values(section, _ACCESS_KEYS)
                rules.append(AccessRule(**values))
            elif section == 'error':
                values = self._section_values(section, _ERROR_KEYS)
                rules.append(ErrorRule(**values))
            # Unknown sections define no rule and are skipped.
        return rules


class Logfile(object):
    """Command line driver: instantiating it runs the whole check.

    The constructor parses sys.argv, loads the rules file and checks it
    against the logs, terminating the process through sys.exit() on any
    failure.
    """

    def __init__(self):
        self.silent_mode = False
        self.target_logfile = None

        self.check_arguments()

        # Check if file exists
        if not os.path.isfile(self.target_logfile):
            # No rules exists for this test
            self._say("No rules for target")
            sys.exit(2)

        # Read rules
        rules = Config().get_rules(self.target_logfile)
        if not rules:
            self._say("Error, no rules found on target file")
            sys.exit(2)

        # Check rules
        self.check_rules(rules)

    def _say(self, message):
        """Print ``message`` unless silent mode is enabled."""
        if not self.silent_mode:
            print(message)

    def check_arguments(self):
        """Parse the command line into silent_mode and target_logfile.

        Shows the usage text (and exits) on -h, on an unknown option, or
        when no rules file was given with -l.
        """
        try:
            optlist, _args = getopt.getopt(sys.argv[1:], 'shl:')
        except getopt.GetoptError:
            # Unknown option or missing argument: show usage, no traceback.
            self.help()
            return

        for key, val in optlist:
            if key == '-s':
                self.silent_mode = True
            elif key == '-l':
                self.target_logfile = val
            elif key == '-h':
                self.help()

        if self.target_logfile is None:
            self.help()

    def help(self):
        """Print the usage text and exit with status 1."""
        print("** Monkey QA Checklog **")
        print("Usage: ./checklog [-s] [-l logfile_rules]")
        print("\nAvailable options")
        print(" -s Run checklog in silent mode, no messages to stdout")
        print(" -l logfile Specify the logfile rule")
        print(" -h Show this help")
        print("")
        sys.exit(1)

    def get_last_file_line(self, file):
        """Return the last line of the file at path ``file``.

        The line keeps its trailing newline.  Returns None when the file is
        empty; raises IOError/OSError when it cannot be opened.
        """
        with open(file, 'r') as handle:
            lines = handle.readlines()

        if not lines:
            return None
        return lines[-1]

    def check_field(self, rule, log):
        """Compare one expected value against the logged one.

        Returns 0 when ``rule`` is None (nothing expected) or when both
        values have the same string form; otherwise reports the mismatch
        and exits with status 1.
        """
        if rule is None or str(rule) == str(log):
            return 0

        self._say("Rule does not match, expect '%s' got '%s'"
                  % (str(rule), str(log)))
        sys.exit(1)

    def _last_line_groups(self, path, pattern):
        """Return the regex groups of the last line of the log at ``path``.

        Exits with status 1 when the log is empty or its last line does not
        match ``pattern``.
        """
        line = self.get_last_file_line(path)
        match = pattern.match(line) if line is not None else None
        if match is None:
            self._say("Error: we did not find the expected fields")
            self._say("Logfile line")
            self._say(" %s " % line)
            sys.exit(1)
        return match.groups()

    def check_rules(self, rules):
        """Check every rule against the last line of its log file.

        Exits with status 1 on the first field that does not match, or when
        a log line cannot be parsed; prints a success message otherwise.
        """
        for r in rules:
            if r.type == 'access':
                groups = self._last_line_groups(ACCESS_FILE, _ACCESS_RE)
                # groups[1] is the ' - ' separator and is not a field.
                data = {
                    'ip': groups[0],
                    'time': groups[2],
                    'method': groups[3].strip(),
                    'uri': groups[4].strip(),
                    'protocol': groups[5].strip(),
                    'status': groups[6].strip(),
                    'size': groups[7],
                }
                keys = _ACCESS_KEYS
            elif r.type == 'error':
                groups = self._last_line_groups(ERROR_FILE, _ERROR_RE)
                data = {
                    'ip': groups[0],
                    'time': groups[2],
                    'error': groups[3].strip(_ERROR_TAG_CHARS),
                    'message': groups[4].strip(),
                }
                keys = _ERROR_KEYS
            else:
                # Not a rule type this tool knows how to check.
                continue

            for key in keys:
                self.check_field(getattr(r, key), data[key])

        self._say("Check passed :)")


if __name__ == '__main__':
    Logfile()