#!/usr/bin/env python
"""
Process packet capture files and produce a nice HTML
report of MSN Chat sessions.

Copyright (c) 2003 by Gilbert Ramirez

SPDX-License-Identifier: GPL-2.0-or-later
"""

import os
import re
import sys
import array
import string
import getopt
import subprocess

try:
    import WiresharkXML
except ImportError:
    # Keep the module importable (e.g. for --help) when the PDML parser is
    # not on the path; CaptureFile reports the problem when it is needed.
    WiresharkXML = None

# By default we output the HTML to stdout
out_fh = sys.stdout


def _to_bytes(data):
    """Return *data* as a byte string, UTF-8 encoding it when it is text."""
    if isinstance(data, (bytes, bytearray)):
        return bytes(data)
    return data.encode("utf-8")


def _html_escape(text):
    """Escape the characters in *text* that are special inside HTML."""
    return (text.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace('"', "&quot;"))


def _emit(data):
    """Write text or bytes to the current ``out_fh`` as raw bytes.

    Text-mode streams such as ``sys.stdout`` on Python 3 expose their binary
    layer as ``.buffer``; binary files are written to directly.
    """
    target = getattr(out_fh, "buffer", out_fh)
    target.write(_to_bytes(data))


class MSNMessage:
    """Base class for parsed MSN protocol messages."""
    pass


class MSN_MSG(MSNMessage):
    """A single chat message: when it was sent, by whom, and its text."""

    def __init__(self, timestamp, user, message):
        self.timestamp = timestamp
        self.user = user
        self.message = message


