1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# -*- coding: utf-8 -*-
"""Usage:
7zx.py [--help | options] <zipfiles>...
Options:
-h, --help Print this help and exit
-v, --version Print version and exit
-c, --compressed Use compressed (instead of uncompressed) file sizes
-s, --silent Do not print one row per zip file
-y, --yes Assume yes to all queries (for extraction)
-D=<level>, --debug=<level>
Print various types of debugging information. Choices:
CRITICAL|FATAL
ERROR
WARN(ING)
[default: INFO]
DEBUG
NOTSET
-d, --debug-trace Print lots of debugging information (-D NOTSET)
"""
import io
import logging
import os
import pty
import re
import subprocess # nosec
from argopt import argopt
from tqdm import tqdm
# Package metadata.
__author__ = "Casper da Costa-Luis <casper.dcl@physics.org>"
__version__ = "0.2.2"
# Both spellings of the licence attribute share a single value.
__licence__ = __license__ = "MPLv2.0"

# Matches "<number> <number> <rest of line>"; applied to `7z l` output, where
# the groups are read as uncompressed size, compressed size and member name.
RE_SCN = re.compile(
    r"([0-9]+)\s+([0-9]+)\s+(.*)$",
    re.MULTILINE,
)
def main():
    """Extract archives with ``7z`` while displaying byte-level progress.

    Command-line arguments are parsed by ``argopt`` from the module usage
    text.  Each archive named on the command line is first listed with
    ``7z l`` to obtain a size for every member (compressed or uncompressed,
    depending on ``--compressed``).  The archives are then extracted one by
    one with ``7z x -bd``; every ``Extracting <name>`` line printed by 7z
    advances the progress bars by that member's size.  Other non-boilerplate
    output lines are echoed above the bars.
    """
    args = argopt(__doc__, version=__version__).parse_args()
    if args.debug_trace:
        args.debug = "NOTSET"
    logging.basicConfig(level=getattr(logging, args.debug, logging.INFO),
                        format='%(levelname)s:%(message)s')
    log = logging.getLogger(__name__)
    log.debug(args)

    # Get compressed sizes
    zips = {}  # {'zipname': {'filename': int(size)}}
    for fn in args.zipfiles:
        # Request text output: on Python 3 `check_output` otherwise returns
        # bytes, which the text pattern `RE_SCN` cannot search.
        info = subprocess.check_output(  # nosec
            ["7z", "l", fn], universal_newlines=True).strip()
        finfo = RE_SCN.findall(info)  # size|compressed|name
        log.debug(finfo)
        if not finfo:
            # Nothing parsable: still extract, just without size information.
            log.warning("%s: no sizes found in 7z listing", fn)
            zips[fn] = {}
            continue

        # builtin test: last line should be total sizes
        members, summary = finfo[:-1], finfo[-1]
        totals = tuple(map(int, summary[:2]))  # size|compressed totals
        for s, total in enumerate(totals):
            totals_s = sum(int(inf[s]) for inf in members)
            if totals_s != total:
                log.warning("%s: individual total %d != 7z total %d",
                            fn, totals_s, total)
        zips[fn] = {n: int(c if args.compressed else u)
                    for (u, c, n) in members}

    # Extract
    cmd7zx = ["7z", "x", "-bd"]
    if args.yes:
        cmd7zx += ["-y"]
    # Output lines from 7z that carry no information worth echoing.
    boilerplate = ("7-Zip ", "p7zip Version ", "Everything is Ok",
                   "Folders: ", "Files: ", "Size: ", "Compressed: ")
    log.info("Extracting from %d file(s)", len(zips))
    with tqdm(total=sum(sum(fcomp.values()) for fcomp in zips.values()),
              unit="B", unit_scale=True) as tall:
        for fn, fcomp in zips.items():
            # NOTE(review): the child writes to the first descriptor returned
            # by `openpty` and the second one is closed in the parent; the
            # usual arrangement is the reverse -- confirm this is intended.
            md, sd = pty.openpty()
            ex = subprocess.Popen(  # nosec
                cmd7zx + [fn],
                bufsize=1,
                stdout=md,  # subprocess.PIPE,
                stderr=subprocess.STDOUT)
            os.close(sd)
            # Plain "r": the "U" flag is rejected by Python >= 3.11 and text
            # mode already translates newlines.
            with io.open(md, mode="r", buffering=1) as m:
                with tqdm(total=sum(fcomp.values()), disable=len(zips) < 2,
                          leave=False, unit="B", unit_scale=True) as t:
                    if not hasattr(t, "start_t"):  # disabled
                        t.start_t = tall._time()
                    while True:
                        try:
                            l_raw = m.readline()
                        except IOError:
                            # A read error is treated as end of output.
                            break
                        if not l_raw:
                            # Empty string means EOF (blank lines are "\n");
                            # without this check the loop would never end.
                            break
                        ln = l_raw.strip()
                        if not ln:
                            continue
                        if ln.startswith("Extracting"):
                            exname = ln[len("Extracting"):].lstrip()
                            s = fcomp.get(exname, 0)  # 0 is likely folders
                            t.update(s)
                            tall.update(s)
                            continue
                        if ln.startswith(boilerplate):
                            continue
                        if not ln.startswith("Processing archive: "):
                            t.write(ln)
                        elif not args.silent:
                            # One row per archive: elapsed time since the
                            # overall bar started, then the archive name.
                            t.write(t.format_interval(
                                t.start_t - tall.start_t) + ' ' +
                                ln.replace("Processing archive: ", ""))
            ex.wait()
# Expose the command-line usage text as the entry point's docstring.
main.__doc__ = __doc__

if __name__ == "__main__":
    # Run the extractor only when executed as a script, not on import.
    main()
|