#!/usr/bin/env python3
"""Repack an upstream archive (tarball or zip, local file or URL) into a
filtered tarball suitable for use as a source .orig tarball.

Archive members are removed or line-filtered according to a filter list
file (see TarFilterList), and the top directory can optionally be renamed.
"""

from optparse import OptionParser
import fnmatch
import struct
import tarfile
import zipfile
from datetime import datetime, timezone
from io import BytesIO
import re
import os
import sys
import email.message
import urllib.request
from urllib.parse import urlparse


class URLFile(object):
    '''Simple proxy to urllib.request.urlopen, that responds to seek only if
    it's called before read. This is enough for tarfile to be happy'''

    def __init__(self, url):
        self.file = urllib.request.urlopen(url)

    def seekable(self):
        return True

    def seek(self, offset, whence=os.SEEK_SET):
        # Only a no-op "rewind to the start" is tolerated, and only before
        # the first read(): read() rebinds self.read, which this detects.
        if whence != os.SEEK_SET or offset != 0 or self.read == self._read:
            raise Exception("unsupported")

    def _read(self, size=-1):
        return self.file.read(size)

    def read(self, size=-1):
        # After the first read, rebind self.read so seek() can tell that
        # data has already been consumed and refuse to "seek".
        self.read = self._read
        return self._read(size)

    def close(self):
        self.file.close()


def dirname(filespec):
    '''Returns os.path.dirname if a file, and '' if an url'''
    if urlparse(filespec).scheme:
        return ''
    return os.path.dirname(filespec)


class TarFilterList(object):
    """Filter list for archive members.

    Each line of the filter file is a path pattern (fnmatch-style, relative
    to the archive top directory), optionally followed by whitespace and a
    sed-like command:

        pattern               remove matching members entirely
        pattern s/RE/REPL/    rewrite lines of matching members
        pattern /RE/d         delete lines matching RE in matching members

    match() returns None (remove), a command string (filter), or False
    (keep the member untouched).
    """

    def __init__(self, filename):
        # Trie of patterns: literal leading path components are dict keys;
        # the '*' key holds a list of [pattern, cmd, used] glob entries.
        self.patterns = {}
        with open(filename) as f:
            for line in f:
                fields = line.strip().split(None, 1)
                if not fields:
                    continue  # tolerate blank lines
                if len(fields) == 1:
                    [pat] = fields
                    cmd = None
                else:
                    [pat, cmd] = fields
                self.add_pattern(pat.split(os.sep), self.patterns, cmd)

    def add_pattern(self, pat, patterns, cmd):
        """Insert pattern components `pat` (a list) into the trie `patterns`."""
        if re.search(r'[\[\?\*]', pat[0]):
            # First remaining component is a glob: store the whole remaining
            # path as an fnmatch pattern at this level.
            patterns.setdefault('*', []).append([os.sep.join(pat), cmd, False])
        elif len(pat) >= 2:
            # Literal component with more to come: descend into the trie.
            self.add_pattern(pat[1:], patterns.setdefault(pat[0], {}), cmd)
        else:
            # Literal leaf: stored in the '*' bucket, matched with fnmatch.
            patterns.setdefault('*', []).append([pat[0], cmd, False])

    def match(self, name):
        """Look up an archive member name; the top directory is ignored.

        Returns None to remove, a command string to filter, False to keep.
        """
        parts = name.split(os.sep)[1:]  # drop the top directory component
        if len(parts) == 0:
            return False
        return self._match(parts, self.patterns)

    def _match(self, name, patterns):
        # Prefer a deeper literal match before trying globs at this level.
        if len(name) > 1 and name[0] in patterns:
            cmd = self._match(name[1:], patterns[name[0]])
            if cmd is not False:  # None is a real result (remove)
                return cmd
        if '*' in patterns:
            for pat in patterns['*']:
                if fnmatch.fnmatch(name[0], pat[0]) or \
                        fnmatch.fnmatch(os.sep.join(name), pat[0]):
                    pat[2] = True  # mark used, for unused() reporting
                    return pat[1]
        return False

    def unused(self, patterns=None, root=''):
        """Return the list of filter patterns that never matched anything."""
        result = []
        if root:
            root += '/'
        if not patterns:
            patterns = self.patterns
        for pat in patterns:
            if pat != '*':
                result += self.unused(patterns[pat], root + pat)
            else:
                result += [root + p[0] for p in patterns[pat] if not p[2]]
        return result


