diff options
Diffstat (limited to 'tests/test_apt_cache.py')
-rw-r--r-- | tests/test_apt_cache.py | 400 |
1 files changed, 400 insertions, 0 deletions
diff --git a/tests/test_apt_cache.py b/tests/test_apt_cache.py new file mode 100644 index 0000000..cc52400 --- /dev/null +++ b/tests/test_apt_cache.py @@ -0,0 +1,400 @@ +#!/usr/bin/python3 +# +# Copyright (C) 2010 Julian Andres Klode <jak@debian.org> +# 2010 Michael Vogt <mvo@ubuntu.com> +# +# Copying and distribution of this file, with or without modification, +# are permitted in any medium without royalty provided the copyright +# notice and this notice are preserved. +"""Unit tests for verifying the correctness of check_dep, etc in apt_pkg.""" + +import glob +import logging +import os +import shutil +import sys +import tempfile +import unittest + +from test_all import get_library_dir + +libdir = get_library_dir() +if libdir: + sys.path.insert(0, libdir) + +import apt_pkg +import testcommon + +import apt + + +def if_sources_list_is_readable(f): + def wrapper(*args, **kwargs): + if os.access("/etc/apt/sources.list", os.R_OK): + f(*args, **kwargs) + else: + logging.warning("skipping '%s' because sources.list is not readable" % f) + + return wrapper + + +def get_open_file_descriptors(): + try: + fds = os.listdir("/proc/self/fd") + except OSError: + logging.warning("failed to list /proc/self/fd") + return set() + return set(map(int, fds)) + + +class TestAptCache(testcommon.TestCase): + """test the apt cache""" + + def setUp(self): + testcommon.TestCase.setUp(self) + apt_pkg.config.clear("APT::Update::Post-Invoke") + apt_pkg.config.clear("APT::Update::Post-Invoke-Success") + + @if_sources_list_is_readable + def test_apt_cache(self): + """cache: iterate all packages and all dependencies""" + cache = apt.Cache() + # number is not meaningful and just need to be "big enough", + # the important bit is the test against __len__ + self.assertTrue(len(cache) > 100) + # go over the cache and all dependencies, just to see if + # that is possible and does not crash + for pkg in cache: + if pkg.candidate: + for or_deps in pkg.candidate.dependencies: + for dep in or_deps: + self.assertTrue(dep.name) + self.assertTrue(isinstance(dep.relation, str)) + self.assertTrue(dep.pre_depend in (True, False)) + + # accessing record should take a reasonable time; in + # particular, when using compressed indexes, it should not use + # tons of seek operations + r = pkg.candidate.record + self.assertEqual(r["Package"], pkg.shortname) + self.assertTrue("Version" in r) + self.assertTrue(len(r["Description"]) > 0) + self.assertTrue(str(r).startswith("Package: %s\n" % pkg.shortname)) + + @if_sources_list_is_readable + def test_cache_close_leak_fd(self): + fds_before_open = get_open_file_descriptors() + cache = apt.Cache() + opened_fd = get_open_file_descriptors().difference(fds_before_open) + cache.close() + fds_after_close = get_open_file_descriptors() + unclosed_fd = opened_fd.intersection(fds_after_close) + self.assertEqual(fds_before_open, fds_after_close) + self.assertEqual(unclosed_fd, set()) + + def test_cache_open_twice_leaks_fds(self): + cache = apt.Cache() + fds_before_open = get_open_file_descriptors() + cache.open() + fds_after_open_twice = get_open_file_descriptors() + self.assertEqual(fds_before_open, fds_after_open_twice) + + @if_sources_list_is_readable + def test_cache_close_download_fails(self): + cache = apt.Cache() + self.assertEqual(cache.required_download, 0) + cache.close() + with self.assertRaises(apt.cache.CacheClosedException): + cache.required_download + + def test_get_provided_packages(self): + apt.apt_pkg.config.set("Apt::architecture", "i386") + cache = apt.Cache(rootdir="./data/test-provides/") + cache.open() + if len(cache) == 0: + logging.warning("skipping test_get_provided_packages, cache empty?!?") + return + # a true virtual pkg + li = cache.get_providing_packages("mail-transport-agent") + self.assertTrue(len(li) > 0) + self.assertTrue("postfix" in [p.name for p in li]) + self.assertTrue("mail-transport-agent" in cache["postfix"].candidate.provides) + + def test_low_level_pkg_provides(self): + apt.apt_pkg.config.set("Apt::architecture", "i386") + # create highlevel cache and get the lowlevel one from it + highlevel_cache = apt.Cache(rootdir="./data/test-provides") + if len(highlevel_cache) == 0: + logging.warning("skipping test_log_level_pkg_provides, cache empty?!?") + return + # low level cache provides list of the pkg + cache = highlevel_cache._cache + li = cache["mail-transport-agent"].provides_list + # arbitrary number, just needs to be higher enough + self.assertEqual(len(li), 2) + for providesname, providesver, version in li: + self.assertEqual(providesname, "mail-transport-agent") + if version.parent_pkg.name == "postfix": + break + else: + self.assertNotReached() + + @if_sources_list_is_readable + def test_dpkg_journal_dirty(self): + # create tmp env + tmpdir = tempfile.mkdtemp() + dpkg_dir = os.path.join(tmpdir, "var", "lib", "dpkg") + os.makedirs(os.path.join(dpkg_dir, "updates")) + open(os.path.join(dpkg_dir, "status"), "w").close() + apt_pkg.config.set("Dir::State::status", os.path.join(dpkg_dir, "status")) + cache = apt.Cache() + # test empty + self.assertFalse(cache.dpkg_journal_dirty) + # that is ok, only [0-9] are dpkg jounral entries + open(os.path.join(dpkg_dir, "updates", "xxx"), "w").close() + self.assertFalse(cache.dpkg_journal_dirty) + # that is a dirty journal + open(os.path.join(dpkg_dir, "updates", "000"), "w").close() + self.assertTrue(cache.dpkg_journal_dirty) + + @if_sources_list_is_readable + def test_apt_update(self): + rootdir = "./data/tmp" + if os.path.exists(rootdir): + shutil.rmtree(rootdir) + try: + os.makedirs(os.path.join(rootdir, "var/lib/apt/lists/partial")) + except OSError: + pass + state_dir = os.path.join(rootdir, "var/lib/apt") + lists_dir = os.path.join(rootdir, "var/lib/apt/lists") + old_state = apt_pkg.config.find("dir::state") + apt_pkg.config.set("dir::state", state_dir) + # set a local sources.list that does not need the network + base_sources = os.path.abspath(os.path.join(rootdir, "sources.list")) + old_source_list = apt_pkg.config.find("dir::etc::sourcelist") + old_source_parts = apt_pkg.config.find("dir::etc::sourceparts") + apt_pkg.config.set("dir::etc::sourcelist", base_sources) + # TODO: /dev/null is not a dir, perhaps find something better + apt_pkg.config.set("dir::etc::sourceparts", "/dev/null") + # main sources.list + sources_list = base_sources + with open(sources_list, "w") as f: + repo = os.path.abspath("./data/test-repo2") + f.write("deb [allow-insecure=yes] copy:%s /\n" % repo) + + # test single sources.list fetching + sources_list = os.path.join(rootdir, "test.list") + with open(sources_list, "w") as f: + repo_dir = os.path.abspath("./data/test-repo") + f.write("deb [allow-insecure=yes] copy:%s /\n" % repo_dir) + + self.assertTrue(os.path.exists(sources_list)) + # write marker to ensure listcleaner is not run + open("./data/tmp/var/lib/apt/lists/marker", "w").close() + + # update a single sources.list + cache = apt.Cache() + cache.update(sources_list=sources_list) + # verify we just got the excpected package file + needle_packages = glob.glob(lists_dir + "/*tests_data_test-repo_Packages*") + self.assertEqual(len(needle_packages), 1) + # verify that we *only* got the Packages file from a single source + all_packages = glob.glob(lists_dir + "/*_Packages*") + self.assertEqual(needle_packages, all_packages) + # verify that the listcleaner was not run and the marker file is + # still there + self.assertTrue("marker" in os.listdir(lists_dir)) + # now run update again (without the "normal" sources.list that + # contains test-repo2 and verify that we got the normal sources.list + cache.update() + needle_packages = glob.glob(lists_dir + "/*tests_data_test-repo2_Packages*") + self.assertEqual(len(needle_packages), 1) + all_packages = glob.glob(lists_dir + "/*_Packages*") + self.assertEqual(needle_packages, all_packages) + + # and another update with a single source only + cache = apt.Cache() + cache.update(sources_list=sources_list) + all_packages = glob.glob(lists_dir + "/*_Packages*") + self.assertEqual(len(all_packages), 2) + apt_pkg.config.set("dir::state", old_state) + apt_pkg.config.set("dir::etc::sourcelist", old_source_list) + apt_pkg.config.set("dir::etc::sourceparts", old_source_parts) + + def test_package_cmp(self): + cache = apt.Cache(rootdir="/") + li = [] + li.append(cache["intltool"]) + li.append(cache["python3"]) + li.append(cache["apt"]) + li.sort() + self.assertEqual([p.name for p in li], ["apt", "intltool", "python3"]) + + def test_get_architectures(self): + main_arch = apt.apt_pkg.config.get("APT::Architecture") + arches = apt_pkg.get_architectures() + self.assertTrue(main_arch in arches) + + def test_phasing_applied(self): + """checks the return type of phasing_applied.""" + cache = apt.Cache() + pkg = cache["apt"] + self.assertIsInstance(pkg.phasing_applied, bool) + + def test_is_security_update(self): + """checks the return type of is_security_update.""" + cache = apt.Cache() + pkg = cache["apt"] + self.assertIsInstance(pkg.installed.is_security_update, bool) + + def test_apt_cache_reopen_is_safe(self): + """cache: check that we cannot use old package objects after reopen""" + cache = apt.Cache() + old_depcache = cache._depcache + old_package = cache["apt"] + old_pkg = old_package._pkg + old_version = old_package.candidate + old_ver = old_version._cand + + cache.open() + new_depcache = cache._depcache + new_package = cache["apt"] + new_pkg = new_package._pkg + new_version = new_package.candidate + new_ver = new_version._cand + + # get candidate + self.assertRaises(ValueError, old_depcache.get_candidate_ver, new_pkg) + self.assertRaises(ValueError, new_depcache.get_candidate_ver, old_pkg) + self.assertEqual(new_ver, new_depcache.get_candidate_ver(new_pkg)) + self.assertEqual(old_package.candidate._cand, old_ver) # Remap success + self.assertEqual( + old_package.candidate._cand, new_depcache.get_candidate_ver(new_pkg) + ) + + # set candidate + new_package.candidate = old_version + old_depcache.set_candidate_ver(old_pkg, old_ver) + self.assertRaises(ValueError, old_depcache.set_candidate_ver, old_pkg, new_ver) + self.assertRaises(ValueError, new_depcache.set_candidate_ver, old_pkg, old_ver) + self.assertRaises(ValueError, new_depcache.set_candidate_ver, old_pkg, new_ver) + self.assertRaises(ValueError, new_depcache.set_candidate_ver, new_pkg, old_ver) + new_depcache.set_candidate_ver(new_pkg, new_ver) + + @staticmethod + def write_status_file(packages): + with open(apt_pkg.config["Dir::State::Status"], "w") as fobj: + for package in packages: + print("Package:", package, file=fobj) + print("Status: install ok installed", file=fobj) + print("Priority: optional", file=fobj) + print("Section: admin", file=fobj) + print("Installed-Size: 1", file=fobj) + print("Maintainer: X <x@x.invalid>", file=fobj) + print("Architecture: all", file=fobj) + print("Version: 1", file=fobj) + print("Description: blah", file=fobj) + print("", file=fobj) + + def test_apt_cache_reopen_is_safe_out_of_bounds(self): + """Check that out of bounds access is remapped correctly.""" + with tempfile.NamedTemporaryFile() as status: + apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null" + apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null" + apt_pkg.config["Dir::State::Status"] = status.name + apt_pkg.init_system() + + self.write_status_file("abcdefghijklmnopqrstuvwxyz") + c = apt.Cache() + p = c["z"] + p_id = p.id + self.write_status_file("az") + apt_pkg.init_system() + c.open() + self.assertNotEqual(p.id, p_id) + self.assertLess(p.id, 2) + p.mark_delete() + self.assertEqual([p], c.get_changes()) + + def test_apt_cache_reopen_is_safe_out_of_bounds_no_match(self): + """Check that installing gone package raises correct exception""" + with tempfile.NamedTemporaryFile() as status: + apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null" + apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null" + apt_pkg.config["Dir::State::Status"] = status.name + apt_pkg.init_system() + + self.write_status_file("abcdefghijklmnopqrstuvwxyz") + c = apt.Cache() + p = c["z"] + p_id = p.id + self.write_status_file("a") + apt_pkg.init_system() + c.open() + self.assertEqual(p.id, p_id) # Could not be remapped + self.assertRaises(apt_pkg.CacheMismatchError, p.mark_delete) + + def test_apt_cache_reopen_is_safe_swap(self): + """Check that swapping a and b does not mark the wrong package.""" + with tempfile.NamedTemporaryFile() as status: + apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null" + apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null" + apt_pkg.config["Dir::State::Status"] = status.name + apt_pkg.init_system() + + self.write_status_file("abcdefghijklmnopqrstuvwxyz") + c = apt.Cache() + p = c["a"] + a_id = p.id + p_hash = hash(p) + set_of_p = {p} + self.write_status_file("baz") + apt_pkg.init_system() + c.open() + # b now has the same id as a in the old cache + self.assertEqual(c["b"].id, a_id) + self.assertNotEqual(p.id, a_id) + # Marking a should still mark a, not b. + p.mark_delete() + self.assertEqual([p], c.get_changes()) + + # Ensure that p can still be found in a set of it, as a test + # for bug https://bugs.launchpad.net/bugs/1780099 + self.assertEqual(hash(p), p_hash) + self.assertIn(p, set_of_p) + + def test_apt_cache_iteration_safe(self): + """Check that iterating does not produce different results. + + This failed in 1.7.0~alpha2, because one part of the code + looked up packages in the weak dict using the pretty name, + and the other using the full name.""" + + with tempfile.NamedTemporaryFile() as status: + apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null" + apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null" + apt_pkg.config["Dir::State::Status"] = status.name + apt_pkg.init_system() + + self.write_status_file("abcdefghijklmnopqrstuvwxyz") + + c = apt.Cache() + c["a"].mark_delete() + self.assertEqual([c["a"]], [p for p in c if p.marked_delete]) + + def test_problemresolver_keep_phased_updates(self): + """Check that the c++ function can be called.""" + with tempfile.NamedTemporaryFile() as status: + apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null" + apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null" + apt_pkg.config["Dir::State::Status"] = status.name + apt_pkg.init_system() + + cache = apt.Cache() + problemresolver = apt.ProblemResolver(cache) + self.assertIsNone(problemresolver.keep_phased_updates()) + + +if __name__ == "__main__": + unittest.main() |