1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
#!/usr/bin/env python3
import os
import tarfile
import zipfile
import rpmfile
class ZFile(object):
    """Thin uniform wrapper around zip, tar and rpm archives.

    The backend is chosen from the file contents (when an existing archive
    is opened) or from the file extension.  The underlying archive object
    is exposed as ``self.f``.  Instances are context managers that close
    the archive on exit.
    """

    # Extensions handled by the tarfile backend.
    _TAR_EXTS = ('tar', 'tgz', 'gz')
    # Extensions for which a newly created tar archive is gzip-compressed.
    _TAR_GZ_EXTS = ('tgz', 'gz')
    # Chunk size (bytes) used when copying a member out of the archive.
    _CHUNK = 1024 * 100

    def __init__(self, path, mode='r', ext=None):
        """Open the archive at ``path``.

        :param path: archive file name.
        :param mode: open mode handed to the backend ('r', 'w', 'a', ...).
        :param ext: optional extension (with or without the leading dot)
            used instead of the one derived from ``path``.
        :raises ValueError: if neither the content nor the extension
            identifies a supported archive type.
        """
        super(ZFile, self).__init__()
        if ext is not None:
            _ext = ext
        else:
            _ext = os.path.splitext(path)[-1]
        if _ext.startswith('.'):
            _ext = _ext[1:]
        _ext = _ext.lower()

        # Only inspect the file contents when an existing archive is being
        # opened.  When creating a file the content is either missing
        # (tarfile.is_tarfile() raises FileNotFoundError on a missing path)
        # or about to be overwritten, so only the extension is meaningful.
        if mode.startswith('r'):
            sniff = True
        else:
            sniff = mode.startswith('a') and os.path.exists(path)

        if (sniff and zipfile.is_zipfile(path)) or _ext == 'zip':
            self.f = zipfile.ZipFile(path, mode)
        elif (sniff and tarfile.is_tarfile(path)) or _ext in self._TAR_EXTS:
            tar_mode = mode
            # Reading detects compression transparently, writing does not:
            # ask for gzip explicitly so a new .tgz/.gz really is gzipped.
            if mode in ('w', 'x') and _ext in self._TAR_GZ_EXTS:
                tar_mode = mode + ':gz'
            self.f = tarfile.open(path, tar_mode)
        elif _ext == 'rpm':
            self.f = rpmfile.open(path, mode + 'b')
        else:
            raise ValueError('Unsupported file extension: %s' % path)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self):
        """Close the underlying archive object if it supports closing."""
        if callable(getattr(self.f, 'close', None)):
            self.f.close()

    def getnames(self):
        """Return the list of member names stored in the archive.

        :raises NotImplementedError: for an unknown backend object.
        """
        if isinstance(self.f, zipfile.ZipFile):
            return self.f.namelist()
        elif isinstance(self.f, tarfile.TarFile):
            return self.f.getnames()
        elif isinstance(self.f, rpmfile.RPMFile):
            return [x.name for x in self.f.getmembers()]
        else:
            raise NotImplementedError

    def headers(self):
        """Return the rpm headers, or an empty dict for other backends."""
        if isinstance(self.f, rpmfile.RPMFile):
            return self.f.headers
        else:
            return dict()

    def extract_to(self, member, path):
        """Extract archive member ``member`` to ``path``.

        If ``path`` is an existing directory the member's basename is used
        as the file name inside it, otherwise ``path`` is taken as the full
        output file name.  A missing parent directory is created.

        :raises ValueError: if the member has no file content to extract
            (the backend returned no stream for it).
        """
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname; os.makedirs('') would fail.
        if parent and not os.path.isdir(parent):
            os.makedirs(parent)
        if os.path.isdir(path):
            path = os.path.join(path, os.path.basename(member))

        # Open the member first so that a missing member does not leave an
        # empty output file behind.
        if isinstance(self.f, zipfile.ZipFile):
            zf = self.f.open(member)
        else:
            zf = self.f.extractfile(member)
        if zf is None:
            # tarfile returns None for members that are not regular files.
            raise ValueError('Not a regular file in archive: %s' % member)

        try:
            with open(path, 'wb') as of:
                while True:
                    b = zf.read(self._CHUNK)
                    if not b:
                        break
                    of.write(b)
        finally:
            zf.close()

    def _add(self, src, arcname):
        """Add file ``src`` to the opened archive under name ``arcname``."""
        if isinstance(self.f, zipfile.ZipFile):
            self.f.write(src, arcname)
        elif isinstance(self.f, tarfile.TarFile):
            # TarFile has no write(); add() is its equivalent.
            self.f.add(src, arcname=arcname)
        else:
            raise NotImplementedError(
                'Adding files is not supported for this archive type')

    @classmethod
    def extract(cls, zpath, member, outpath):
        """
        Extract file member (full internal path) to output from
        archive zpath.
        """
        with cls(zpath) as zf:
            zf.extract_to(member, outpath)

    @classmethod
    def compress(cls, zpath, paths, stripcnt=0, ext=None):
        """
        Create new compressed file zpath containing the files in paths.

        The first stripcnt path components of each input path are removed
        to form the name stored inside the archive.  ext optionally
        overrides the archive type derived from zpath's extension.
        """
        with cls(zpath, 'w', ext=ext) as zf:
            for p in paths:
                outp = os.path.sep.join(p.split(os.path.sep)[stripcnt:])
                print('zip %s to %s (stripcnt %d)' % (p, outp, stripcnt))
                zf._add(p, outp)
|