From 0d47952611198ef6b1163f366dc03922d20b1475 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 09:42:04 +0200 Subject: Adding upstream version 7.94+git20230807.3be01efb1+dfsg. Signed-off-by: Daniel Baumann --- zenmap/zenmapCore/NmapParser.py | 1345 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1345 insertions(+) create mode 100644 zenmap/zenmapCore/NmapParser.py (limited to 'zenmap/zenmapCore/NmapParser.py') diff --git a/zenmap/zenmapCore/NmapParser.py b/zenmap/zenmapCore/NmapParser.py new file mode 100644 index 0000000..bf9ad05 --- /dev/null +++ b/zenmap/zenmapCore/NmapParser.py @@ -0,0 +1,1345 @@ +#!/usr/bin/env python3 + +# ***********************IMPORTANT NMAP LICENSE TERMS************************ +# * +# * The Nmap Security Scanner is (C) 1996-2023 Nmap Software LLC ("The Nmap +# * Project"). Nmap is also a registered trademark of the Nmap Project. +# * +# * This program is distributed under the terms of the Nmap Public Source +# * License (NPSL). The exact license text applying to a particular Nmap +# * release or source code control revision is contained in the LICENSE +# * file distributed with that version of Nmap or source code control +# * revision. More Nmap copyright/legal information is available from +# * https://nmap.org/book/man-legal.html, and further information on the +# * NPSL license itself can be found at https://nmap.org/npsl/ . This +# * header summarizes some key points from the Nmap license, but is no +# * substitute for the actual license text. +# * +# * Nmap is generally free for end users to download and use themselves, +# * including commercial use. It is available from https://nmap.org. +# * +# * The Nmap license generally prohibits companies from using and +# * redistributing Nmap in commercial products, but we sell a special Nmap +# * OEM Edition with a more permissive license and special features for +# * this purpose. See https://nmap.org/oem/ +# * +# * If you have received a written Nmap license agreement or contract +# * stating terms other than these (such as an Nmap OEM license), you may +# * choose to use and redistribute Nmap under those terms instead. +# * +# * The official Nmap Windows builds include the Npcap software +# * (https://npcap.com) for packet capture and transmission. It is under +# * separate license terms which forbid redistribution without special +# * permission. So the official Nmap Windows builds may not be redistributed +# * without special permission (such as an Nmap OEM license). +# * +# * Source is provided to this software because we believe users have a +# * right to know exactly what a program is going to do before they run it. +# * This also allows you to audit the software for security holes. +# * +# * Source code also allows you to port Nmap to new platforms, fix bugs, and add +# * new features. You are highly encouraged to submit your changes as a Github PR +# * or by email to the dev@nmap.org mailing list for possible incorporation into +# * the main distribution. Unless you specify otherwise, it is understood that +# * you are offering us very broad rights to use your submissions as described in +# * the Nmap Public Source License Contributor Agreement. This is important +# * because we fund the project by selling licenses with various terms, and also +# * because the inability to relicense code has caused devastating problems for +# * other Free Software projects (such as KDE and NASM). +# * +# * The free version of Nmap is distributed in the hope that it will be +# * useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. Warranties, +# * indemnification and commercial support are all available through the +# * Npcap OEM program--see https://nmap.org/oem/ +# * +# ***************************************************************************/ + +import locale +import time +import socket +import copy + +from io import StringIO + +# Prevent loading PyXML +import xml +xml.__path__ = [x for x in xml.__path__ if "_xmlplus" not in x] + +from xml.sax import make_parser +from xml.sax import SAXException +from xml.sax.handler import ContentHandler, EntityResolver +from xml.sax.saxutils import XMLGenerator +from xml.sax.xmlreader import AttributesImpl as Attributes + +import zenmapCore.I18N # lgtm[py/unused-import] +from zenmapCore.NmapOptions import NmapOptions, join_quoted +from zenmapCore.StringPool import unique + +# The version of the Nmap DTD this file understands and emits. +XML_OUTPUT_VERSION = "1.04" + + +class HostInfo(object): + def __init__(self): + self.comment = None + self._tcpsequence = {} + self._osmatches = [] + self._ports = [] + self._ports_used = [] + self._extraports = [] + self._uptime = {} + self._hostnames = [] + self._tcptssequence = {} + self._ipidsequence = {} + self._ip = None + self._ipv6 = None + self._mac = None + self._state = '' + self._comment = '' + self._trace = {} + + def make_clone(self): + clone = HostInfo() + clone.comment = self.comment + clone._tcpsequence = copy.deepcopy(self._tcpsequence) + clone._osmatches = copy.deepcopy(self._osmatches) + clone._ports = copy.deepcopy(self._ports) + clone._ports_used = self._ports_used + clone._extraports = self._extraports + clone._uptime = copy.deepcopy(self._uptime) + clone._hostnames = copy.deepcopy(self._hostnames) + clone._tcptssequence = copy.deepcopy(self._tcptssequence) + clone._ipidsequence = copy.deepcopy(self._ipidsequence) + clone._ip = copy.deepcopy(self._ip) + clone._ipv6 = copy.deepcopy(self._ipv6) + clone._mac = copy.deepcopy(self._mac) + clone._state = self._state + clone._comment = self._comment + clone._trace = copy.deepcopy(self._trace) + + return clone + + # tcpsequence is a dict of the form + # {'index': u'203', + # 'values': u'3637785D,35B440D1,35E9FC3B,3640DB42,355F5931,3601AE14', + # 'difficulty': u'Good luck!'} + def set_tcpsequence(self, sequence): + self._tcpsequence = sequence + + def get_tcpsequence(self): + if self._tcpsequence: + return self._tcpsequence + return {} + + # tcptssequence is a dict of the form + # {'values': u'71D0483C,71D048A3,71D0490C,71D04973,71D049DB,71D04A45', + # 'class': u'1000HZ'} + def set_tcptssequence(self, sequence): + self._tcptssequence = sequence + + def get_tcptssequence(self): + if self._tcptssequence: + return self._tcptssequence + return {} + + # ipidsequence is a dict of the form + # {'values': u'0,0,0,0,0,0', 'class': u'All zeros'} + def set_ipidsequence(self, sequence): + self._ipidsequence = sequence + + def get_ipidsequence(self): + if self._ipidsequence: + return self._ipidsequence + return {} + + # osmatches is a list of dicts of the form + # {'name': u'Linux 2.6.24', 'accuracy': u'98', 'line': u'1000', + # 'osclasses': ...} + # where each 'osclasses' element is a dict of the form + # {'vendor': u'Linux', 'osfamily': u'Linux', 'type': u'general purpose', + # 'osgen': u'2.6.X', 'accuracy': u'98'} + def set_osmatches(self, matches): + self._osmatches = matches + + def get_osmatches(self): + return self._osmatches + + def get_best_osmatch(self): + """Return the OS match with the highest accuracy.""" + if not self._osmatches: + return None + + def osmatch_key(osmatch): + try: + return -float(osmatch["accuracy"]) + except ValueError: + return 0 + + return sorted(self._osmatches, key=osmatch_key)[0] + + # ports_used is a list like + # [{'state': u'open', 'portid': u'22', 'proto': u'tcp'}, + # {'state': u'closed', 'portid': u'25', 'proto': u'tcp'}, + # {'state': u'closed', 'portid': u'44054', 'proto': u'udp'}] + # but not all three elements are necessarily present. + def set_ports_used(self, ports): + self._ports_used = ports + + def get_ports_used(self): + return self._ports_used + + # uptime is a dict of the form + # {'seconds': u'1909493', 'lastboot': u'Wed Jul 2 06:48:31 2008'} + def set_uptime(self, uptime): + self._uptime = uptime + + def get_uptime(self): + if self._uptime: + return self._uptime + + # Avoid empty dict return + return {"seconds": "", "lastboot": ""} + + # ports is an array containing dicts of the form + # {'port_state': u'open', 'portid': u'22', 'protocol': u'tcp', + # 'service_conf': u'10', 'service_extrainfo': u'protocol 2.0', + # 'service_method': u'probed', 'service_name': u'ssh', + # 'service_product': u'OpenSSH', 'service_version': u'4.3'} + def set_ports(self, ports): + self._ports = ports + + def get_ports(self): + return self._ports + + # extraports is an array of dicts of the form + # {'count': u'1709', 'state': u'filtered'} + def set_extraports(self, port_list): + self._extraports = port_list + + def get_extraports(self): + return self._extraports + + # hostnames is a list containing dicts of the form + # [{'hostname': u'scanme.nmap.org', 'hostname_type': u'PTR'}] + def set_hostnames(self, hostname_list): + self._hostnames = hostname_list + + def get_hostnames(self): + return self._hostnames + + # ip, ipv6, and mac are either None or dicts of the form + # {'vendor': u'', 'type': u'ipv4', 'addr': u'64.13.134.52'} + def set_ip(self, addr): + self._ip = addr + + def get_ip(self): + return self._ip + + def set_mac(self, addr): + self._mac = addr + + def get_mac(self): + return self._mac + + def set_ipv6(self, addr): + self._ipv6 = addr + + def get_ipv6(self): + return self._ipv6 + + def get_addrs_for_sort(self): + """Return a list of addresses as opaque values sorted such that + 1) IPv4 comes before IPv6 comes before MAC, and + 2) addresses are sorted according to their binary values, not their + string representation. + Use this function to the the comparison key when sorting a list of + hosts by address.""" + l = [] + if self.ip: + l.append((1, socket.inet_aton(self.ip["addr"]))) + if self.ipv6: + try: + l.append((1, + socket.inet_pton(socket.AF_INET6, self.ipv6["addr"]))) + except AttributeError: + # Windows doesn't have socket.inet_pton. Go alphabetical. + # Encode to a byte string for possible comparison with binary + # address strings (which can't be converted to unicode). + l.append((1, self.ipv6["addr"].encode("utf-8"))) + if self.mac: + l.append((3, "".join( + chr(int(x, 16)) for x in self.mac["addr"].split(":")))) + l.sort() + return l + + # comment is a string. + def get_comment(self): + return self._comment + + def set_comment(self, comment): + self._comment = comment + + # state is a string like u'up' or u'down'. + def set_state(self, status): + self._state = status + + def get_state(self): + return self._state + + def get_hostname(self): + hostname = None + if len(self._hostnames) > 0: + hostname = self._hostnames[0]["hostname"] + + address = self.ip or self.ipv6 or self.mac + if address is not None: + address = address["addr"] + + if hostname is not None: + if address is not None: + return "%s (%s)" % (hostname, address) + else: + return hostname + else: + if address is not None: + return address + else: + return _("Unknown Host") + + def get_port_count_by_states(self, states): + count = 0 + + for p in self.ports: + state = p.get('port_state') + if state in states: + count += 1 + + for extra in self.get_extraports(): + if extra['state'] in states: + count += int(extra['count']) + + return count + + def get_open_ports(self): + return self.get_port_count_by_states(('open', 'open|filtered')) + + def get_filtered_ports(self): + return self.get_port_count_by_states( + ('filtered', 'open|filtered', 'closed|filtered')) + + def get_closed_ports(self): + return self.get_port_count_by_states(('closed', 'closed|filtered')) + + def get_scanned_ports(self): + scanned = 0 + + for p in self.ports: + scanned += 1 + + for extra in self.get_extraports(): + scanned += int(extra["count"]) + + return scanned + + def get_services(self): + services = [] + for p in self.ports: + services.append({ + "service_name": p.get("service_name", _("unknown")), + "portid": p.get("portid", ""), + "service_version": p.get("service_version", + _("Unknown version")), + "service_product": p.get("service_product", ""), + "service_extrainfo": p.get("service_extrainfo", ""), + "port_state": p.get("port_state", _("unknown")), + "protocol": p.get("protocol", "") + }) + return services + + def get_trace(self): + return self._trace + + def set_trace(self, trace): + self._trace = trace + + def append_trace_hop(self, hop): + if "hops" in self._trace: + self._trace["hops"].append(hop) + else: + self._trace["hops"] = [hop] + + def set_trace_error(self, errorstr): + self._trace["error"] = errorstr + + # Properties + tcpsequence = property(get_tcpsequence, set_tcpsequence) + osmatches = property(get_osmatches, set_osmatches) + ports = property(get_ports, set_ports) + ports_used = property(get_ports_used, set_ports_used) + extraports = property(get_extraports, set_extraports) + uptime = property(get_uptime, set_uptime) + hostnames = property(get_hostnames, set_hostnames) + tcptssequence = property(get_tcptssequence, set_tcptssequence) + ipidsequence = property(get_ipidsequence, set_ipidsequence) + ip = property(get_ip, set_ip) + ipv6 = property(get_ipv6, set_ipv6) + mac = property(get_mac, set_mac) + state = property(get_state, set_state) + comment = property(get_comment, set_comment) + services = property(get_services) + trace = property(get_trace, set_trace) + + +class ParserBasics(object): + def __init__(self): + # This flag informs us whether the XML output file is temporary (True), + # or user specified (False). If any of them is user-specified, it + # doesn't get stripped out of the command string in set_nmap_command. + self.xml_is_temp = True + + self.nmap = { + 'nmaprun': {}, + 'scaninfo': [], + 'verbose': '', + 'debugging': '', + 'hosts': [], + 'runstats': {} + } + + self.ops = NmapOptions() + self._nmap_output = StringIO() + + def set_xml_is_temp(self, xml_is_temp): + # This flag is False if a user has specified his own -oX option - in + # which case we not should remove the -oX option from the command + # string. A value of True means that we're using a temporary file which + # should be removed from the command string (see set_nmap_command). + self.xml_is_temp = xml_is_temp + + def get_profile_name(self): + return self.nmap['nmaprun'].get('profile_name', '') + + def set_profile_name(self, name): + self.nmap['nmaprun']['profile_name'] = name + + def get_targets(self): + return self.ops.target_specs + + def set_targets(self, targets): + self.ops.target_specs = targets + + def get_nmap_output(self): + return self._nmap_output.getvalue() + + def set_nmap_output(self, nmap_output): + self._nmap_output.close() + del self._nmap_output + self._nmap_output = StringIO() + self._nmap_output.write(nmap_output) + + def del_nmap_output(self): + self._nmap_output.close() + del self._nmap_output + + def get_debugging_level(self): + return self.nmap.get('debugging', '') + + def set_debugging_level(self, level): + self.nmap['debugging'] = level + + def get_verbose_level(self): + return self.nmap.get('verbose', '') + + def set_verbose_level(self, level): + self.nmap['verbose'] = level + + def get_scaninfo(self): + return self.nmap.get('scaninfo', '') + + def set_scaninfo(self, info): + self.nmap['scaninfo'] = info + + def get_services_scanned(self): + if self._services_scanned is None: + return self._services_scanned + + services = [] + for scan in self.nmap.get('scaninfo', []): + services.append(scan['services']) + + self._services_scanned = ','.join(services) + return self._services_scanned + + def set_services_scanned(self, services_scanned): + self._services_scanned = services_scanned + + def get_nmap_command(self): + return self.ops.render_string() + + def set_nmap_command(self, command): + self.ops.parse_string(command) + if self.xml_is_temp: + self.ops["-oX"] = None + self.nmap['nmaprun']['args'] = self.ops.render_string() + + def get_scan_type(self): + types = [] + for t in self.nmap.get('scaninfo', []): + types.append(t['type']) + return types + + def get_protocol(self): + protocols = [] + for proto in self.nmap.get('scaninfo', []): + protocols.append(proto['protocol']) + return protocols + + def get_num_services(self): + if self._num_services is None: + return self._num_services + + num = 0 + for n in self.nmap.get('scaninfo', []): + num += int(n['numservices']) + + self._num_services = num + return self._num_services + + def set_num_services(self, num_services): + self._num_services = num_services + + def get_date(self): + epoch = int(self.nmap['nmaprun'].get('start', '0')) + return time.localtime(epoch) + + def get_start(self): + return self.nmap['nmaprun'].get('start', '0') + + def set_start(self, start): + self.nmap['nmaprun']['start'] = start + + def set_date(self, date): + if type(date) == type(int): + self.nmap['nmaprun']['start'] = date + else: + raise Exception("Wrong date format. Date should be saved \ +in epoch format!") + + def get_open_ports(self): + ports = 0 + + for h in self.nmap.get('hosts', []): + ports += h.get_open_ports() + + return ports + + def get_filtered_ports(self): + ports = 0 + + for h in self.nmap.get('hosts', []): + ports += h.get_filtered_ports() + + return ports + + def get_closed_ports(self): + ports = 0 + + for h in self.nmap['hosts']: + ports += h.get_closed_ports() + + return ports + + def get_formatted_date(self): + return time.strftime("%B %d, %Y - %H:%M", self.get_date()) + + def get_scanner(self): + return self.nmap['nmaprun'].get('scanner', '') + + def set_scanner(self, scanner): + self.nmap['nmaprun']['scanner'] = scanner + + def get_scanner_version(self): + return self.nmap['nmaprun'].get('version', '') + + def set_scanner_version(self, version): + self.nmap['nmaprun']['version'] = version + + # IPv4 + def get_ipv4(self): + hosts = self.nmap.get('hosts') + if hosts is None: + return [] + return [host.ip for host in hosts if host.ip is not None] + + # MAC + def get_mac(self): + hosts = self.nmap.get('hosts') + if hosts is None: + return [] + return [host.mac for host in hosts if host.mac is not None] + + # IPv6 + def get_ipv6(self): + hosts = self.nmap.get('hosts') + if hosts is None: + return [] + return [host.ipv6 for host in hosts if host.ipv6 is not None] + + def get_hostnames(self): + hostnames = [] + for host in self.nmap.get('hosts', []): + hostnames += host.get_hostnames() + return hostnames + + def get_hosts(self): + return self.nmap.get('hosts', None) + + def get_runstats(self): + return self.nmap.get('runstats', None) + + def set_runstats(self, stats): + self.nmap['runstats'] = stats + + def get_hosts_down(self): + return int(self.nmap['runstats'].get('hosts_down', '0')) + + def set_hosts_down(self, down): + self.nmap['runstats']['hosts_down'] = int(down) + + def get_hosts_up(self): + return int(self.nmap['runstats'].get('hosts_up', '0')) + + def set_hosts_up(self, up): + self.nmap['runstats']['hosts_up'] = int(up) + + def get_hosts_scanned(self): + return int(self.nmap['runstats'].get('hosts_scanned', '0')) + + def set_hosts_scanned(self, scanned): + self.nmap['runstats']['hosts_scanned'] = int(scanned) + + def get_finish_time(self): + return time.localtime(int(self.nmap['runstats'].get('finished_time', + '0'))) + + def set_finish_time(self, finish): + self.nmap['runstats']['finished_time'] = int(finish) + + def get_finish_epoc_time(self): + return int(self.nmap['runstats'].get('finished_time', '0')) + + def set_finish_epoc_time(self, time): + self.nmap['runstats']['finished_time'] = time + + def get_scan_name(self): + """Get a human-readable string representing this scan.""" + scan_name = self.nmap.get("scan_name") + if scan_name: + return scan_name + if self.profile_name and self.get_targets(): + return _("%s on %s") % (self.profile_name, + join_quoted(self.get_targets())) + return self.get_nmap_command() + + def set_scan_name(self, scan_name): + self.nmap["scan_name"] = scan_name + + def get_formatted_finish_date(self): + return time.strftime("%B %d, %Y - %H:%M", self.get_finish_time()) + + def get_port_protocol_dict(self): + #Create a dict of port -> protocol for all ports scanned + ports = {} + for scaninfo in self.scaninfo: + services_string = scaninfo['services'].strip() + if services_string == "": + services_array = [] + else: + services_array = services_string.split(',') + for item in services_array: + if item.find('-') == -1: + if int(item) not in ports: + ports[int(item)] = [] + ports[int(item)].append(scaninfo['protocol']) + else: + begin, end = item.split('-') + for port in range(int(begin), int(end) + 1): + if int(port) not in ports: + ports[int(port)] = [] + ports[int(port)].append(scaninfo['protocol']) + return ports + + profile_name = property(get_profile_name, set_profile_name) + nmap_output = property(get_nmap_output, set_nmap_output, del_nmap_output) + debugging_level = property(get_debugging_level, set_debugging_level) + verbose_level = property(get_verbose_level, set_verbose_level) + scaninfo = property(get_scaninfo, set_scaninfo) + services_scanned = property(get_services_scanned, set_services_scanned) + nmap_command = property(get_nmap_command, set_nmap_command) + scan_type = property(get_scan_type) + protocol = property(get_protocol) + num_services = property(get_num_services, set_num_services) + date = property(get_date, set_date) + open_ports = property(get_open_ports) + filtered_ports = property(get_filtered_ports) + closed_ports = property(get_closed_ports) + formatted_date = property(get_formatted_date) + scanner = property(get_scanner, set_scanner) + scanner_version = property(get_scanner_version, set_scanner_version) + ipv4 = property(get_ipv4) + mac = property(get_mac) + ipv6 = property(get_ipv6) + hostnames = property(get_hostnames) + hosts = property(get_hosts) + runstats = property(get_runstats, set_runstats) + hosts_down = property(get_hosts_down, set_hosts_down) + hosts_up = property(get_hosts_up, set_hosts_up) + hosts_scanned = property(get_hosts_scanned, set_hosts_scanned) + finish_time = property(get_finish_time, set_finish_time) + finish_epoc_time = property(get_finish_epoc_time, set_finish_epoc_time) + formatted_finish_date = property(get_formatted_finish_date) + start = property(get_start, set_start) + scan_name = property(get_scan_name, set_scan_name) + + _num_services = None + _services_scanned = None + + +class NmapParserSAX(ParserBasics, ContentHandler): + def __init__(self): + ParserBasics.__init__(self) + ContentHandler.__init__(self) + + # The text inside an xml-stylesheet processing instruction, like + # 'href="file:///usr/share/nmap/nmap.xsl" type="text/xsl"'. + self.xml_stylesheet_data = None + + self.in_interactive_output = False + self.in_run_stats = False + self.in_host = False + self.in_hostnames = False + self.in_ports = False + self.in_port = False + self.in_os = False + self.in_trace = False + self.list_extraports = [] + + self.filename = None + + self.unsaved = False + + def set_parser(self, parser): + self.parser = parser + + def parse(self, f): + """Parse an Nmap XML file from the file-like object f.""" + self.parser.parse(f) + + def parse_file(self, filename): + """Parse an Nmap XML file from the named file.""" + with open(filename, "r") as f: + self.parse(f) + self.filename = filename + + def _parse_nmaprun(self, attrs): + run_tag = "nmaprun" + + if self.nmap_output == "" and "nmap_output" in attrs: + self.nmap_output = attrs["nmap_output"] + self.nmap[run_tag]["profile_name"] = attrs.get("profile_name", "") + self.nmap[run_tag]["start"] = attrs.get("start", "") + self.nmap[run_tag]["args"] = attrs.get("args", "") + self.nmap[run_tag]["scanner"] = attrs.get("scanner", "") + self.nmap[run_tag]["version"] = attrs.get("version", "") + self.nmap[run_tag]["xmloutputversion"] = attrs.get( + "xmloutputversion", "") + + self.nmap_command = self.nmap[run_tag]["args"] + + def _parse_output(self, attrs): + if attrs.get("type") != "interactive": + return + if self.in_interactive_output: + raise SAXException("Unexpected nested \"output\" element.") + self.in_interactive_output = True + self.nmap_output = "" + + def _parse_scaninfo(self, attrs): + dic = {} + + dic["type"] = unique(attrs.get("type", "")) + dic["protocol"] = unique(attrs.get("protocol", "")) + dic["numservices"] = attrs.get("numservices", "") + dic["services"] = attrs.get("services", "") + + self.nmap["scaninfo"].append(dic) + + def _parse_verbose(self, attrs): + self.nmap["verbose"] = attrs.get("level", "") + + def _parse_debugging(self, attrs): + self.nmap["debugging"] = attrs.get("level", "") + + def _parse_runstats_finished(self, attrs): + self.nmap["runstats"]["finished_time"] = attrs.get("time", "") + + def _parse_runstats_hosts(self, attrs): + self.nmap["runstats"]["hosts_up"] = attrs.get("up", "") + self.nmap["runstats"]["hosts_down"] = attrs.get("down", "") + self.nmap["runstats"]["hosts_scanned"] = attrs.get("total", "") + + def _parse_host(self, attrs): + self.host_info = HostInfo() + self.host_info.comment = attrs.get("comment", "") + + def _parse_host_status(self, attrs): + self.host_info.set_state(unique(attrs.get("state", ""))) + + def _parse_host_address(self, attrs): + address_attributes = {"type": unique(attrs.get("addrtype", "")), + "vendor": attrs.get("vendor", ""), + "addr": attrs.get("addr", "")} + + if address_attributes["type"] == "ipv4": + self.host_info.set_ip(address_attributes) + elif address_attributes["type"] == "ipv6": + self.host_info.set_ipv6(address_attributes) + elif address_attributes["type"] == "mac": + self.host_info.set_mac(address_attributes) + + def _parse_host_hostname(self, attrs): + self.list_hostnames.append({"hostname": attrs.get("name", ""), + "hostname_type": attrs.get("type", "")}) + + def _parse_host_extraports(self, attrs): + self.list_extraports.append({"state": unique(attrs.get("state", "")), + "count": attrs.get("count", "")}) + + def _parse_host_port(self, attrs): + self.dic_port = {"protocol": unique(attrs.get("protocol", "")), + "portid": unique(attrs.get("portid", ""))} + + def _parse_host_port_state(self, attrs): + self.dic_port["port_state"] = unique(attrs.get("state", "")) + self.dic_port["reason"] = unique(attrs.get("reason", "")) + self.dic_port["reason_ttl"] = unique(attrs.get("reason_ttl", "")) + + def _parse_host_port_service(self, attrs): + self.dic_port["service_name"] = attrs.get("name", "") + self.dic_port["service_method"] = unique(attrs.get("method", "")) + self.dic_port["service_conf"] = attrs.get("conf", "") + self.dic_port["service_product"] = attrs.get("product", "") + self.dic_port["service_version"] = attrs.get("version", "") + self.dic_port["service_extrainfo"] = attrs.get("extrainfo", "") + + def _parse_host_osmatch(self, attrs): + osmatch = self._parsing(attrs, [], ['name', 'accuracy', 'line']) + osmatch['osclasses'] = [] + self.list_osmatch.append(osmatch) + + def _parse_host_portused(self, attrs): + self.list_portused.append(self._parsing( + attrs, ['state', 'proto', 'portid'], [])) + + def _parse_host_osclass(self, attrs): + self.list_osclass.append(self._parsing( + attrs, ['type', 'vendor', 'osfamily', 'osgen'], ['accuracy'])) + + def _parsing(self, attrs, unique_names, other_names): + # Returns a dict with the attributes of a given tag with the + # attributes names as keys and their respective values + dic = {} + for at in unique_names: + dic[at] = unique(attrs.get(at, "")) + for at in other_names: + dic[at] = attrs.get(at, "") + return dic + + def _parse_host_uptime(self, attrs): + self.host_info.set_uptime(self._parsing( + attrs, [], ["seconds", "lastboot"])) + + def _parse_host_tcpsequence(self, attrs): + self.host_info.set_tcpsequence(self._parsing( + attrs, ['difficulty'], ['index', 'values'])) + + def _parse_host_tcptssequence(self, attrs): + self.host_info.set_tcptssequence(self._parsing( + attrs, ['class'], ['values'])) + + def _parse_host_ipidsequence(self, attrs): + self.host_info.set_ipidsequence(self._parsing( + attrs, ['class'], ['values'])) + + def _parse_host_trace(self, attrs): + trace = {} + for attr in ["proto", "port"]: + trace[attr] = unique(attrs.get(attr, "")) + self.host_info.set_trace(trace) + + def _parse_host_trace_hop(self, attrs): + hop = self._parsing(attrs, [], ["ttl", "rtt", "ipaddr", "host"]) + self.host_info.append_trace_hop(hop) + + def _parse_host_trace_error(self, attrs): + self.host_info.set_trace_error(unique(attrs.get("errorstr", ""))) + + def processingInstruction(self, target, data): + if target == "xml-stylesheet": + self.xml_stylesheet_data = data + + def startElement(self, name, attrs): + if name == "nmaprun": + self._parse_nmaprun(attrs) + if name == "output": + self._parse_output(attrs) + elif name == "scaninfo": + self._parse_scaninfo(attrs) + elif name == "verbose": + self._parse_verbose(attrs) + elif name == "debugging": + self._parse_debugging(attrs) + elif name == "runstats": + self.in_run_stats = True + elif self.in_run_stats and name == "finished": + self._parse_runstats_finished(attrs) + elif self.in_run_stats and name == "hosts": + self._parse_runstats_hosts(attrs) + elif name == "host": + self.in_host = True + self._parse_host(attrs) + self.list_ports = [] + self.list_extraports = [] + elif self.in_host and name == "status": + self._parse_host_status(attrs) + elif self.in_host and name == "address": + self._parse_host_address(attrs) + elif self.in_host and name == "hostnames": + self.in_hostnames = True + self.list_hostnames = [] + elif self.in_host and self.in_hostnames and name == "hostname": + self._parse_host_hostname(attrs) + elif self.in_host and name == "ports": + self.in_ports = True + elif self.in_host and self.in_ports and name == "extraports": + self._parse_host_extraports(attrs) + elif self.in_host and self.in_ports and name == "port": + self.in_port = True + self._parse_host_port(attrs) + elif self.in_host and self.in_ports and \ + self.in_port and name == "state": + self._parse_host_port_state(attrs) + elif self.in_host and self.in_ports and \ + self.in_port and name == "service": + self._parse_host_port_service(attrs) + elif self.in_host and name == "os": + self.in_os = True + self.list_portused = [] + self.list_osmatch = [] + elif self.in_host and self.in_os and name == "osmatch": + self._parse_host_osmatch(attrs) + elif self.in_host and self.in_os and name == "portused": + self._parse_host_portused(attrs) + elif self.in_host and self.in_os and name == "osclass": + self.list_osclass = [] + self._parse_host_osclass(attrs) + elif self.in_host and name == "uptime": + self._parse_host_uptime(attrs) + elif self.in_host and name == "tcpsequence": + self._parse_host_tcpsequence(attrs) + elif self.in_host and name == "tcptssequence": + self._parse_host_tcptssequence(attrs) + elif self.in_host and name == "ipidsequence": + self._parse_host_ipidsequence(attrs) + elif self.in_host and name == "trace": + self.in_trace = True + self._parse_host_trace(attrs) + elif self.in_host and self.in_trace and name == "hop": + self._parse_host_trace_hop(attrs) + elif self.in_host and self.in_trace and name == "error": + self._parse_host_trace_error(attrs) + + def endElement(self, name): + if name == "output": + self.in_interactive_output = False + elif name == "runstats": + self.in_run_stats = False + elif name == "host": + self.in_host = False + self.host_info.set_extraports(self.list_extraports) + self.host_info.set_ports(self.list_ports) + self.nmap["hosts"].append(self.host_info) + elif self.in_host and name == "hostnames": + self.in_hostnames = False + self.host_info.set_hostnames(self.list_hostnames) + elif self.in_host and name == "ports": + self.in_ports = False + elif self.in_host and self.in_ports and name == "port": + self.in_port = False + self.list_ports.append(self.dic_port) + del(self.dic_port) + elif self.in_host and self.in_os and name == "osmatch": + self.list_osmatch[-1]['osclasses'].extend(self.list_osclass) + self.list_osclass = [] + elif self.in_host and self.in_os and name == "os": + self.in_os = False + self.host_info.set_ports_used(self.list_portused) + self.host_info.set_osmatches(self.list_osmatch) + + del(self.list_portused) + del(self.list_osmatch) + elif self.in_host and self.in_trace and name == "trace": + self.in_trace = False + + def characters(self, content): + if self.in_interactive_output: + self._nmap_output.write(content) + + def write_text(self, f): + """Write the Nmap text output of this object to the file-like object + f.""" + if self.nmap_output == "": + return + f.write(self.nmap_output) + + def write_xml(self, f): + """Write the XML representation of this object to the file-like object + f.""" + writer = XMLGenerator(f) + writer.startDocument() + if self.xml_stylesheet_data is not None: + writer.processingInstruction( + "xml-stylesheet", self.xml_stylesheet_data) + self._write_nmaprun(writer) + self._write_scaninfo(writer) + self._write_verbose(writer) + self._write_debugging(writer) + self._write_output(writer) + self._write_hosts(writer) + self._write_runstats(writer) + writer.endElement("nmaprun") + writer.endDocument() + + def get_xml(self): + """Return a string containing the XML representation of this scan.""" + buffer = StringIO() + self.write_xml(buffer) + string = buffer.getvalue() + buffer.close() + return string + + def write_xml_to_file(self, filename): + """Write the XML representation of this scan to the file whose name is + given.""" + fd = open(filename, "w") + self.write_xml(fd) + fd.close() + + def _write_output(self, writer): + if self.nmap_output == "": + return + writer.startElement("output", Attributes({"type": "interactive"})) + writer.characters(self.nmap_output) + writer.endElement("output") + + def _write_runstats(self, writer): + ################## + # Runstats element + writer.startElement("runstats", Attributes(dict())) + + ## Finished element + writer.startElement("finished", + Attributes(dict(time=str(self.finish_epoc_time), + timestr=time.ctime(time.mktime( + self.get_finish_time()))))) + writer.endElement("finished") + + ## Hosts element + writer.startElement("hosts", + Attributes(dict(up=str(self.hosts_up), + down=str(self.hosts_down), + total=str(self.hosts_scanned)))) + writer.endElement("hosts") + + writer.endElement("runstats") + # End of Runstats element + ######################### + + def _write_hosts(self, writer): + for host in self.hosts: + # Start host element + writer.startElement("host", + Attributes(dict(comment=host.comment))) + + # Status element + writer.startElement("status", + Attributes(dict(state=host.state))) + writer.endElement("status") + + ################## + # Address elements + ## IPv4 + if host.ip is not None: + writer.startElement("address", + Attributes(dict(addr=host.ip.get("addr", ""), + vendor=host.ip.get("vendor", ""), + addrtype=host.ip.get("type", "")))) + writer.endElement("address") + + ## IPv6 + if host.ipv6 is not None: + writer.startElement("address", + Attributes(dict(addr=host.ipv6.get("addr", ""), + vendor=host.ipv6.get("vendor", ""), + addrtype=host.ipv6.get("type", "")))) + writer.endElement("address") + + ## MAC + if host.mac is not None: + writer.startElement("address", + Attributes(dict(addr=host.mac.get("addr", ""), + vendor=host.mac.get("vendor", ""), + addrtype=host.mac.get("type", "")))) + writer.endElement("address") + # End of Address elements + ######################### + + ################### + # Hostnames element + writer.startElement("hostnames", Attributes({})) + + for hname in host.hostnames: + writer.startElement("hostname", + Attributes(dict(name=hname.get("hostname", ""), + type=hname.get("hostname_type", "")))) + + writer.endElement("hostname") + + writer.endElement("hostnames") + # End of Hostnames element + ########################## + + ############### + # Ports element + writer.startElement("ports", Attributes({})) + + ## Extraports elements + for ext in host.get_extraports(): + writer.startElement("extraports", + Attributes(dict(count=ext.get("count", ""), + state=ext.get("state", "")))) + writer.endElement("extraports") + + ## Port elements + for p in host.ports: + writer.startElement("port", + Attributes(dict(portid=p.get("portid", ""), + protocol=p.get("protocol", "")))) + + ### Port state + writer.startElement("state", + Attributes(dict(state=p.get("port_state", ""), + reason=p.get("reason", ""), + reason_ttl=p.get("reason_ttl", "")))) + writer.endElement("state") + + ### Port service info + d = {} + for xml_attr, member in (("conf", "service_conf"), + ("method", "service_method"), + ("name", "service_name"), + ("product", "service_product"), + ("version", "service_version"), + ("extrainfo", "service_extrainfo")): + if p.get(member): + d[xml_attr] = p.get(member) + writer.startElement("service", Attributes(d)) + writer.endElement("service") + + writer.endElement("port") + + writer.endElement("ports") + # End of Ports element + ###################### + + ############ + # OS element + writer.startElement("os", Attributes({})) + + ## Ports used elements + for pu in host.ports_used: + writer.startElement("portused", + Attributes(dict(state=pu.get("state", ""), + proto=pu.get("proto", ""), + portid=pu.get("portid", "")))) + writer.endElement("portused") + + ## Osmatch elements + for om in host.osmatches: + writer.startElement("osmatch", + Attributes(dict(name=om.get("name", ""), + accuracy=om.get("accuracy", ""), + line=om.get("line", "")))) + ## Osclass elements + for oc in om['osclasses']: + writer.startElement("osclass", + Attributes(dict(vendor=oc.get("vendor", ""), + osfamily=oc.get("osfamily", ""), + type=oc.get("type", ""), + osgen=oc.get("osgen", ""), + accuracy=oc.get("accuracy", "")))) + writer.endElement("osclass") + writer.endElement("osmatch") + + writer.endElement("os") + # End of OS element + ################### + + # Uptime element + writer.startElement("uptime", + Attributes(dict(seconds=host.uptime.get("seconds", ""), + lastboot=host.uptime.get("lastboot", "")))) + writer.endElement("uptime") + + ##################### + # Sequences elements + ## TCP Sequence element + # Cannot use dict() here, because of the 'class' attribute. + writer.startElement("tcpsequence", + Attributes({"index": host.tcpsequence.get("index", ""), + "difficulty": host.tcpsequence.get("difficulty", ""), + "values": host.tcpsequence.get("values", "")})) + writer.endElement("tcpsequence") + + ## IP ID Sequence element + writer.startElement("ipidsequence", + Attributes({"class": host.ipidsequence.get("class", ""), + "values": host.ipidsequence.get("values", "")})) + writer.endElement("ipidsequence") + + ## TCP TS Sequence element + writer.startElement("tcptssequence", + Attributes({"class": host.tcptssequence.get("class", ""), + "values": host.tcptssequence.get("values", "")})) + writer.endElement("tcptssequence") + # End of sequences elements + ########################### + + ## Trace element + if len(host.trace) > 0: + writer.startElement("trace", + Attributes({"proto": host.trace.get("proto", ""), + "port": host.trace.get("port", "")})) + + if "hops" in host.trace: + for hop in host.trace["hops"]: + writer.startElement("hop", + Attributes({"ttl": hop["ttl"], + "rtt": hop["rtt"], + "ipaddr": hop["ipaddr"], + "host": hop["host"]})) + writer.endElement("hop") + + if "error" in host.trace: + writer.startElement("error", + Attributes({"errorstr": host.trace["error"]})) + writer.endElement("error") + + writer.endElement("trace") + # End of trace element + ########################### + + # End host element + writer.endElement("host") + + def _write_debugging(self, writer): + writer.startElement("debugging", Attributes(dict( + level=str(self.debugging_level)))) + writer.endElement("debugging") + + def _write_verbose(self, writer): + writer.startElement("verbose", Attributes(dict( + level=str(self.verbose_level)))) + writer.endElement("verbose") + + def _write_scaninfo(self, writer): + for scan in self.scaninfo: + writer.startElement("scaninfo", + Attributes(dict(type=scan.get("type", ""), + protocol=scan.get("protocol", ""), + numservices=scan.get("numservices", ""), + services=scan.get("services", "")))) + writer.endElement("scaninfo") + + def _write_nmaprun(self, writer): + writer.startElement("nmaprun", + Attributes(dict(args=str(self.nmap_command), + profile_name=str(self.profile_name), + scanner=str(self.scanner), + start=str(self.start), + startstr=time.ctime( + time.mktime(self.get_date())), + version=str(self.scanner_version), + xmloutputversion=str(XML_OUTPUT_VERSION)))) + + def set_unsaved(self): + self.unsaved = True + + def is_unsaved(self): + return self.unsaved + + +class OverrideEntityResolver(EntityResolver): + """This class overrides the default behavior of xml.sax to download + remote DTDs, instead returning blank strings""" + empty = StringIO() + + def resolveEntity(self, publicId, systemId): + return OverrideEntityResolver.empty + + +def nmap_parser_sax(): + parser = make_parser() + nmap_parser = NmapParserSAX() + + parser.setContentHandler(nmap_parser) + parser.setEntityResolver(OverrideEntityResolver()) + nmap_parser.set_parser(parser) + + return nmap_parser + +NmapParser = nmap_parser_sax + + +if __name__ == '__main__': + import sys + + file_to_parse = sys.argv[1] + + np = NmapParser() + np.parse_file(file_to_parse) + + for host in np.hosts: + print("%s:" % host.ip["addr"]) + print(" Comment:", repr(host.comment)) + print(" TCP sequence:", repr(host.tcpsequence)) + print(" TCP TS sequence:", repr(host.tcptssequence)) + print(" IP ID sequence:", repr(host.ipidsequence)) + print(" Uptime:", repr(host.uptime)) + print(" OS Match:", repr(host.osmatches)) + print(" Ports:") + for p in host.ports: + print("\t%s" % repr(p)) + print(" Ports used:", repr(host.ports_used)) + print(" OS Matches:", repr(host.osmatches)) + print(" Hostnames:", repr(host.hostnames)) + print(" IP:", repr(host.ip)) + print(" IPv6:", repr(host.ipv6)) + print(" MAC:", repr(host.mac)) + print(" State:", repr(host.state)) + if "hops" in host.trace: + print(" Trace:") + for hop in host.trace["hops"]: + print(" ", repr(hop)) + print() -- cgit v1.2.3