#!/usr/bin/env python3
'''Create a filtered copy of an upstream tarball: remove or rewrite the
members named in a filter list, producing the .orig tarball for packaging.'''

from optparse import OptionParser
import fnmatch
import tarfile
from io import BytesIO
import re
import os
import sys
import email
import urllib.request
from urllib.parse import urlparse


class URLFile(object):
    '''Simple proxy around urllib.request.urlopen that accepts seek() only
    before the first read().  This is enough for tarfile to be happy.'''

    def __init__(self, url):
        self.file = urllib.request.urlopen(url)

    def seekable(self):
        return True

    def seek(self, offset, whence=os.SEEK_SET):
        # Only the initial seek(0) is allowed; once read() has been called
        # (self.read has been rebound to self._read) seeking is unsupported.
        if whence != os.SEEK_SET or offset != 0 or self.read == self._read:
            raise Exception("unsupported")

    def _read(self, size=-1):
        return self.file.read(size)

    def read(self, size=-1):
        self.read = self._read
        return self._read(size)

    def close(self):
        self.file.close()


def dirname(filespec):
    '''Return os.path.dirname() for a file, and '' for a URL.'''
    if urlparse(filespec).scheme:
        return ''
    return os.path.dirname(filespec)


class TarFilterList(object):
    '''Filter list: one path pattern per line (fnmatch-style, relative to the
    tarball's top directory), optionally followed by a sed-like command.
    A pattern with no command removes matching members; with a command,
    their contents are rewritten.  Patterns are stored in nested dicts keyed
    by path component; glob patterns are collected under the '*' key.'''

    def __init__(self, filename):
        self.patterns = {}
        with open(filename) as fh:
            for line in fh:
                f = line.strip().split(None, 1)
                if not f:
                    continue  # skip blank lines
                if len(f) == 1:
                    [pat] = f
                    cmd = None
                else:
                    [pat, cmd] = f
                pat = pat.split(os.sep)
                self.add_pattern(pat, self.patterns, cmd)

    def add_pattern(self, pat, patterns, cmd):
        if re.search(r'[\[\?\*]', pat[0]):
            # Glob in the first component: keep the whole pattern flat.
            if '*' not in patterns:
                patterns['*'] = []
            patterns['*'].append([os.sep.join(pat), cmd, False])
        elif len(pat) >= 2:
            # Literal directory component: descend into a nested dict.
            if pat[0] not in patterns:
                patterns[pat[0]] = {}
            self.add_pattern(pat[1:], patterns[pat[0]], cmd)
        else:
            if '*' not in patterns:
                patterns['*'] = []
            patterns['*'].append([pat[0], cmd, False])

    def match(self, name):
        # Drop the tarball's top-level directory before matching.
        name = name.split(os.sep)[1:]
        if len(name) == 0:
            return False
        return self._match(name, self.patterns)

    def _match(self, name, patterns):
        if len(name) > 1 and name[0] in patterns:
            cmd = self._match(name[1:], patterns[name[0]])
            if cmd is not False:
                return cmd
        if '*' in patterns:
            for pat in patterns['*']:
                if fnmatch.fnmatch(name[0], pat[0]) or \
                        fnmatch.fnmatch(os.sep.join(name), pat[0]):
                    pat[2] = True  # mark the pattern as used
                    return pat[1]
        return False

    def unused(self, patterns=None, root=''):
        '''Return the patterns that never matched anything.'''
        result = []
        if root:
            root += '/'
        if not patterns:
            patterns = self.patterns
        for pat in patterns:
            if pat != '*':
                result += self.unused(patterns[pat], root + pat)
            else:
                for p in patterns[pat]:
                    if not p[2]:
                        result.append(root + p[0])
        return result


def file_extension(name):
    return os.path.splitext(name)[1][1:]


def filter_tar(orig, new, filt, topdir=None):
    filt = TarFilterList(filt)
    if urlparse(orig).scheme:
        tar = tarfile.open(orig, "r:" + file_extension(orig), URLFile(orig))
    else:
        tar = tarfile.open(orig, "r:" + file_extension(orig))
    new_tar = tarfile.open(new + ".new", "w:" + file_extension(new),
                           format=tar.format)
    while True:
        info = tar.next()
        if not info:
            break
        if topdir:
            # Replace the top-level directory name.
            namefilt = lambda n: "/".join([topdir] + n.split("/")[1:])
            info.name = namefilt(info.name)
            if "path" in info.pax_headers:
                info.pax_headers["path"] = namefilt(info.pax_headers["path"])
        do_filt = filt.match(info.name)
        if do_filt is None:
            # Matched a pattern with no command: drop the member entirely.
            print("Removing", info.name, file=sys.stderr)
            continue
        if info.isfile():
            file = tar.extractfile(info)
            if do_filt:
                # Matched a pattern with a sed-like command:
                #   s<sep>regex<sep>replacement<sep>  rewrites matching lines
                #   <sep>regex<sep>d                  deletes matching lines
                print("Filtering", info.name, file=sys.stderr)
                src = file
                file = BytesIO()
                the_filt = lambda l: l
                if do_filt[0].isalpha():
                    f = do_filt.split(do_filt[1])
                    if f[0] == 's':
                        the_filt = lambda l: re.sub(f[1], f[2],
                                                    l.decode()).encode('utf-8')
                else:
                    f = do_filt.split(do_filt[0])
                    if f[2] == 'd':
                        the_filt = lambda l: b'' if re.search(f[1], l.decode()) else l
                for l in src.readlines():
                    file.write(the_filt(l))
                info.size = file.tell()
                file.seek(0)
            new_tar.addfile(info, file)
        else:
            new_tar.addfile(info)
    tar.close()
    new_tar.close()
    os.rename(new_tar.name, new)
    unused = filt.unused()
    if unused:
        print('Unused filters:')
        print('', '\n '.join(unused))


def get_package_name():
    # Read the Source: field from the control file next to this script.
    control = os.path.join(os.path.dirname(__file__), "control")
    with open(control) as fh:
        return email.message_from_file(fh)["Source"]


def main():
    parser = OptionParser()
    parser.add_option("-u", "--upstream-version", dest="upstream_version",
                      help="define upstream version number to use when "
                           "creating the file", metavar="VERSION")
    parser.add_option("-f", "--filter", dest="filter",
                      help="use the given filter list", metavar="FILE")
    parser.add_option("-p", "--package", dest="package",
                      help="use the given package name", metavar="NAME")
    parser.add_option("-o", "--output", dest="new_file",
                      help="save the filtered tarball as the given file name",
                      metavar="FILE")
    parser.add_option("-t", "--topdir", dest="topdir",
                      help="replace the top directory with the given name",
                      metavar="NAME")
    (options, args) = parser.parse_args()
    if not options.upstream_version and not options.new_file:
        parser.error("Need an upstream version")
        return
    if len(args) < 1:
        parser.error("Too few arguments")
        return
    if len(args) > 1:
        parser.error("Too many arguments")
        return
    if not options.filter:
        options.filter = os.path.join(os.path.dirname(__file__), "source.filter")
    if not options.package:
        options.package = get_package_name()
    new_file = options.new_file
    if os.path.islink(args[0]):
        orig = os.path.realpath(args[0])
        if not new_file:
            new_file = args[0]
    else:
        orig = args[0]
    compression = file_extension(orig)
    if not new_file:
        new_file = options.package + "_" + options.upstream_version \
            + ".orig.tar." + compression
    new_file = os.path.realpath(os.path.join(dirname(orig), new_file))
    print(orig, new_file)
    filter_tar(orig, new_file, options.filter, options.topdir)


if __name__ == '__main__':
    main()
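
# Example usage -- a hypothetical sketch with made-up file names, assuming this
# script lives as debian/filter-orig next to debian/control and
# debian/source.filter; the filter-list syntax is inferred from TarFilterList
# and filter_tar above:
#
#   $ cat debian/source.filter
#   third_party/nonfree/*
#   src/config.h s/@VERSION@/0.0.0/
#   Makefile /^include private.mk$/d
#
#   $ ./debian/filter-orig -u 1.2.3 ../foo-1.2.3.tar.gz
#
# The first pattern removes the matching members, the second rewrites matching
# lines with a sed-like "s" command, the third deletes matching lines, and the
# result is written next to the input tarball as <Source>_1.2.3.orig.tar.gz.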