summaryrefslogtreecommitdiffstats
path: root/debian/repack.py
diff options
context:
space:
mode:
Diffstat (limited to 'debian/repack.py')
-rwxr-xr-xdebian/repack.py267
1 files changed, 267 insertions, 0 deletions
diff --git a/debian/repack.py b/debian/repack.py
new file mode 100755
index 0000000000..84d6ca6ae3
--- /dev/null
+++ b/debian/repack.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python3
+
+from optparse import OptionParser
+import fnmatch
+import struct
+import tarfile
+import zipfile
+from datetime import datetime, timezone
+from io import BytesIO
+import re
+import os
+import sys
+import email.message
+import urllib.request
+from urllib.parse import urlparse
+
+class URLFile(object):
+ '''Simple proxy to urllib.request.urlopen, that responds to seek only if
+ it's called before read. This is enough for tarfile to be happy'''
+
+ def __init__(self, url):
+ self.file = urllib.request.urlopen(url)
+
+ def seekable(self):
+ return True
+
+ def seek(self, offset, whence = os.SEEK_SET):
+ if whence != os.SEEK_SET or offset != 0 or self.read == self._read:
+ raise Exception("unsupported")
+
+ def _read(self, size = -1):
+ return self.file.read(size)
+
+ def read(self, size = -1):
+ self.read = self._read
+ return self._read(size)
+
+ def close(self):
+ self.file.close()
+
+def dirname(filespec):
+ '''Returns os.path.dirname if a file, and '' if an url'''
+ if urlparse(filespec).scheme:
+ return ''
+ return os.path.dirname(filespec)
+
+class TarFilterList(object):
+ def __init__(self, filename):
+ self.patterns = {}
+ for filt in open(filename).readlines():
+ f = filt.strip().split(None, 1)
+ if len(f) == 1:
+ [pat] = f
+ cmd = None
+ else:
+ [pat, cmd] = f
+
+ pat = pat.split(os.sep)
+ self.add_pattern(pat, self.patterns, cmd)
+
+ def add_pattern(self, pat, patterns, cmd):
+ if re.search(r'[\[\?\*]', pat[0]):
+ if not '*' in patterns:
+ patterns['*'] = []
+ patterns['*'].append([os.sep.join(pat), cmd, False])
+ else:
+ if len(pat) >= 2:
+ if not pat[0] in patterns:
+ patterns[pat[0]] = {}
+ self.add_pattern(pat[1:], patterns[pat[0]], cmd)
+ else:
+ if not '*' in patterns:
+ patterns['*'] = []
+ patterns['*'].append([pat[0], cmd, False])
+
+ def match(self, name):
+ name = name.split(os.sep)[1:]
+ if len(name) == 0:
+ return False
+ return self._match(name, self.patterns)
+
+ def _match(self, name, patterns):
+ if len(name) > 1 and name[0] in patterns:
+ cmd = self._match(name[1:], patterns[name[0]])
+ if cmd != False:
+ return cmd
+ if '*' in patterns:
+ for pat in patterns['*']:
+ if fnmatch.fnmatch(name[0], pat[0]) or fnmatch.fnmatch(os.sep.join(name), pat[0]):
+ pat[2] = True
+ return pat[1]
+ return False
+
+ def unused(self, patterns=None, root=''):
+ result = []
+ if root:
+ root += '/'
+ if not patterns:
+ patterns = self.patterns
+ for pat in patterns:
+ if pat != '*':
+ result += self.unused(patterns[pat], root + pat)
+ else:
+ for p in patterns[pat]:
+ if not p[2]:
+ result.append(root + p[0])
+ return result
+
+def file_extension(name):
+ return os.path.splitext(name)[1][1:]
+
+class ZipAsTar(object):
+ def __init__(self, zip):
+ self.zip = zip
+ self.infos = {}
+
+ def close(self):
+ self.zip.close()
+
+ def extractfile(self, info):
+ return self.zip.open(self.infos[info], mode="r")
+
+ def __iter__(self):
+ for info in self.zip.infolist():
+ tar_info = tarfile.TarInfo(info.filename)
+ self.infos[tar_info] = info.filename
+ tar_info.size = info.file_size
+ mtime = datetime(*info.date_time, tzinfo=timezone.utc)
+ tar_info.mtime = mtime.timestamp()
+
+ extra = info.extra
+ while len(extra) >= 4:
+ signature, length = struct.unpack("<HH", extra[:4])
+ assert len(extra) >= length + 4
+ if signature == 0x5455: # extended timestamps
+ (flags,) = struct.unpack("B", extra[4:5])
+ if flags & 0x1: # mtime
+ assert length >= 5
+ (mtime,) = struct.unpack("<L", extra[5:9])
+ tar_info.mtime = float(mtime)
+ extra = extra[length + 4:]
+
+ tar_info.mode = (info.external_attr >> 16) & 0o777
+ if len(info.filename) > tarfile.LENGTH_NAME:
+ tar_info.pax_headers["path"] = info.filename
+
+ yield tar_info
+
+
+def filter_archive(orig, new, filt, topdir = None):
+ filt = TarFilterList(filt)
+ is_url = urlparse(orig).scheme
+ if orig.endswith(".zip"):
+ if is_url:
+ # zipfile needs to seek so doesn't work with the simple URLFile. Only
+ # the l10n packages are zipfiles and they are small so buffering the
+ # whole archive in memory is fine
+ orig_bytes = urllib.request.urlopen(orig).read()
+ zip = zipfile.ZipFile(BytesIO(orig_bytes))
+ else:
+ zip = zipfile.ZipFile(orig)
+ tar = ZipAsTar(zip)
+ format = tarfile.PAX_FORMAT
+ else:
+ fileobj = URLFile(orig) if is_url else None
+ tar = tarfile.open(orig, "r:" + file_extension(orig), fileobj)
+ format = tar.format
+
+ new_tar = tarfile.open(new + ".new", "w:" + file_extension(new), format=format)
+ for info in tar:
+ if topdir:
+ namefilt = lambda n: "/".join([topdir] + n.split("/")[1:])
+ info.name = namefilt(info.name)
+ if "path" in info.pax_headers:
+ info.pax_headers["path"] = namefilt(info.pax_headers["path"])
+
+ do_filt = filt.match(info.name)
+ if do_filt == None:
+ print("Removing", info.name, file=sys.stderr)
+ continue
+
+ if info.isfile():
+ file = tar.extractfile(info)
+ if do_filt:
+ print("Filtering", info.name, file=sys.stderr)
+ orig = file
+ file = BytesIO()
+ the_filt = lambda l: l
+ if do_filt[0].isalpha():
+ f = do_filt.split(do_filt[1])
+ if f[0] == 's':
+ the_filt = lambda l: re.sub(f[1], f[2],
+ l.decode()).encode('utf-8')
+ else:
+ f = do_filt.split(do_filt[0])
+ if f[2] == 'd':
+ the_filt = lambda l: b'' if re.search(f[1],
+ l.decode()) else l
+ for l in orig.readlines():
+ file.write(the_filt(l))
+ info.size = file.tell()
+ file.seek(0);
+ new_tar.addfile(info, file)
+ else:
+ new_tar.addfile(info)
+
+ tar.close()
+ new_tar.close()
+ os.rename(new_tar.name, new)
+ unused = filt.unused()
+ if unused:
+ print('Unused filters:')
+ print('', '\n '.join(unused))
+
+def get_package_name():
+ control = os.path.join(os.path.dirname(__file__), "control")
+ return email.message.Message(open(control))["Source"]
+
+def main():
+ parser = OptionParser()
+ parser.add_option("-u", "--upstream-version", dest="upstream_version",
+ help="define upstream version number to use when creating the file",
+ metavar="VERSION")
+ parser.add_option("-f", "--filter", dest="filter",
+ help="use the given filter list", metavar="FILE")
+ parser.add_option("-p", "--package", dest="package",
+ help="use the given package name", metavar="NAME")
+ parser.add_option("-o", "--output", dest="new_file",
+ help="save the filtered tarball as the given file name", metavar="FILE")
+ parser.add_option("-t", "--topdir", dest="topdir",
+ help="replace the top directory with the given name", metavar="NAME")
+ (options, args) = parser.parse_args()
+
+ if not options.upstream_version and not options.new_file:
+ parser.error("Need an upstream version")
+ return
+
+ if len(args) < 1:
+ parser.error("Too few arguments")
+ return
+ if len(args) > 1:
+ parser.error("Too many arguments")
+ return
+
+ if not options.filter:
+ options.filter = os.path.join(os.path.dirname(__file__), "source.filter")
+ if not options.package:
+ options.package = get_package_name()
+
+ if options.new_file:
+ new_file = options.new_file
+
+ if os.path.islink(args[0]):
+ orig = os.path.realpath(args[0])
+ if not new_file:
+ new_file = args[0]
+ else:
+ orig = args[0]
+ compression = file_extension(orig)
+ if not new_file:
+ new_file = options.package + "_" + options.upstream_version + ".orig.tar." + compression
+ new_file = os.path.realpath(os.path.join(dirname(orig), new_file))
+ print(orig, new_file)
+ filter_archive(orig, new_file, options.filter, options.topdir)
+
+if __name__ == '__main__':
+ main()