summaryrefslogtreecommitdiffstats
path: root/tests/test_apt_cache.py
blob: cc52400c9913cdc4afc79be0ca379b1bfe077698 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
#!/usr/bin/python3
#
# Copyright (C) 2010 Julian Andres Klode <jak@debian.org>
#               2010 Michael Vogt <mvo@ubuntu.com>
#
# Copying and distribution of this file, with or without modification,
# are permitted in any medium without royalty provided the copyright
# notice and this notice are preserved.
"""Unit tests for apt.Cache and the low-level apt_pkg cache behind it."""

import glob
import logging
import os
import shutil
import sys
import tempfile
import unittest

from test_all import get_library_dir

# If test_all reports a library directory, put it at the front of the module
# search path. This must happen before apt_pkg and apt are imported below so
# that those imports look in that directory first.
libdir = get_library_dir()
if libdir:
    sys.path.insert(0, libdir)

import apt_pkg
import testcommon

import apt


def if_sources_list_is_readable(f):
    """Decorator: call *f* only when /etc/apt/sources.list is readable.

    When the file is readable the wrapped callable is invoked with the
    given arguments and its result is returned.  Otherwise *f* is not
    called; a warning is logged and ``None`` is returned.

    The wrapper keeps the name and docstring of *f*, so decorated tests
    are reported under their own name rather than as ``wrapper``.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        if not os.access("/etc/apt/sources.list", os.R_OK):
            logging.warning("skipping '%s' because sources.list is not readable", f)
            return None
        return f(*args, **kwargs)

    return wrapper


def get_open_file_descriptors():
    """Return the descriptors listed in /proc/self/fd as a set of ints.

    If that directory cannot be listed, a warning is logged and an empty
    set is returned instead.
    """
    fd_dir = "/proc/self/fd"
    try:
        entries = os.listdir(fd_dir)
    except OSError:
        logging.warning("failed to list /proc/self/fd")
        return set()
    return {int(entry) for entry in entries}


class TestAptCache(testcommon.TestCase):
    """Tests for apt.Cache and the low-level cache objects behind it."""

    def setUp(self):
        """Run the shared set-up, then clear the configured update hooks."""
        testcommon.TestCase.setUp(self)
        # NOTE(review): presumably cleared so that cache.update() in these
        # tests does not run hooks taken from the host configuration.
        apt_pkg.config.clear("APT::Update::Post-Invoke")
        apt_pkg.config.clear("APT::Update::Post-Invoke-Success")

    @if_sources_list_is_readable
    def test_apt_cache(self):
        """cache: iterate all packages and all dependencies"""
        cache = apt.Cache()
        # number is not meaningful and just need to be "big enough",
        # the important bit is the test against __len__
        self.assertGreater(len(cache), 100)
        # go over the cache and all dependencies, just to see if
        # that is possible and does not crash
        for pkg in cache:
            if pkg.candidate:
                for or_deps in pkg.candidate.dependencies:
                    for dep in or_deps:
                        self.assertTrue(dep.name)
                        self.assertIsInstance(dep.relation, str)
                        self.assertIn(dep.pre_depend, (True, False))

                # accessing record should take a reasonable time; in
                # particular, when using compressed indexes, it should not use
                # tons of seek operations
                r = pkg.candidate.record
                self.assertEqual(r["Package"], pkg.shortname)
                self.assertIn("Version", r)
                self.assertGreater(len(r["Description"]), 0)
                self.assertTrue(str(r).startswith("Package: %s\n" % pkg.shortname))

    @if_sources_list_is_readable
    def test_cache_close_leak_fd(self):
        """cache: close() releases every descriptor that opening acquired"""
        fds_before_open = get_open_file_descriptors()
        cache = apt.Cache()
        opened_fd = get_open_file_descriptors().difference(fds_before_open)
        cache.close()
        fds_after_close = get_open_file_descriptors()
        unclosed_fd = opened_fd.intersection(fds_after_close)
        self.assertEqual(fds_before_open, fds_after_close)
        self.assertEqual(unclosed_fd, set())

    def test_cache_open_twice_leaks_fds(self):
        """cache: re-opening an open cache leaves the open descriptors unchanged"""
        cache = apt.Cache()
        fds_before_open = get_open_file_descriptors()
        cache.open()
        fds_after_open_twice = get_open_file_descriptors()
        self.assertEqual(fds_before_open, fds_after_open_twice)

    @if_sources_list_is_readable
    def test_cache_close_download_fails(self):
        """cache: required_download raises CacheClosedException after close()"""
        cache = apt.Cache()
        self.assertEqual(cache.required_download, 0)
        cache.close()
        with self.assertRaises(apt.cache.CacheClosedException):
            cache.required_download

    def test_get_provided_packages(self):
        """cache: get_providing_packages() finds postfix for a virtual package"""
        apt.apt_pkg.config.set("Apt::architecture", "i386")
        cache = apt.Cache(rootdir="./data/test-provides/")
        cache.open()
        if len(cache) == 0:
            logging.warning("skipping test_get_provided_packages, cache empty?!?")
            return
        # a true virtual pkg
        li = cache.get_providing_packages("mail-transport-agent")
        self.assertGreater(len(li), 0)
        self.assertIn("postfix", [p.name for p in li])
        self.assertIn("mail-transport-agent", cache["postfix"].candidate.provides)

    def test_low_level_pkg_provides(self):
        """low-level cache: provides_list of a virtual package includes postfix"""
        apt.apt_pkg.config.set("Apt::architecture", "i386")
        # create highlevel cache and get the lowlevel one from it
        highlevel_cache = apt.Cache(rootdir="./data/test-provides")
        if len(highlevel_cache) == 0:
            logging.warning("skipping test_low_level_pkg_provides, cache empty?!?")
            return
        # low level cache provides list of the pkg
        cache = highlevel_cache._cache
        li = cache["mail-transport-agent"].provides_list
        # the test data is expected to contain exactly two providers
        self.assertEqual(len(li), 2)
        for providesname, _providesver, version in li:
            self.assertEqual(providesname, "mail-transport-agent")
            if version.parent_pkg.name == "postfix":
                break
        else:
            # Only reached when no entry belongs to postfix; report it as a
            # regular test failure.
            self.fail("no provider of mail-transport-agent belongs to postfix")

    @if_sources_list_is_readable
    def test_dpkg_journal_dirty(self):
        """cache: dpkg_journal_dirty only reacts to numeric journal entries"""
        # create tmp env and make sure it is removed again afterwards
        tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tmpdir)
        dpkg_dir = os.path.join(tmpdir, "var", "lib", "dpkg")
        os.makedirs(os.path.join(dpkg_dir, "updates"))
        open(os.path.join(dpkg_dir, "status"), "w").close()
        # put the previous status path back even if an assertion below fails
        self.addCleanup(
            apt_pkg.config.set,
            "Dir::State::status",
            apt_pkg.config.find("Dir::State::status"),
        )
        apt_pkg.config.set("Dir::State::status", os.path.join(dpkg_dir, "status"))
        cache = apt.Cache()
        # test empty
        self.assertFalse(cache.dpkg_journal_dirty)
        # that is ok, only [0-9] are dpkg journal entries
        open(os.path.join(dpkg_dir, "updates", "xxx"), "w").close()
        self.assertFalse(cache.dpkg_journal_dirty)
        # that is a dirty journal
        open(os.path.join(dpkg_dir, "updates", "000"), "w").close()
        self.assertTrue(cache.dpkg_journal_dirty)

    @if_sources_list_is_readable
    def test_apt_update(self):
        """cache: update() honours the sources_list argument"""
        rootdir = "./data/tmp"
        if os.path.exists(rootdir):
            shutil.rmtree(rootdir)
        state_dir = os.path.join(rootdir, "var/lib/apt")
        lists_dir = os.path.join(state_dir, "lists")
        os.makedirs(os.path.join(lists_dir, "partial"), exist_ok=True)

        # Restore the changed configuration keys on every exit path, not only
        # when all assertions below pass.
        for key in ("dir::state", "dir::etc::sourcelist", "dir::etc::sourceparts"):
            self.addCleanup(apt_pkg.config.set, key, apt_pkg.config.find(key))

        apt_pkg.config.set("dir::state", state_dir)
        # set a local sources.list that does not need the network
        base_sources = os.path.abspath(os.path.join(rootdir, "sources.list"))
        apt_pkg.config.set("dir::etc::sourcelist", base_sources)
        # TODO: /dev/null is not a dir, perhaps find something better
        apt_pkg.config.set("dir::etc::sourceparts", "/dev/null")
        # main sources.list
        with open(base_sources, "w") as f:
            repo = os.path.abspath("./data/test-repo2")
            f.write("deb [allow-insecure=yes] copy:%s /\n" % repo)

        # test single sources.list fetching
        sources_list = os.path.join(rootdir, "test.list")
        with open(sources_list, "w") as f:
            repo_dir = os.path.abspath("./data/test-repo")
            f.write("deb [allow-insecure=yes] copy:%s /\n" % repo_dir)

        self.assertTrue(os.path.exists(sources_list))
        # write marker to ensure listcleaner is not run
        open(os.path.join(lists_dir, "marker"), "w").close()

        # update a single sources.list
        cache = apt.Cache()
        cache.update(sources_list=sources_list)
        # verify we just got the expected package file
        needle_packages = glob.glob(lists_dir + "/*tests_data_test-repo_Packages*")
        self.assertEqual(len(needle_packages), 1)
        # verify that we *only* got the Packages file from a single source
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(needle_packages, all_packages)
        # verify that the listcleaner was not run and the marker file is
        # still there
        self.assertTrue("marker" in os.listdir(lists_dir))
        # now run update again without the sources_list argument and verify
        # that only the Packages file of the "normal" sources.list (the one
        # that contains test-repo2) is present
        cache.update()
        needle_packages = glob.glob(lists_dir + "/*tests_data_test-repo2_Packages*")
        self.assertEqual(len(needle_packages), 1)
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(needle_packages, all_packages)

        # and another update with a single source only
        cache = apt.Cache()
        cache.update(sources_list=sources_list)
        all_packages = glob.glob(lists_dir + "/*_Packages*")
        self.assertEqual(len(all_packages), 2)

    def test_package_cmp(self):
        """cache: package objects sort by name"""
        cache = apt.Cache(rootdir="/")
        li = sorted(cache[name] for name in ("intltool", "python3", "apt"))
        self.assertEqual([p.name for p in li], ["apt", "intltool", "python3"])

    def test_get_architectures(self):
        """apt_pkg: get_architectures() contains the configured architecture"""
        main_arch = apt.apt_pkg.config.get("APT::Architecture")
        arches = apt_pkg.get_architectures()
        self.assertIn(main_arch, arches)

    def test_phasing_applied(self):
        """checks the return type of phasing_applied."""
        cache = apt.Cache()
        pkg = cache["apt"]
        self.assertIsInstance(pkg.phasing_applied, bool)

    def test_is_security_update(self):
        """checks the return type of is_security_update."""
        cache = apt.Cache()
        pkg = cache["apt"]
        self.assertIsInstance(pkg.installed.is_security_update, bool)

    def test_apt_cache_reopen_is_safe(self):
        """cache: check that we cannot use old package objects after reopen"""
        cache = apt.Cache()
        old_depcache = cache._depcache
        old_package = cache["apt"]
        old_pkg = old_package._pkg
        old_version = old_package.candidate
        old_ver = old_version._cand

        cache.open()
        new_depcache = cache._depcache
        new_package = cache["apt"]
        new_pkg = new_package._pkg
        new_version = new_package.candidate
        new_ver = new_version._cand

        # get candidate: mixing objects from before and after the reopen
        # must raise ValueError
        self.assertRaises(ValueError, old_depcache.get_candidate_ver, new_pkg)
        self.assertRaises(ValueError, new_depcache.get_candidate_ver, old_pkg)
        self.assertEqual(new_ver, new_depcache.get_candidate_ver(new_pkg))
        self.assertEqual(old_package.candidate._cand, old_ver)  # Remap success
        self.assertEqual(
            old_package.candidate._cand, new_depcache.get_candidate_ver(new_pkg)
        )

        # set candidate: only matching old/old and new/new combinations work
        new_package.candidate = old_version
        old_depcache.set_candidate_ver(old_pkg, old_ver)
        self.assertRaises(ValueError, old_depcache.set_candidate_ver, old_pkg, new_ver)
        self.assertRaises(ValueError, new_depcache.set_candidate_ver, old_pkg, old_ver)
        self.assertRaises(ValueError, new_depcache.set_candidate_ver, old_pkg, new_ver)
        self.assertRaises(ValueError, new_depcache.set_candidate_ver, new_pkg, old_ver)
        new_depcache.set_candidate_ver(new_pkg, new_ver)

    @staticmethod
    def write_status_file(packages):
        """Write one installed-package stanza per entry of *packages*.

        The stanzas go to the file named by the ``Dir::State::Status``
        configuration key, replacing its previous content.  *packages* is
        iterated, so passing a plain string yields one package per
        character; the tests use that for single-letter package names.
        """
        with open(apt_pkg.config["Dir::State::Status"], "w") as fobj:
            for package in packages:
                print("Package:", package, file=fobj)
                print("Status: install ok installed", file=fobj)
                print("Priority: optional", file=fobj)
                print("Section: admin", file=fobj)
                print("Installed-Size: 1", file=fobj)
                print("Maintainer: X <x@x.invalid>", file=fobj)
                print("Architecture: all", file=fobj)
                print("Version: 1", file=fobj)
                print("Description: blah", file=fobj)
                print("", file=fobj)

    @staticmethod
    def _use_status_file(status_path):
        """Configure apt to use *status_path* and /dev/null sources, then init."""
        apt_pkg.config["Dir::Etc::SourceList"] = "/dev/null"
        apt_pkg.config["Dir::Etc::SourceParts"] = "/dev/null"
        apt_pkg.config["Dir::State::Status"] = status_path
        apt_pkg.init_system()

    def test_apt_cache_reopen_is_safe_out_of_bounds(self):
        """Check that out of bounds access is remapped correctly."""
        with tempfile.NamedTemporaryFile() as status:
            self._use_status_file(status.name)

            self.write_status_file("abcdefghijklmnopqrstuvwxyz")
            c = apt.Cache()
            p = c["z"]
            p_id = p.id
            self.write_status_file("az")
            apt_pkg.init_system()
            c.open()
            self.assertNotEqual(p.id, p_id)
            self.assertLess(p.id, 2)
            p.mark_delete()
            self.assertEqual([p], c.get_changes())

    def test_apt_cache_reopen_is_safe_out_of_bounds_no_match(self):
        """Check that installing gone package raises correct exception"""
        with tempfile.NamedTemporaryFile() as status:
            self._use_status_file(status.name)

            self.write_status_file("abcdefghijklmnopqrstuvwxyz")
            c = apt.Cache()
            p = c["z"]
            p_id = p.id
            self.write_status_file("a")
            apt_pkg.init_system()
            c.open()
            self.assertEqual(p.id, p_id)  # Could not be remapped
            self.assertRaises(apt_pkg.CacheMismatchError, p.mark_delete)

    def test_apt_cache_reopen_is_safe_swap(self):
        """Check that swapping a and b does not mark the wrong package."""
        with tempfile.NamedTemporaryFile() as status:
            self._use_status_file(status.name)

            self.write_status_file("abcdefghijklmnopqrstuvwxyz")
            c = apt.Cache()
            p = c["a"]
            a_id = p.id
            p_hash = hash(p)
            set_of_p = {p}
            self.write_status_file("baz")
            apt_pkg.init_system()
            c.open()
            # b now has the same id as a in the old cache
            self.assertEqual(c["b"].id, a_id)
            self.assertNotEqual(p.id, a_id)
            # Marking a should still mark a, not b.
            p.mark_delete()
            self.assertEqual([p], c.get_changes())

            # Ensure that p can still be found in a set of it, as a test
            # for bug https://bugs.launchpad.net/bugs/1780099
            self.assertEqual(hash(p), p_hash)
            self.assertIn(p, set_of_p)

    def test_apt_cache_iteration_safe(self):
        """Check that iterating does not produce different results.

        This failed in 1.7.0~alpha2, because one part of the code
        looked up packages in the weak dict using the pretty name,
        and the other using the full name."""

        with tempfile.NamedTemporaryFile() as status:
            self._use_status_file(status.name)

            self.write_status_file("abcdefghijklmnopqrstuvwxyz")

            c = apt.Cache()
            c["a"].mark_delete()
            self.assertEqual([c["a"]], [p for p in c if p.marked_delete])

    def test_problemresolver_keep_phased_updates(self):
        """Check that the c++ function can be called."""
        with tempfile.NamedTemporaryFile() as status:
            self._use_status_file(status.name)

            cache = apt.Cache()
            problemresolver = apt.ProblemResolver(cache)
            self.assertIsNone(problemresolver.keep_phased_updates())


# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()