def file_extension(name):
    """Return the final extension of `name`, without the leading dot."""
    return os.path.splitext(name)[1][1:]


class ZipAsTar(object):
    """Present a zipfile.ZipFile with the subset of the tarfile.TarFile
    interface that filter_archive needs (iteration + extractfile)."""

    def __init__(self, zip):
        self.zip = zip
        self.infos = {}  # maps generated TarInfo -> original zip member name

    def close(self):
        self.zip.close()

    def extractfile(self, info):
        return self.zip.open(self.infos[info], mode="r")

    def __iter__(self):
        for info in self.zip.infolist():
            tar_info = tarfile.TarInfo(info.filename)
            self.infos[tar_info] = info.filename
            tar_info.size = info.file_size
            # DOS timestamps are local time with 2s granularity; use them as
            # a fallback, preferring the UT extra field below when present.
            mtime = datetime(*info.date_time, tzinfo=timezone.utc)
            tar_info.mtime = mtime.timestamp()
            # Walk the extra field: a sequence of little-endian
            # (id, size) headers followed by `size` data bytes (ZIP appnote).
            # NOTE(review): the format strings below were reconstructed from
            # the appnote; the surviving fragments of the original agree.
            extra = info.extra
            while len(extra) >= 4:
                signature, length = struct.unpack("<HH", extra[:4])
                assert len(extra) >= length + 4
                if signature == 0x5455:  # extended timestamps
                    (flags,) = struct.unpack("B", extra[4:5])
                    if flags & 0x1:  # mtime
                        assert length >= 5
                        (mtime,) = struct.unpack("<i", extra[5:9])
                        tar_info.mtime = mtime
                extra = extra[length + 4:]
            # Unix permission bits live in the high 16 bits of external_attr.
            tar_info.mode = (info.external_attr >> 16) & 0o777
            if len(info.filename) > tarfile.LENGTH_NAME:
                # Too long for a plain ustar header; needs a pax "path" record.
                tar_info.pax_headers["path"] = info.filename
            yield tar_info


def _make_line_filter(cmd):
    """Build a bytes-line -> bytes-line function from a sed-like command.

    's<SEP>RE<SEP>REPL<SEP>' rewrites lines, '<SEP>RE<SEP>d' drops lines
    matching RE; any other command leaves lines untouched.
    """
    if cmd[0].isalpha():
        parts = cmd.split(cmd[1])
        if parts[0] == 's':
            return lambda l: re.sub(parts[1], parts[2], l.decode()).encode('utf-8')
    else:
        parts = cmd.split(cmd[0])
        if parts[2] == 'd':
            return lambda l: b'' if re.search(parts[1], l.decode()) else l
    return lambda l: l


