# coding: utf-8 from __future__ import division, unicode_literals import io import zipfile import contextlib import tempfile import shutil import string try: import pathlib except ImportError: import pathlib2 as pathlib if not hasattr(contextlib, 'ExitStack'): import contextlib2 contextlib.ExitStack = contextlib2.ExitStack try: import unittest unittest.TestCase.subTest except AttributeError: import unittest2 as unittest import jaraco.itertools import func_timeout import zipp __metaclass__ = type consume = tuple def add_dirs(zf): """ Given a writable zip file zf, inject directory entries for any directories implied by the presence of children. """ for name in zipp.CompleteDirs._implied_dirs(zf.namelist()): zf.writestr(name, b"") return zf def build_alpharep_fixture(): """ Create a zip file with this structure: . ├── a.txt ├── b │ ├── c.txt │ ├── d │ │ └── e.txt │ └── f.txt └── g └── h └── i.txt This fixture has the following key characteristics: - a file at the root (a) - a file two levels deep (b/d/e) - multiple files in a directory (b/c, b/f) - a directory containing only a directory (g/h) "alpha" because it uses alphabet "rep" because it's a representative example """ data = io.BytesIO() zf = zipfile.ZipFile(data, "w") zf.writestr("a.txt", b"content of a") zf.writestr("b/c.txt", b"content of c") zf.writestr("b/d/e.txt", b"content of e") zf.writestr("b/f.txt", b"content of f") zf.writestr("g/h/i.txt", b"content of i") zf.filename = "alpharep.zip" return zf @contextlib.contextmanager def temp_dir(): tmpdir = tempfile.mkdtemp() try: yield pathlib.Path(tmpdir) finally: shutil.rmtree(tmpdir) class TestPath(unittest.TestCase): def setUp(self): self.fixtures = contextlib.ExitStack() self.addCleanup(self.fixtures.close) def zipfile_alpharep(self): with self.subTest(): yield build_alpharep_fixture() with self.subTest(): yield add_dirs(build_alpharep_fixture()) def zipfile_ondisk(self): tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) for alpharep in self.zipfile_alpharep(): buffer = alpharep.fp alpharep.close() path = tmpdir / alpharep.filename with path.open("wb") as strm: strm.write(buffer.getvalue()) yield path def test_iterdir_and_types(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) assert root.is_dir() a, b, g = root.iterdir() assert a.is_file() assert b.is_dir() assert g.is_dir() c, f, d = b.iterdir() assert c.is_file() and f.is_file() e, = d.iterdir() assert e.is_file() h, = g.iterdir() i, = h.iterdir() assert i.is_file() def test_open(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) a, b, g = root.iterdir() with a.open() as strm: data = strm.read() assert data == "content of a" def test_read(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) a, b, g = root.iterdir() assert a.read_text() == "content of a" assert a.read_bytes() == b"content of a" def test_joinpath(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) a = root.joinpath("a") assert a.is_file() e = root.joinpath("b").joinpath("d").joinpath("e.txt") assert e.read_text() == "content of e" def test_traverse_truediv(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) a = root / "a" assert a.is_file() e = root / "b" / "d" / "e.txt" assert e.read_text() == "content of e" def test_traverse_simplediv(self): """ Disable the __future__.division when testing traversal. """ for alpharep in self.zipfile_alpharep(): code = compile( source="zipp.Path(alpharep) / 'a'", filename="(test)", mode="eval", dont_inherit=True, ) eval(code) def test_pathlike_construction(self): """ zipp.Path should be constructable from a path-like object """ for zipfile_ondisk in self.zipfile_ondisk(): pathlike = pathlib.Path(str(zipfile_ondisk)) zipp.Path(pathlike) def test_traverse_pathlike(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) root / pathlib.Path("a") def test_parent(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) assert (root / 'a').parent.at == '' assert (root / 'a' / 'b').parent.at == 'a/' def test_dir_parent(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) assert (root / 'b').parent.at == '' assert (root / 'b/').parent.at == '' def test_missing_dir_parent(self): for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) assert (root / 'missing dir/').parent.at == '' def test_mutability(self): """ If the underlying zipfile is changed, the Path object should reflect that change. """ for alpharep in self.zipfile_alpharep(): root = zipp.Path(alpharep) a, b, g = root.iterdir() alpharep.writestr('foo.txt', b'foo') alpharep.writestr('bar/baz.txt', b'baz') assert any( child.name == 'foo.txt' for child in root.iterdir()) assert (root / 'foo.txt').read_text() == 'foo' baz, = (root / 'bar').iterdir() assert baz.read_text() == 'baz' HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 def huge_zipfile(self): """Create a read-only zipfile with a huge number of entries entries.""" strm = io.BytesIO() zf = zipfile.ZipFile(strm, "w") for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): zf.writestr(entry, entry) zf.mode = 'r' return zf def test_joinpath_constant_time(self): """ Ensure joinpath on items in zipfile is linear time. """ root = zipp.Path(self.huge_zipfile()) entries = jaraco.itertools.Counter(root.iterdir()) for entry in entries: entry.joinpath('suffix') # Check the file iterated all items assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES @func_timeout.func_set_timeout(3) def test_implied_dirs_performance(self): data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] zipp.CompleteDirs._implied_dirs(data)