# SPDX-License-Identifier: GPL-2.0-or-later # FRR ELF xref extractor # # Copyright (C) 2020 David Lamparter for NetDEF, Inc. import sys import os import struct import re import traceback json_dump_args = {} try: import ujson as json json_dump_args["escape_forward_slashes"] = False except ImportError: import json import argparse from clippy.uidhash import uidhash from clippy.elf import * from clippy import frr_top_src, CmdAttr, elf_notes from tiabwarfo import FieldApplicator from xref2vtysh import CommandEntry try: with open(os.path.join(frr_top_src, "python", "xrefstructs.json"), "r") as fd: xrefstructs = json.load(fd) except FileNotFoundError: sys.stderr.write( """ The "xrefstructs.json" file (created by running tiabwarfo.py with the pahole tool available) could not be found. It should be included with the sources. """ ) sys.exit(1) # constants, need to be kept in sync manually... XREFT_EVENTSCHED = 0x100 XREFT_LOGMSG = 0x200 XREFT_DEFUN = 0x300 XREFT_INSTALL_ELEMENT = 0x301 # LOG_* priovals = {} prios = ["0", "1", "2", "E", "W", "N", "I", "D"] class XrelfoJson(object): def dump(self): pass def check(self, wopt): yield from [] def to_dict(self, refs): pass class Xref(ELFDissectStruct, XrelfoJson): struct = "xref" fieldrename = {"type": "typ"} containers = {} def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._container = None if self.xrefdata: self.xrefdata.ref_from(self, self.typ) def container(self): if self._container is None: if self.typ in self.containers: self._container = self.container_of(self.containers[self.typ], "xref") return self._container def check(self, *args, **kwargs): if self._container: yield from self._container.check(*args, **kwargs) class Xrefdata(ELFDissectStruct): struct = "xrefdata" # uid is all zeroes in the data loaded from ELF fieldrename = {"uid": "_uid"} def ref_from(self, xref, typ): self.xref = xref @property def uid(self): if self.hashstr is None: return None return uidhash(self.xref.file, self.hashstr, self.hashu32_0, self.hashu32_1) class XrefPtr(ELFDissectStruct): fields = [ ("xref", "P", Xref), ] class XrefThreadSched(ELFDissectStruct, XrelfoJson): struct = "xref_threadsched" Xref.containers[XREFT_EVENTSCHED] = XrefThreadSched class XrefLogmsg(ELFDissectStruct, XrelfoJson): struct = "xref_logmsg" def _warn_fmt(self, text): lines = text.split("\n") yield ( (self.xref.file, self.xref.line), "%s:%d: %s (in %s())%s\n" % ( self.xref.file, self.xref.line, lines[0], self.xref.func, "".join(["\n" + l for l in lines[1:]]), ), ) fmt_regexes = [ (re.compile(r"([\n\t]+)"), "error: log message contains tab or newline"), # (re.compile(r'^(\s+)'), 'warning: log message starts with whitespace'), ( re.compile(r"^((?:warn(?:ing)?|error):\s*)", re.I), "warning: log message starts with severity", ), ] arg_regexes = [ # the (?" if edf._elffile.bigendian else "<" mem = edf._elffile[note] if edf._elffile.elfclass == 64: start, end = struct.unpack(endian + "QQ", mem) start += note.start end += note.start + 8 else: start, end = struct.unpack(endian + "II", mem) start += note.start end += note.start + 4 ptrs = edf.iter_data(XrefPtr, slice(start, end)) else: if elf_notes: self.note_warn = True sys.stderr.write( """%s: warning: binary has no FRRouting.XREF note %s- one of FRR_MODULE_SETUP, FRR_DAEMON_INFO or XREF_SETUP must be used """ % (orig_filename, orig_filename) ) xrefarray = edf.get_section("xref_array") if xrefarray is None: raise ValueError("file has neither xref note nor xref_array section") ptrs = xrefarray.iter_data(XrefPtr) for ptr in ptrs: if ptr.xref is None: print("NULL xref") continue self._xrefs.append(ptr.xref) container = ptr.xref.container() if container is None: continue container.to_dict(self) return edf def load_json(self, fd): data = json.load(fd) for uid, items in data["refs"].items(): myitems = self["refs"].setdefault(uid, []) for item in items: if item in myitems: continue myitems.append(item) for cmd, items in data["cli"].items(): self["cli"].setdefault(cmd, {}).update(items) return data def check(self, checks): for xref in self._xrefs: yield from xref.check(checks) def main(): argp = argparse.ArgumentParser(description="FRR xref ELF extractor") argp.add_argument("-o", dest="output", type=str, help="write JSON output") argp.add_argument("--out-by-file", type=str, help="write by-file JSON output") argp.add_argument("-c", dest="vtysh_cmds", type=str, help="write vtysh_cmd.c") argp.add_argument("-Wlog-format", action="store_const", const=True) argp.add_argument("-Wlog-args", action="store_const", const=True) argp.add_argument("-Werror", action="store_const", const=True) argp.add_argument("--profile", action="store_const", const=True) argp.add_argument( "binaries", metavar="BINARY", nargs="+", type=str, help="files to read (ELF files or libtool objects)", ) args = argp.parse_args() if args.profile: import cProfile cProfile.runctx("_main(args)", globals(), {"args": args}, sort="cumtime") else: _main(args) def _main(args): errors = 0 xrelfo = Xrelfo() for fn in args.binaries: try: xrelfo.load_file(fn) except: errors += 1 sys.stderr.write("while processing %s:\n" % (fn)) traceback.print_exc() if xrelfo.note_warn and args.Werror: errors += 1 for option in dir(args): if option.startswith("W") and option != "Werror": checks = sorted(xrelfo.check(args)) sys.stderr.write("".join([c[-1] for c in checks])) if args.Werror and len(checks) > 0: errors += 1 break refs = xrelfo["refs"] counts = {} for k, v in refs.items(): strs = set([i["fmtstring"] for i in v]) if len(strs) != 1: print("\033[31;1m%s\033[m" % k) counts[k] = len(v) out = xrelfo outbyfile = {} for uid, locs in refs.items(): for loc in locs: filearray = outbyfile.setdefault(loc["file"], []) loc = dict(loc) del loc["file"] filearray.append(loc) for k in outbyfile.keys(): outbyfile[k] = sorted(outbyfile[k], key=lambda x: x["line"]) if errors: sys.exit(1) if args.output: with open(args.output + ".tmp", "w") as fd: json.dump(out, fd, indent=2, sort_keys=True, **json_dump_args) os.rename(args.output + ".tmp", args.output) if args.out_by_file: with open(args.out_by_file + ".tmp", "w") as fd: json.dump(outbyfile, fd, indent=2, sort_keys=True, **json_dump_args) os.rename(args.out_by_file + ".tmp", args.out_by_file) if args.vtysh_cmds: with open(args.vtysh_cmds + ".tmp", "w") as fd: CommandEntry.run(out, fd) os.rename(args.vtysh_cmds + ".tmp", args.vtysh_cmds) if args.Werror and CommandEntry.warn_counter: sys.exit(1) if __name__ == "__main__": main()