def filter_archive(orig, new, filt, topdir=None):
    """Copy archive `orig` into tarball `new`, applying filter list `filt`.

    `orig` may be a local path or a URL, a tarball or a zip (".zip" suffix).
    If `topdir` is given, the archive's top directory is renamed to it.
    """
    filt = TarFilterList(filt)
    is_url = urlparse(orig).scheme
    if orig.endswith(".zip"):
        if is_url:
            # zipfile needs to seek so doesn't work with the simple URLFile.
            # Only the l10n packages are zipfiles and they are small so
            # buffering the whole archive in memory is fine
            orig_bytes = urllib.request.urlopen(orig).read()
            archive = zipfile.ZipFile(BytesIO(orig_bytes))
        else:
            archive = zipfile.ZipFile(orig)
        tar = ZipAsTar(archive)
        format = tarfile.PAX_FORMAT
    else:
        fileobj = URLFile(orig) if is_url else None
        tar = tarfile.open(orig, "r:" + file_extension(orig), fileobj)
        format = tar.format
    # Write to a temporary ".new" name, renamed into place once complete.
    new_tar = tarfile.open(new + ".new", "w:" + file_extension(new),
                           format=format)
    for info in tar:
        if topdir:
            namefilt = lambda n: "/".join([topdir] + n.split("/")[1:])
            info.name = namefilt(info.name)
            if "path" in info.pax_headers:
                info.pax_headers["path"] = namefilt(info.pax_headers["path"])
        do_filt = filt.match(info.name)
        if do_filt is None:  # None means "remove"; False means "keep"
            print("Removing", info.name, file=sys.stderr)
            continue
        if info.isfile():
            member = tar.extractfile(info)
            if do_filt:
                print("Filtering", info.name, file=sys.stderr)
                line_filt = _make_line_filter(do_filt)
                filtered = BytesIO()
                for line in member.readlines():
                    filtered.write(line_filt(line))
                info.size = filtered.tell()
                filtered.seek(0)
                member = filtered
            new_tar.addfile(info, member)
        else:
            new_tar.addfile(info)
    tar.close()
    new_tar.close()
    os.rename(new_tar.name, new)
    unused = filt.unused()
    if unused:
        print('Unused filters:')
        print('', '\n '.join(unused))


def get_package_name():
    """Read the source package name from the "control" file next to this
    script (an RFC822-style Debian control file)."""
    control = os.path.join(os.path.dirname(__file__), "control")
    # BUG FIX: the original called email.message.Message(open(control)),
    # but Message.__init__ takes a policy, not a file, so the control file
    # was never parsed. Parse it properly and close the file.
    with open(control) as f:
        return email.message_from_file(f)["Source"]


def main():
    """Command-line entry point: parse options and repack the archive."""
    parser = OptionParser()
    parser.add_option("-u", "--upstream-version", dest="upstream_version",
                      help="define upstream version number to use when creating the file",
                      metavar="VERSION")
    parser.add_option("-f", "--filter", dest="filter",
                      help="use the given filter list", metavar="FILE")
    parser.add_option("-p", "--package", dest="package",
                      help="use the given package name", metavar="NAME")
    parser.add_option("-o", "--output", dest="new_file",
                      help="save the filtered tarball as the given file name",
                      metavar="FILE")
    parser.add_option("-t", "--topdir", dest="topdir",
                      help="replace the top directory with the given name",
                      metavar="NAME")
    (options, args) = parser.parse_args()
    if not options.upstream_version and not options.new_file:
        parser.error("Need an upstream version")
        return
    if len(args) < 1:
        parser.error("Too few arguments")
        return
    if len(args) > 1:
        parser.error("Too many arguments")
        return
    if not options.filter:
        options.filter = os.path.join(os.path.dirname(__file__),
                                      "source.filter")
    if not options.package:
        options.package = get_package_name()
    # BUG FIX: new_file was only assigned when -o was given, so the symlink
    # branch below raised NameError without -o. Initialize it unconditionally.
    new_file = options.new_file
    if os.path.islink(args[0]):
        # Follow the symlink to the real archive, but keep the symlink name
        # as the default output name.
        orig = os.path.realpath(args[0])
        if not new_file:
            new_file = args[0]
    else:
        orig = args[0]
    compression = file_extension(orig)
    if not new_file:
        new_file = (options.package + "_" + options.upstream_version +
                    ".orig.tar." + compression)
    new_file = os.path.realpath(os.path.join(dirname(orig), new_file))
    print(orig, new_file)
    filter_archive(orig, new_file, options.filter, options.topdir)


if __name__ == '__main__':
    main()