class Conversation:
    """Keeps track of a single MSN chat session"""

    # First line of an outgoing message.  Only the group name "alias" below
    # is required by the code; the other names are descriptive.
    re_MSG_out = re.compile(
        r"MSG (?P<trid>\d+) (?P<acktype>[UNA]) (?P<len>\d+)")
    # First line of an incoming message; "alias" is used as the sender name.
    re_MSG_in = re.compile(
        r"MSG (?P<user>\S+)@(?P<domain>\S+) (?P<alias>\S+) (?P<len>\d+)")

    # A backslash followed by a three-digit octal byte value, by "r"/"n",
    # or by any other single character.
    _re_escape = re.compile(
        r"\\(?:(?P<octal>[0-3][0-7]{2})|(?P<eol>[rn])|(?P<other>.))",
        re.DOTALL)

    USER_NOT_FOUND = -1
    DEFAULT_USER = None

    DEFAULT_USER_COLOR = "#0000ff"
    USER_COLORS = ["#ff0000", "#00ff00", "#800000", "#008000", "#000080"]

    DEFAULT_USER_TEXT_COLOR = "#000000"
    USER_TEXT_COLOR = "#000080"

    def __init__(self):
        self.packets = []
        self.messages = []

    def AddPacket(self, packet):
        """Remember *packet* for a later call to Summarize()."""
        self.packets.append(packet)

    def Summarize(self):
        """Convert every collected packet into an MSN_MSG.

        Packets for which CreateMSNMessage() returns None are skipped.
        """
        for packet in self.packets:
            msg = self.CreateMSNMessage(packet)
            if msg is not None:
                self.messages.append(msg)

    def CreateMSNMessage(self, packet):
        """Build an MSN_MSG from one packet.

        Returns None (after a warning on stderr) when the first msnms line
        matches neither the outgoing nor the incoming MSG pattern, so that a
        single unexpected packet does not abort the whole report.
        """
        msnms = packet.get_items("msnms")[0]

        # Check the first line in the msnms transmission for the user
        child = msnms.children[0]
        user = self.USER_NOT_FOUND

        if self.re_MSG_out.search(child.show):
            user = self.DEFAULT_USER
        else:
            m = self.re_MSG_in.search(child.show)
            if m:
                user = m.group("alias")

        if user == self.USER_NOT_FOUND:
            sys.stderr.write("No match for %s\n" % (child.show,))
            return None

        # The message text starts at child index 5 -- presumably after the
        # MSG line, the header lines and the blank separator; confirm
        # against real PDML output.
        children = msnms.children
        msg = ""
        i = 5
        while i < len(children):
            msg += children[i].show
            step = 6
            # A following "MSG " marks the start of the next message in the
            # same packet: cut it off and step one child less.
            j = msg.find("MSG ")
            if j >= 0:
                msg = msg[:j]
                step = 5
            i += step

        # Drop the fractional seconds, but only when there are any.
        timestamp = packet.get_items("frame.time")[0].get_show()
        head, dot, _ = timestamp.rpartition(".")
        if dot:
            timestamp = head

        return MSN_MSG(timestamp, user, msg)

    def MsgToHTML(self, text):
        """Decode tshark's backslash escapes in *text* into HTML-safe bytes.

        ``\\NNN`` (three octal digits, at most 377) becomes the byte with
        that value, ``\\r`` and ``\\n`` are dropped, and every other escape,
        including malformed octal ones, is copied through unchanged.  The
        characters ``&``, ``<`` and ``>`` are then escaped so that chat text
        cannot inject markup into the report.

        Returns an ``array.array("B")`` holding the resulting bytes.
        """
        decoded = bytearray()
        pos = 0
        for m in self._re_escape.finditer(text):
            decoded += _to_bytes(text[pos:m.start()])
            octal = m.group("octal")
            if octal is not None:
                decoded.append(int(octal, 8))
            elif m.group("eol") is None:
                # Unknown escape: keep the backslash and the character.
                decoded += _to_bytes(m.group(0))
            pos = m.end()
        # Whatever is left, including a lone trailing backslash.
        decoded += _to_bytes(text[pos:])

        escaped = (bytes(decoded)
                   .replace(b"&", b"&amp;")
                   .replace(b"<", b"&lt;")
                   .replace(b">", b"&gt;"))
        return array.array("B", escaped)

    def CreateHTML(self, default_user):
        """Write this conversation to ``out_fh`` as an HTML fragment.

        *default_user* is the name shown for messages whose user is
        DEFAULT_USER.  Nothing is written when there are no messages.
        """
        if not self.messages:
            return

        _emit('\n<hr>\n<h3 align="center">'
              '---- New Conversation @ %s ----</h3>\n'
              % (_html_escape(self.messages[0].timestamp),))

        user_color_assignments = {}

        for msg in self.messages:
            # Calculate 'user' and 'user_color' and 'user_text_color'
            if msg.user == self.DEFAULT_USER:
                user = default_user
                user_color = self.DEFAULT_USER_COLOR
                user_text_color = self.DEFAULT_USER_TEXT_COLOR
            else:
                user = msg.user
                user_text_color = self.USER_TEXT_COLOR
                if user not in user_color_assignments:
                    # Wrap around instead of running off the palette when
                    # there are more users than colors.
                    num_assigned = len(user_color_assignments)
                    user_color_assignments[user] = self.USER_COLORS[
                        num_assigned % len(self.USER_COLORS)]
                user_color = user_color_assignments[user]

            # "Oct  6, 2003 21:45:25"  --> "21:45:25"
            parts = msg.timestamp.split()
            timestamp = parts[-1] if parts else ""

            _emit('<span style="color: %s"><small>(%s)</small> '
                  '<b>%s:</b></span> <span style="color: %s">'
                  % (user_color, _html_escape(timestamp),
                     _html_escape(user), user_text_color))
            _emit(bytearray(self.MsgToHTML(msg.message)))
            _emit("</span><br>\n")


