diff options
Diffstat (limited to '')
-rwxr-xr-x | debian/repack.py | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/debian/repack.py b/debian/repack.py new file mode 100755 index 0000000000..84d6ca6ae3 --- /dev/null +++ b/debian/repack.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 + +from optparse import OptionParser +import fnmatch +import struct +import tarfile +import zipfile +from datetime import datetime, timezone +from io import BytesIO +import re +import os +import sys +import email.message +import urllib.request +from urllib.parse import urlparse + +class URLFile(object): + '''Simple proxy to urllib.request.urlopen, that responds to seek only if + it's called before read. This is enough for tarfile to be happy''' + + def __init__(self, url): + self.file = urllib.request.urlopen(url) + + def seekable(self): + return True + + def seek(self, offset, whence = os.SEEK_SET): + if whence != os.SEEK_SET or offset != 0 or self.read == self._read: + raise Exception("unsupported") + + def _read(self, size = -1): + return self.file.read(size) + + def read(self, size = -1): + self.read = self._read + return self._read(size) + + def close(self): + self.file.close() + +def dirname(filespec): + '''Returns os.path.dirname if a file, and '' if an url''' + if urlparse(filespec).scheme: + return '' + return os.path.dirname(filespec) + +class TarFilterList(object): + def __init__(self, filename): + self.patterns = {} + for filt in open(filename).readlines(): + f = filt.strip().split(None, 1) + if len(f) == 1: + [pat] = f + cmd = None + else: + [pat, cmd] = f + + pat = pat.split(os.sep) + self.add_pattern(pat, self.patterns, cmd) + + def add_pattern(self, pat, patterns, cmd): + if re.search(r'[\[\?\*]', pat[0]): + if not '*' in patterns: + patterns['*'] = [] + patterns['*'].append([os.sep.join(pat), cmd, False]) + else: + if len(pat) >= 2: + if not pat[0] in patterns: + patterns[pat[0]] = {} + self.add_pattern(pat[1:], patterns[pat[0]], cmd) + else: + if not '*' in patterns: + patterns['*'] = [] + patterns['*'].append([pat[0], cmd, False]) + + def match(self, name): + name = name.split(os.sep)[1:] + if len(name) == 0: + return False + return self._match(name, self.patterns) + + def _match(self, name, patterns): + if len(name) > 1 and name[0] in patterns: + cmd = self._match(name[1:], patterns[name[0]]) + if cmd != False: + return cmd + if '*' in patterns: + for pat in patterns['*']: + if fnmatch.fnmatch(name[0], pat[0]) or fnmatch.fnmatch(os.sep.join(name), pat[0]): + pat[2] = True + return pat[1] + return False + + def unused(self, patterns=None, root=''): + result = [] + if root: + root += '/' + if not patterns: + patterns = self.patterns + for pat in patterns: + if pat != '*': + result += self.unused(patterns[pat], root + pat) + else: + for p in patterns[pat]: + if not p[2]: + result.append(root + p[0]) + return result + +def file_extension(name): + return os.path.splitext(name)[1][1:] + +class ZipAsTar(object): + def __init__(self, zip): + self.zip = zip + self.infos = {} + + def close(self): + self.zip.close() + + def extractfile(self, info): + return self.zip.open(self.infos[info], mode="r") + + def __iter__(self): + for info in self.zip.infolist(): + tar_info = tarfile.TarInfo(info.filename) + self.infos[tar_info] = info.filename + tar_info.size = info.file_size + mtime = datetime(*info.date_time, tzinfo=timezone.utc) + tar_info.mtime = mtime.timestamp() + + extra = info.extra + while len(extra) >= 4: + signature, length = struct.unpack("<HH", extra[:4]) + assert len(extra) >= length + 4 + if signature == 0x5455: # extended timestamps + (flags,) = struct.unpack("B", extra[4:5]) + if flags & 0x1: # mtime + assert length >= 5 + (mtime,) = struct.unpack("<L", extra[5:9]) + tar_info.mtime = float(mtime) + extra = extra[length + 4:] + + tar_info.mode = (info.external_attr >> 16) & 0o777 + if len(info.filename) > tarfile.LENGTH_NAME: + tar_info.pax_headers["path"] = info.filename + + yield tar_info + + +def filter_archive(orig, new, filt, topdir = None): + filt = TarFilterList(filt) + is_url = urlparse(orig).scheme + if orig.endswith(".zip"): + if is_url: + # zipfile needs to seek so doesn't work with the simple URLFile. Only + # the l10n packages are zipfiles and they are small so buffering the + # whole archive in memory is fine + orig_bytes = urllib.request.urlopen(orig).read() + zip = zipfile.ZipFile(BytesIO(orig_bytes)) + else: + zip = zipfile.ZipFile(orig) + tar = ZipAsTar(zip) + format = tarfile.PAX_FORMAT + else: + fileobj = URLFile(orig) if is_url else None + tar = tarfile.open(orig, "r:" + file_extension(orig), fileobj) + format = tar.format + + new_tar = tarfile.open(new + ".new", "w:" + file_extension(new), format=format) + for info in tar: + if topdir: + namefilt = lambda n: "/".join([topdir] + n.split("/")[1:]) + info.name = namefilt(info.name) + if "path" in info.pax_headers: + info.pax_headers["path"] = namefilt(info.pax_headers["path"]) + + do_filt = filt.match(info.name) + if do_filt == None: + print("Removing", info.name, file=sys.stderr) + continue + + if info.isfile(): + file = tar.extractfile(info) + if do_filt: + print("Filtering", info.name, file=sys.stderr) + orig = file + file = BytesIO() + the_filt = lambda l: l + if do_filt[0].isalpha(): + f = do_filt.split(do_filt[1]) + if f[0] == 's': + the_filt = lambda l: re.sub(f[1], f[2], + l.decode()).encode('utf-8') + else: + f = do_filt.split(do_filt[0]) + if f[2] == 'd': + the_filt = lambda l: b'' if re.search(f[1], + l.decode()) else l + for l in orig.readlines(): + file.write(the_filt(l)) + info.size = file.tell() + file.seek(0); + new_tar.addfile(info, file) + else: + new_tar.addfile(info) + + tar.close() + new_tar.close() + os.rename(new_tar.name, new) + unused = filt.unused() + if unused: + print('Unused filters:') + print('', '\n '.join(unused)) + +def get_package_name(): + control = os.path.join(os.path.dirname(__file__), "control") + return email.message.Message(open(control))["Source"] + +def main(): + parser = OptionParser() + parser.add_option("-u", "--upstream-version", dest="upstream_version", + help="define upstream version number to use when creating the file", + metavar="VERSION") + parser.add_option("-f", "--filter", dest="filter", + help="use the given filter list", metavar="FILE") + parser.add_option("-p", "--package", dest="package", + help="use the given package name", metavar="NAME") + parser.add_option("-o", "--output", dest="new_file", + help="save the filtered tarball as the given file name", metavar="FILE") + parser.add_option("-t", "--topdir", dest="topdir", + help="replace the top directory with the given name", metavar="NAME") + (options, args) = parser.parse_args() + + if not options.upstream_version and not options.new_file: + parser.error("Need an upstream version") + return + + if len(args) < 1: + parser.error("Too few arguments") + return + if len(args) > 1: + parser.error("Too many arguments") + return + + if not options.filter: + options.filter = os.path.join(os.path.dirname(__file__), "source.filter") + if not options.package: + options.package = get_package_name() + + if options.new_file: + new_file = options.new_file + + if os.path.islink(args[0]): + orig = os.path.realpath(args[0]) + if not new_file: + new_file = args[0] + else: + orig = args[0] + compression = file_extension(orig) + if not new_file: + new_file = options.package + "_" + options.upstream_version + ".orig.tar." + compression + new_file = os.path.realpath(os.path.join(dirname(orig), new_file)) + print(orig, new_file) + filter_archive(orig, new_file, options.filter, options.topdir) + +if __name__ == '__main__': + main() |