diff options
Diffstat (limited to '')
-rw-r--r-- | python/mozbuild/mozpack/test/test_copier.py | 547 |
1 files changed, 547 insertions, 0 deletions
diff --git a/python/mozbuild/mozpack/test/test_copier.py b/python/mozbuild/mozpack/test/test_copier.py new file mode 100644 index 0000000000..ea2b0eae80 --- /dev/null +++ b/python/mozbuild/mozpack/test/test_copier.py @@ -0,0 +1,547 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import os +import stat +import unittest + +import mozpack.path as mozpath +import mozunit +import six +from mozpack.copier import FileCopier, FileRegistry, FileRegistrySubtree, Jarrer +from mozpack.errors import ErrorMessage +from mozpack.files import ExistingFile, GeneratedFile +from mozpack.mozjar import JarReader +from mozpack.test.test_files import MatchTestTemplate, MockDest, TestWithTmpDir + + +class BaseTestFileRegistry(MatchTestTemplate): + def add(self, path): + self.registry.add(path, GeneratedFile(path)) + + def do_check(self, pattern, result): + self.checked = True + if result: + self.assertTrue(self.registry.contains(pattern)) + else: + self.assertFalse(self.registry.contains(pattern)) + self.assertEqual(self.registry.match(pattern), result) + + def do_test_file_registry(self, registry): + self.registry = registry + self.registry.add("foo", GeneratedFile(b"foo")) + bar = GeneratedFile(b"bar") + self.registry.add("bar", bar) + self.assertEqual(self.registry.paths(), ["foo", "bar"]) + self.assertEqual(self.registry["bar"], bar) + + self.assertRaises( + ErrorMessage, self.registry.add, "foo", GeneratedFile(b"foo2") + ) + + self.assertRaises(ErrorMessage, self.registry.remove, "qux") + + self.assertRaises( + ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar") + ) + self.assertRaises( + ErrorMessage, self.registry.add, "foo/bar/baz", GeneratedFile(b"foobar") + ) + + self.assertEqual(self.registry.paths(), ["foo", "bar"]) + + self.registry.remove("foo") + self.assertEqual(self.registry.paths(), ["bar"]) + self.registry.remove("bar") + self.assertEqual(self.registry.paths(), []) + + self.prepare_match_test() + self.do_match_test() + self.assertTrue(self.checked) + self.assertEqual( + self.registry.paths(), + [ + "bar", + "foo/bar", + "foo/baz", + "foo/qux/1", + "foo/qux/bar", + "foo/qux/2/test", + "foo/qux/2/test2", + ], + ) + + self.registry.remove("foo/qux") + self.assertEqual(self.registry.paths(), ["bar", "foo/bar", "foo/baz"]) + + self.registry.add("foo/qux", GeneratedFile(b"fooqux")) + self.assertEqual( + self.registry.paths(), ["bar", "foo/bar", "foo/baz", "foo/qux"] + ) + self.registry.remove("foo/b*") + self.assertEqual(self.registry.paths(), ["bar", "foo/qux"]) + + self.assertEqual([f for f, c in self.registry], ["bar", "foo/qux"]) + self.assertEqual(len(self.registry), 2) + + self.add("foo/.foo") + self.assertTrue(self.registry.contains("foo/.foo")) + + def do_test_registry_paths(self, registry): + self.registry = registry + + # Can't add a file if it requires a directory in place of a + # file we also require. + self.registry.add("foo", GeneratedFile(b"foo")) + self.assertRaises( + ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar") + ) + + # Can't add a file if we already have a directory there. + self.registry.add("bar/baz", GeneratedFile(b"barbaz")) + self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar")) + + # Bump the count of things that require bar/ to 2. + self.registry.add("bar/zot", GeneratedFile(b"barzot")) + self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar")) + + # Drop the count of things that require bar/ to 1. + self.registry.remove("bar/baz") + self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar")) + + # Drop the count of things that require bar/ to 0. + self.registry.remove("bar/zot") + self.registry.add("bar/zot", GeneratedFile(b"barzot")) + + +class TestFileRegistry(BaseTestFileRegistry, unittest.TestCase): + def test_partial_paths(self): + cases = { + "foo/bar/baz/zot": ["foo/bar/baz", "foo/bar", "foo"], + "foo/bar": ["foo"], + "bar": [], + } + reg = FileRegistry() + for path, parts in six.iteritems(cases): + self.assertEqual(reg._partial_paths(path), parts) + + def test_file_registry(self): + self.do_test_file_registry(FileRegistry()) + + def test_registry_paths(self): + self.do_test_registry_paths(FileRegistry()) + + def test_required_directories(self): + self.registry = FileRegistry() + + self.registry.add("foo", GeneratedFile(b"foo")) + self.assertEqual(self.registry.required_directories(), set()) + + self.registry.add("bar/baz", GeneratedFile(b"barbaz")) + self.assertEqual(self.registry.required_directories(), {"bar"}) + + self.registry.add("bar/zot", GeneratedFile(b"barzot")) + self.assertEqual(self.registry.required_directories(), {"bar"}) + + self.registry.add("bar/zap/zot", GeneratedFile(b"barzapzot")) + self.assertEqual(self.registry.required_directories(), {"bar", "bar/zap"}) + + self.registry.remove("bar/zap/zot") + self.assertEqual(self.registry.required_directories(), {"bar"}) + + self.registry.remove("bar/baz") + self.assertEqual(self.registry.required_directories(), {"bar"}) + + self.registry.remove("bar/zot") + self.assertEqual(self.registry.required_directories(), set()) + + self.registry.add("x/y/z", GeneratedFile(b"xyz")) + self.assertEqual(self.registry.required_directories(), {"x", "x/y"}) + + +class TestFileRegistrySubtree(BaseTestFileRegistry, unittest.TestCase): + def test_file_registry_subtree_base(self): + registry = FileRegistry() + self.assertEqual(registry, FileRegistrySubtree("", registry)) + self.assertNotEqual(registry, FileRegistrySubtree("base", registry)) + + def create_registry(self): + registry = FileRegistry() + registry.add("foo/bar", GeneratedFile(b"foo/bar")) + registry.add("baz/qux", GeneratedFile(b"baz/qux")) + return FileRegistrySubtree("base/root", registry) + + def test_file_registry_subtree(self): + self.do_test_file_registry(self.create_registry()) + + def test_registry_paths_subtree(self): + FileRegistry() + self.do_test_registry_paths(self.create_registry()) + + +class TestFileCopier(TestWithTmpDir): + def all_dirs(self, base): + all_dirs = set() + for root, dirs, files in os.walk(base): + if not dirs: + all_dirs.add(mozpath.relpath(root, base)) + return all_dirs + + def all_files(self, base): + all_files = set() + for root, dirs, files in os.walk(base): + for f in files: + all_files.add(mozpath.join(mozpath.relpath(root, base), f)) + return all_files + + def test_file_copier(self): + copier = FileCopier() + copier.add("foo/bar", GeneratedFile(b"foobar")) + copier.add("foo/qux", GeneratedFile(b"fooqux")) + copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz")) + copier.add("bar", GeneratedFile(b"bar")) + copier.add("qux/foo", GeneratedFile(b"quxfoo")) + copier.add("qux/bar", GeneratedFile(b"")) + + result = copier.copy(self.tmpdir) + self.assertEqual(self.all_files(self.tmpdir), set(copier.paths())) + self.assertEqual( + self.all_dirs(self.tmpdir), set(["foo/deep/nested/directory", "qux"]) + ) + + self.assertEqual( + result.updated_files, + set(self.tmppath(p) for p in self.all_files(self.tmpdir)), + ) + self.assertEqual(result.existing_files, set()) + self.assertEqual(result.removed_files, set()) + self.assertEqual(result.removed_directories, set()) + + copier.remove("foo") + copier.add("test", GeneratedFile(b"test")) + result = copier.copy(self.tmpdir) + self.assertEqual(self.all_files(self.tmpdir), set(copier.paths())) + self.assertEqual(self.all_dirs(self.tmpdir), set(["qux"])) + self.assertEqual( + result.removed_files, + set( + self.tmppath(p) + for p in ("foo/bar", "foo/qux", "foo/deep/nested/directory/file") + ), + ) + + def test_symlink_directory_replaced(self): + """Directory symlinks in destination are replaced if they need to be + real directories.""" + if not self.symlink_supported: + return + + dest = self.tmppath("dest") + + copier = FileCopier() + copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz")) + + os.makedirs(self.tmppath("dest/foo")) + dummy = self.tmppath("dummy") + os.mkdir(dummy) + link = self.tmppath("dest/foo/bar") + os.symlink(dummy, link) + + result = copier.copy(dest) + + st = os.lstat(link) + self.assertFalse(stat.S_ISLNK(st.st_mode)) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + + self.assertEqual(self.all_files(dest), set(copier.paths())) + + self.assertEqual(result.removed_directories, set()) + self.assertEqual(len(result.updated_files), 1) + + def test_remove_unaccounted_directory_symlinks(self): + """Directory symlinks in destination that are not in the way are + deleted according to remove_unaccounted and + remove_all_directory_symlinks. + """ + if not self.symlink_supported: + return + + dest = self.tmppath("dest") + + copier = FileCopier() + copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz")) + + os.makedirs(self.tmppath("dest/foo")) + dummy = self.tmppath("dummy") + os.mkdir(dummy) + + os.mkdir(self.tmppath("dest/zot")) + link = self.tmppath("dest/zot/zap") + os.symlink(dummy, link) + + # If not remove_unaccounted but remove_empty_directories, then + # the symlinked directory remains (as does its containing + # directory). + result = copier.copy( + dest, + remove_unaccounted=False, + remove_empty_directories=True, + remove_all_directory_symlinks=False, + ) + + st = os.lstat(link) + self.assertTrue(stat.S_ISLNK(st.st_mode)) + self.assertFalse(stat.S_ISDIR(st.st_mode)) + + self.assertEqual(self.all_files(dest), set(copier.paths())) + self.assertEqual(self.all_dirs(dest), set(["foo/bar"])) + + self.assertEqual(result.removed_directories, set()) + self.assertEqual(len(result.updated_files), 1) + + # If remove_unaccounted but not remove_empty_directories, then + # only the symlinked directory is removed. + result = copier.copy( + dest, + remove_unaccounted=True, + remove_empty_directories=False, + remove_all_directory_symlinks=False, + ) + + st = os.lstat(self.tmppath("dest/zot")) + self.assertFalse(stat.S_ISLNK(st.st_mode)) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + + self.assertEqual(result.removed_files, set([link])) + self.assertEqual(result.removed_directories, set()) + + self.assertEqual(self.all_files(dest), set(copier.paths())) + self.assertEqual(self.all_dirs(dest), set(["foo/bar", "zot"])) + + # If remove_unaccounted and remove_empty_directories, then + # both the symlink and its containing directory are removed. + link = self.tmppath("dest/zot/zap") + os.symlink(dummy, link) + + result = copier.copy( + dest, + remove_unaccounted=True, + remove_empty_directories=True, + remove_all_directory_symlinks=False, + ) + + self.assertEqual(result.removed_files, set([link])) + self.assertEqual(result.removed_directories, set([self.tmppath("dest/zot")])) + + self.assertEqual(self.all_files(dest), set(copier.paths())) + self.assertEqual(self.all_dirs(dest), set(["foo/bar"])) + + def test_permissions(self): + """Ensure files without write permission can be deleted.""" + with open(self.tmppath("dummy"), "a"): + pass + + p = self.tmppath("no_perms") + with open(p, "a"): + pass + + # Make file and directory unwritable. Reminder: making a directory + # unwritable prevents modifications (including deletes) from the list + # of files in that directory. + os.chmod(p, 0o400) + os.chmod(self.tmpdir, 0o400) + + copier = FileCopier() + copier.add("dummy", GeneratedFile(b"content")) + result = copier.copy(self.tmpdir) + self.assertEqual(result.removed_files_count, 1) + self.assertFalse(os.path.exists(p)) + + def test_no_remove(self): + copier = FileCopier() + copier.add("foo", GeneratedFile(b"foo")) + + with open(self.tmppath("bar"), "a"): + pass + + os.mkdir(self.tmppath("emptydir")) + d = self.tmppath("populateddir") + os.mkdir(d) + + with open(self.tmppath("populateddir/foo"), "a"): + pass + + result = copier.copy(self.tmpdir, remove_unaccounted=False) + + self.assertEqual( + self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"]) + ) + self.assertEqual(self.all_dirs(self.tmpdir), set(["populateddir"])) + self.assertEqual(result.removed_files, set()) + self.assertEqual(result.removed_directories, set([self.tmppath("emptydir")])) + + def test_no_remove_empty_directories(self): + copier = FileCopier() + copier.add("foo", GeneratedFile(b"foo")) + + with open(self.tmppath("bar"), "a"): + pass + + os.mkdir(self.tmppath("emptydir")) + d = self.tmppath("populateddir") + os.mkdir(d) + + with open(self.tmppath("populateddir/foo"), "a"): + pass + + result = copier.copy( + self.tmpdir, remove_unaccounted=False, remove_empty_directories=False + ) + + self.assertEqual( + self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"]) + ) + self.assertEqual(self.all_dirs(self.tmpdir), set(["emptydir", "populateddir"])) + self.assertEqual(result.removed_files, set()) + self.assertEqual(result.removed_directories, set()) + + def test_optional_exists_creates_unneeded_directory(self): + """Demonstrate that a directory not strictly required, but specified + as the path to an optional file, will be unnecessarily created. + + This behaviour is wrong; fixing it is tracked by Bug 972432; + and this test exists to guard against unexpected changes in + behaviour. + """ + + dest = self.tmppath("dest") + + copier = FileCopier() + copier.add("foo/bar", ExistingFile(required=False)) + + result = copier.copy(dest) + + st = os.lstat(self.tmppath("dest/foo")) + self.assertFalse(stat.S_ISLNK(st.st_mode)) + self.assertTrue(stat.S_ISDIR(st.st_mode)) + + # What's worse, we have no record that dest was created. + self.assertEqual(len(result.updated_files), 0) + + # But we do have an erroneous record of an optional file + # existing when it does not. + self.assertIn(self.tmppath("dest/foo/bar"), result.existing_files) + + def test_remove_unaccounted_file_registry(self): + """Test FileCopier.copy(remove_unaccounted=FileRegistry())""" + + dest = self.tmppath("dest") + + copier = FileCopier() + copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz")) + copier.add("foo/bar/qux", GeneratedFile(b"foobarqux")) + copier.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga")) + copier.add("foo/toto/tata", GeneratedFile(b"footototata")) + + os.makedirs(os.path.join(dest, "bar")) + with open(os.path.join(dest, "bar", "bar"), "w") as fh: + fh.write("barbar") + os.makedirs(os.path.join(dest, "foo", "toto")) + with open(os.path.join(dest, "foo", "toto", "toto"), "w") as fh: + fh.write("foototototo") + + result = copier.copy(dest, remove_unaccounted=False) + + self.assertEqual( + self.all_files(dest), set(copier.paths()) | {"foo/toto/toto", "bar/bar"} + ) + self.assertEqual( + self.all_dirs(dest), {"foo/bar", "foo/hoge", "foo/toto", "bar"} + ) + + copier2 = FileCopier() + copier2.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga")) + + # We expect only files copied from the first copier to be removed, + # not the extra file that was there beforehand. + result = copier2.copy(dest, remove_unaccounted=copier) + + self.assertEqual( + self.all_files(dest), set(copier2.paths()) | {"foo/toto/toto", "bar/bar"} + ) + self.assertEqual(self.all_dirs(dest), {"foo/hoge", "foo/toto", "bar"}) + self.assertEqual(result.updated_files, {self.tmppath("dest/foo/hoge/fuga")}) + self.assertEqual(result.existing_files, set()) + self.assertEqual( + result.removed_files, + { + self.tmppath(p) + for p in ("dest/foo/bar/baz", "dest/foo/bar/qux", "dest/foo/toto/tata") + }, + ) + self.assertEqual(result.removed_directories, {self.tmppath("dest/foo/bar")}) + + +class TestJarrer(unittest.TestCase): + def check_jar(self, dest, copier): + jar = JarReader(fileobj=dest) + self.assertEqual([f.filename for f in jar], copier.paths()) + for f in jar: + self.assertEqual(f.uncompressed_data.read(), copier[f.filename].content) + + def test_jarrer(self): + copier = Jarrer() + copier.add("foo/bar", GeneratedFile(b"foobar")) + copier.add("foo/qux", GeneratedFile(b"fooqux")) + copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz")) + copier.add("bar", GeneratedFile(b"bar")) + copier.add("qux/foo", GeneratedFile(b"quxfoo")) + copier.add("qux/bar", GeneratedFile(b"")) + + dest = MockDest() + copier.copy(dest) + self.check_jar(dest, copier) + + copier.remove("foo") + copier.add("test", GeneratedFile(b"test")) + copier.copy(dest) + self.check_jar(dest, copier) + + copier.remove("test") + copier.add("test", GeneratedFile(b"replaced-content")) + copier.copy(dest) + self.check_jar(dest, copier) + + copier.copy(dest) + self.check_jar(dest, copier) + + preloaded = ["qux/bar", "bar"] + copier.preload(preloaded) + copier.copy(dest) + + dest.seek(0) + jar = JarReader(fileobj=dest) + self.assertEqual( + [f.filename for f in jar], + preloaded + [p for p in copier.paths() if p not in preloaded], + ) + self.assertEqual(jar.last_preloaded, preloaded[-1]) + + def test_jarrer_compress(self): + copier = Jarrer() + copier.add("foo/bar", GeneratedFile(b"ffffff")) + copier.add("foo/qux", GeneratedFile(b"ffffff"), compress=False) + + dest = MockDest() + copier.copy(dest) + self.check_jar(dest, copier) + + dest.seek(0) + jar = JarReader(fileobj=dest) + self.assertTrue(jar["foo/bar"].compressed) + self.assertFalse(jar["foo/qux"].compressed) + + +if __name__ == "__main__": + mozunit.main() |