class CaptureFile:
    """Parses a single a capture file and keeps track of
    all chat sessions in the file."""

    def __init__(self, capture_filename, tshark):
        """Run tshark on the capture file and parse the data."""
        self.conversations = []
        self.conversations_map = {}

        if WiresharkXML is None:
            sys.exit("The WiresharkXML module is required but could not "
                     "be imported.")

        # An argument list (no shell) keeps file names containing spaces or
        # shell metacharacters from being misinterpreted.
        # NOTE(review): newer tshark releases may expect -Y instead of -R
        # for a single-pass display filter -- confirm against the installed
        # version.
        command = [tshark, "-Tpdml", "-n",
                   "-R", 'msnms contains "X-MMS-IM-Format"',
                   "-r", capture_filename]
        try:
            proc = subprocess.Popen(command, stdout=subprocess.PIPE)
        except OSError as err:
            sys.exit("Could not run %s: %s" % (tshark, err))

        try:
            WiresharkXML.parse_fh(proc.stdout, self.collect_packets)
        finally:
            proc.stdout.close()
            proc.wait()

        for conv in self.conversations:
            conv.Summarize()

    def collect_packets(self, packet):
        """Collect the packets passed back from WiresharkXML.
        Sort them by TCP/IP conversation, as there could be multiple
        clients per machine."""
        # Just in case we're looking at tunnelling protocols where
        # more than one IP or TCP header exists, look at the last one,
        # which would be the one inside the tunnel.
        fields = ("ip.src", "ip.dst", "tcp.srcport", "tcp.dstport")
        # Sorting makes both directions of a connection share one key.
        key = "|".join(sorted(packet.get_items(name)[-1].get_show()
                              for name in fields))

        conv = self.conversations_map.get(key)
        if conv is None:
            conv = Conversation()
            self.conversations_map[key] = conv
            self.conversations.append(conv)

        conv.AddPacket(packet)

    def CreateHTML(self, default_user):
        """Write every conversation found in this capture to ``out_fh``."""
        for conv in self.conversations:
            conv.CreateHTML(default_user)


def run_filename(filename, default_user, tshark):
    """Process one capture file."""
    capture = CaptureFile(filename, tshark)
    capture.CreateHTML(default_user)


def run(filenames, default_user, tshark):
    """Write a complete HTML report covering all *filenames* to ``out_fh``."""
    # HTML Header
    # NOTE(review): the charset assumes the decoded chat bytes are UTF-8,
    # as the text written by this script is -- confirm for real captures.
    _emit("<html>\n"
          "<head>\n"
          '<meta http-equiv="Content-Type" '
          'content="text/html; charset=utf-8">\n'
          "<title>MSN Conversation</title>\n"
          "</head>\n"
          "<body>\n")

    for filename in filenames:
        run_filename(filename, default_user, tshark)

    # HTML Footer
    _emit("\n<hr>\n</body>\n</html>\n")


def usage(exit_code=1):
    """Print the command-line synopsis to stderr and exit with *exit_code*."""
    sys.stderr.write(
        "msnchat [OPTIONS] CAPTURE_FILE [...]\n"
        "  -o FILE       name of output file\n"
        "  -t TSHARK  location of tshark binary\n"
        "  -u USER       name for unknown user\n")
    sys.exit(exit_code)


def main(argv=None):
    """Command-line entry point.

    *argv* is the argument list without the program name; it defaults to
    ``sys.argv[1:]``.
    """
    global out_fh

    if argv is None:
        argv = sys.argv[1:]

    default_user = "Unknown"
    tshark = "tshark"

    optstring = "ho:t:u:"
    longopts = ["help"]

    try:
        opts, args = getopt.getopt(argv, optstring, longopts)
    except getopt.GetoptError:
        usage()

    opened = None
    for opt, arg in opts:
        if opt == "-h" or opt == "--help":
            usage(0)

        elif opt == "-o":
            if opened is not None:
                opened.close()
            try:
                # Binary mode: the report is written as raw bytes.
                opened = open(arg, "wb")
            except IOError:
                sys.exit("Could not open %s for writing." % (arg,))
            out_fh = opened

        elif opt == "-u":
            default_user = arg

        elif opt == "-t":
            tshark = arg

        else:
            sys.exit("Unhandled command-line option: " + opt)

    if not args:
        # At least one capture file is required.
        usage()

    try:
        run(args, default_user, tshark)
    finally:
        if opened is not None:
            opened.close()


if __name__ == '__main__':
    main()