diff options
Diffstat (limited to 'aptsources')
-rw-r--r-- | aptsources/__init__.py | 6 | ||||
-rw-r--r-- | aptsources/_deb822.py | 144 | ||||
-rw-r--r-- | aptsources/distinfo.py | 415 | ||||
-rw-r--r-- | aptsources/distro.py | 648 | ||||
-rw-r--r-- | aptsources/sourceslist.py | 1083 |
5 files changed, 2296 insertions, 0 deletions
diff --git a/aptsources/__init__.py b/aptsources/__init__.py new file mode 100644 index 0000000..2ccf4fc --- /dev/null +++ b/aptsources/__init__.py @@ -0,0 +1,6 @@ +import apt_pkg + +# init the package system, but do not re-initialize config +if "APT" not in apt_pkg.config: + apt_pkg.init_config() +apt_pkg.init_system() diff --git a/aptsources/_deb822.py b/aptsources/_deb822.py new file mode 100644 index 0000000..d3b3a7c --- /dev/null +++ b/aptsources/_deb822.py @@ -0,0 +1,144 @@ +#!/usr/bin/python3 +# +# Copyright (C) Canonical Ltd +# +# SPDX-License-Identifier: GPL-2.0+ + +"""deb822 parser with support for comment headers and footers.""" + +import collections +import io +import typing + +import apt_pkg + +T = typing.TypeVar("T") + + +class Section: + """A single deb822 section, possibly with comments. + + This represents a single deb822 section. + """ + + tags: collections.OrderedDict[str, str] + _case_mapping: dict[str, str] + header: str + footer: str + + def __init__(self, section: typing.Union[str, "Section"]): + if isinstance(section, Section): + self.tags = collections.OrderedDict(section.tags) + self._case_mapping = {k.casefold(): k for k in self.tags} + self.header = section.header + self.footer = section.footer + return + + comments = ["", ""] + in_section = False + trimmed_section = "" + + for line in section.split("\n"): + if line.startswith("#"): + # remove the leading # + line = line[1:] + comments[in_section] += line + "\n" + continue + + in_section = True + trimmed_section += line + "\n" + + self.tags = collections.OrderedDict(apt_pkg.TagSection(trimmed_section)) + self._case_mapping = {k.casefold(): k for k in self.tags} + self.header, self.footer = comments + + def __getitem__(self, key: str) -> str: + """Get the value of a field.""" + return self.tags[self._case_mapping.get(key.casefold(), key)] + + def __delitem__(self, key: str) -> None: + """Delete a field""" + del self.tags[self._case_mapping.get(key.casefold(), key)] + + def __setitem__(self, key: str, val: str) -> None: + """Set the value of a field.""" + if key.casefold() not in self._case_mapping: + self._case_mapping[key.casefold()] = key + self.tags[self._case_mapping[key.casefold()]] = val + + def __bool__(self) -> bool: + return bool(self.tags) + + @typing.overload + def get(self, key: str) -> str | None: + ... + + @typing.overload + def get(self, key: str, default: T) -> T | str: + ... + + def get(self, key: str, default: T | None = None) -> T | None | str: + try: + return self[key] + except KeyError: + return default + + @staticmethod + def __comment_lines(content: str) -> str: + return ( + "\n".join("#" + line for line in content.splitlines()) + "\n" + if content + else "" + ) + + def __str__(self) -> str: + """Canonical string rendering of this section.""" + return ( + self.__comment_lines(self.header) + + "".join(f"{k}: {v}\n" for k, v in self.tags.items()) + + self.__comment_lines(self.footer) + ) + + +class File: + """ + Parse a given file object into a list of Section objects. + """ + + def __init__(self, fobj: io.TextIOBase): + self.sections = [] + section = "" + for line in fobj: + if not line.isspace(): + # A line is part of the section if it has non-whitespace characters + section += line + elif section: + # Our line is just whitespace and we have gathered section content, so let's write out the section + self.sections.append(Section(section)) + section = "" + + # The final section may not be terminated by an empty line + if section: + self.sections.append(Section(section)) + + def __iter__(self) -> typing.Iterator[Section]: + return iter(self.sections) + + def __str__(self) -> str: + return "\n".join(str(s) for s in self.sections) + + +if __name__ == "__main__": + st = """# Header +# More header +K1: V1 +# Inline +K2: V2 + # not a comment +# Footer +# More footer +""" + + s = Section(st) + + print(s) diff --git a/aptsources/distinfo.py b/aptsources/distinfo.py new file mode 100644 index 0000000..bd30f81 --- /dev/null +++ b/aptsources/distinfo.py @@ -0,0 +1,415 @@ +# distinfo.py - provide meta information for distro repositories +# +# Copyright (c) 2005 Gustavo Noronha Silva <kov@debian.org> +# Copyright (c) 2006-2007 Sebastian Heinlein <glatzor@ubuntu.com> +# +# Authors: Gustavo Noronha Silva <kov@debian.org> +# Sebastian Heinlein <glatzor@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import csv +import errno +import logging +import os +import re +from collections.abc import Iterator +from subprocess import PIPE, Popen +from typing import cast + +import apt_pkg +from apt_pkg import gettext as _ + + +def _expand_template(template: str, csv_path: str) -> Iterator[str]: + """Expand the given template. + + A template file consists of a header, followed by paragraphs + of templated suites, followed by a footer. A templated suite + is any paragraph where the Suite field contains {. + + This function expands all templated suites using the information + found in the CSV file supplied by distro-info-data. + + It yields lines of template info. + """ + + known_suites = set() + + # Copy out any header, and gather all hardcoded suites + with apt_pkg.TagFile(template) as tmpl: + for section in tmpl: + if "X-Exclude-Suites" in section: + known_suites.update(section["X-Exclude-Suites"].split(", ")) + if "Suite" in section: + if "{" in section["Suite"]: + break + + known_suites.add(section["Suite"]) + + yield from str(section).splitlines() + else: + # We did not break, so we did copy all of them + return + + for section in tmpl: + if "Suite" in section: + known_suites.add(section["Suite"]) + + with open(csv_path) as csv_object: + releases = reversed(list(csv.DictReader(csv_object))) + + # Perform template substitution on the middle of the list + for rel in releases: + if rel["series"] in known_suites: + continue + yield "" + rel["version"] = rel["version"].replace(" LTS", "") + with apt_pkg.TagFile(template) as tmpl: + for section in tmpl: + # Only work on template sections, this skips head and tails + if "Suite" not in section or "{" not in section["Suite"]: + continue + if "X-Version" in section: + # Version requirements. Maybe should be made nicer + ver = rel["version"] + if any( + ( + field.startswith("le") + and apt_pkg.version_compare(field[3:], ver) < 0 + ) + or ( + field.startswith("ge") + and apt_pkg.version_compare(field[3:], ver) > 0 + ) + for field in section["X-Version"].split(", ") + ): + continue + + for line in str(section).format(**rel).splitlines(): + if line.startswith("X-Version"): + continue + yield line + + # Copy out remaining suites + with apt_pkg.TagFile(template) as tmpl: + # Skip the head again, we don't want to copy it twice + for section in tmpl: + if "Suite" in section and "{" in section["Suite"]: + break + + for section in tmpl: + # Ignore any template parts and copy the rest out, + # this is the inverse of the template substitution loop + if "Suite" in section and "{" in section["Suite"]: + continue + + yield from str(section).splitlines() + + +class Template: + def __init__(self) -> None: + self.name: str | None = None + self.child = False + self.parents: list[Template] = [] # ref to parent template(s) + self.match_name: str | None = None + self.description: str | None = None + self.base_uri: str | None = None + self.type: str | None = None + self.components: list[Component] = [] + self.children: list[Template] = [] + self.match_uri: str | None = None + self.mirror_set: dict[str, Mirror] = {} + self.distribution: str | None = None + self.available = True + self.official = True + + def has_component(self, comp: str) -> bool: + """Check if the distribution provides the given component""" + return comp in (c.name for c in self.components) + + def is_mirror(self, url: str) -> bool: + """Check if a given url of a repository is a valid mirror""" + proto, hostname, dir = split_url(url) + if hostname in self.mirror_set: + return self.mirror_set[hostname].has_repository(proto, dir) + else: + return False + + +class Component: + def __init__( + self, + name: str, + desc: str | None = None, + long_desc: str | None = None, + parent_component: str | None = None, + ): + self.name = name + self.description = desc + self.description_long = long_desc + self.parent_component = parent_component + + def get_parent_component(self) -> str | None: + return self.parent_component + + def set_parent_component(self, parent: str) -> None: + self.parent_component = parent + + def get_description(self) -> str | None: + if self.description_long is not None: + return self.description_long + elif self.description is not None: + return self.description + else: + return None + + def set_description(self, desc: str) -> None: + self.description = desc + + def set_description_long(self, desc: str) -> None: + self.description_long = desc + + def get_description_long(self) -> str | None: + return self.description_long + + +class Mirror: + """Storage for mirror related information""" + + def __init__( + self, proto: str, hostname: str, dir: str, location: str | None = None + ): + self.hostname = hostname + self.repositories: list[Repository] = [] + self.add_repository(proto, dir) + self.location = location + + def add_repository(self, proto: str, dir: str) -> None: + self.repositories.append(Repository(proto, dir)) + + def get_repositories_for_proto(self, proto: str) -> list["Repository"]: + return [r for r in self.repositories if r.proto == proto] + + def has_repository(self, proto: str, dir: str) -> bool: + if dir is None: + return False + for r in self.repositories: + if r.proto == proto and dir in r.dir: + return True + return False + + def get_repo_urls(self) -> list[str]: + return [r.get_url(self.hostname) for r in self.repositories] + + def get_location(self) -> str | None: + return self.location + + def set_location(self, location: str) -> None: + self.location = location + + +class Repository: + def __init__(self, proto: str, dir: str) -> None: + self.proto = proto + self.dir = dir + + def get_info(self) -> tuple[str, str]: + return self.proto, self.dir + + def get_url(self, hostname: str) -> str: + return f"{self.proto}://{hostname}/{self.dir}" + + +def split_url(url: str) -> list[str]: + """split a given URL into the protocoll, the hostname and the dir part""" + split = re.split(":*\\/+", url, maxsplit=2) + while len(split) < 3: + split.append(None) + return split + + +class DistInfo: + def __init__( + self, + dist: str | None = None, + base_dir: str = "/usr/share/python-apt/templates", + ): + self.metarelease_uri = "" + self.templates: list[Template] = [] + self.arch = apt_pkg.config.find("APT::Architecture") + + location = None + match_loc = re.compile(r"^#LOC:(.+)$") + match_mirror_line = re.compile( + r"^(#LOC:.+)|(((http)|(ftp)|(rsync)|(file)|(mirror)|(https))://" + r"[A-Za-z0-9/\.:\-_@]+)$" + ) + # match_mirror_line = re.compile(r".+") + + if not dist: + try: + dist = ( + Popen( + ["lsb_release", "-i", "-s"], + universal_newlines=True, + stdout=PIPE, + ) + .communicate()[0] + .strip() + ) + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("lsb_release failed, using defaults: %s" % exc) + dist = "Debian" + + self.dist = dist + + map_mirror_sets = {} + + dist_fname = f"{base_dir}/{dist}.info" + csv_fname = f"/usr/share/distro-info/{dist.lower()}.csv" + + # FIXME: Logic doesn't work with types. + template = cast(Template, None) + component = cast(Component, None) + for line in _expand_template(dist_fname, csv_fname): + tokens = line.split(":", 1) + if len(tokens) < 2: + continue + field = tokens[0].strip() + value = tokens[1].strip() + if field == "ChangelogURI": + self.changelogs_uri = _(value) + elif field == "MetaReleaseURI": + self.metarelease_uri = value + elif field == "Suite": + self.finish_template(template, component) + component = cast(Component, None) # FIXME + template = Template() + template.name = value + template.distribution = dist + template.match_name = "^%s$" % value + elif field == "MatchName": + template.match_name = value + elif field == "ParentSuite": + template.child = True + for nanny in self.templates: + # look for parent and add back ref to it + if nanny.name == value: + template.parents.append(nanny) + nanny.children.append(template) + elif field == "Available": + template.available = apt_pkg.string_to_bool(value) + elif field == "Official": + template.official = apt_pkg.string_to_bool(value) + elif field == "RepositoryType": + template.type = value + elif field == "BaseURI" and not template.base_uri: + template.base_uri = value + elif field == "BaseURI-%s" % self.arch: + template.base_uri = value + elif field == "MatchURI" and not template.match_uri: + template.match_uri = value + elif field == "MatchURI-%s" % self.arch: + template.match_uri = value + elif field == "MirrorsFile" or field == "MirrorsFile-%s" % self.arch: + # Make the path absolute. + value = ( + os.path.isabs(value) + and value + or os.path.abspath(os.path.join(base_dir, value)) + ) + if value not in map_mirror_sets: + mirror_set: dict[str, Mirror] = {} + try: + with open(value) as value_f: + mirror_data = list( + filter( + match_mirror_line.match, + [x.strip() for x in value_f], + ) + ) + except Exception: + print(f"WARNING: Failed to read mirror file {value}") + mirror_data = [] + for line in mirror_data: + if line.startswith("#LOC:"): + location = match_loc.sub(r"\1", line) + continue + (proto, hostname, dir) = split_url(line) + if hostname in mirror_set: + mirror_set[hostname].add_repository(proto, dir) + else: + mirror_set[hostname] = Mirror( + proto, hostname, dir, location + ) + map_mirror_sets[value] = mirror_set + template.mirror_set = map_mirror_sets[value] + elif field == "Description": + template.description = _(value) + elif field == "Component": + if component and not template.has_component(component.name): + template.components.append(component) + component = Component(value) + elif field == "CompDescription": + component.set_description(_(value)) + elif field == "CompDescriptionLong": + component.set_description_long(_(value)) + elif field == "ParentComponent": + component.set_parent_component(value) + self.finish_template(template, component) + template = cast(Template, None) + component = cast(Component, None) + + def finish_template(self, template: Template, component: Component | None) -> None: + "finish the current tempalte" + if not template: + return + # reuse some properties of the parent template + if template.match_uri is None and template.child: + for t in template.parents: + if t.match_uri: + template.match_uri = t.match_uri + break + if template.mirror_set == {} and template.child: + for t in template.parents: + if t.match_uri: + template.mirror_set = t.mirror_set + break + if component and not template.has_component(component.name): + template.components.append(component) + component = None + # the official attribute is inherited + for t in template.parents: + template.official = t.official + self.templates.append(template) + + +if __name__ == "__main__": + d = DistInfo("Ubuntu", "/usr/share/python-apt/templates") + logging.info(d.changelogs_uri) + for template in d.templates: + logging.info("\nSuite: %s" % template.name) + logging.info("Desc: %s" % template.description) + logging.info("BaseURI: %s" % template.base_uri) + logging.info("MatchURI: %s" % template.match_uri) + if template.mirror_set != {}: + logging.info("Mirrors: %s" % list(template.mirror_set.keys())) + for comp in template.components: + logging.info(f" {comp.name} -{comp.description} -{comp.description_long}") + for child in template.children: + logging.info(" %s" % child.description) diff --git a/aptsources/distro.py b/aptsources/distro.py new file mode 100644 index 0000000..546d0e7 --- /dev/null +++ b/aptsources/distro.py @@ -0,0 +1,648 @@ +# distro.py - Provide a distro abstraction of the sources.list +# +# Copyright (c) 2004-2009 Canonical Ltd. +# Copyright (c) 2006-2007 Sebastian Heinlein +# Copyright (c) 2016 Harald Sitter +# +# Authors: Sebastian Heinlein <glatzor@ubuntu.com> +# Michael Vogt <mvo@debian.org> +# Harald Sitter <sitter@kde.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import gettext +import logging +import os +import re +import shlex +import warnings +from xml.etree.ElementTree import ElementTree + +from apt_pkg import gettext as _ + + +class NoDistroTemplateException(Exception): + pass + + +class Distribution: + def __init__(self, id, codename, description, release, is_like=[]): + """Container for distribution specific informations""" + # LSB information + self.id = id + self.codename = codename + self.description = description + self.release = release + self.is_like = is_like + + self.binary_type = "deb" + self.source_type = "deb-src" + + def get_sources(self, sourceslist): + """ + Find the corresponding template, main and child sources + for the distribution + """ + + self.sourceslist = sourceslist + # corresponding sources + self.source_template = None + self.child_sources = [] + self.main_sources = [] + self.disabled_sources = [] + self.cdrom_sources = [] + self.download_comps = [] + self.enabled_comps = [] + self.cdrom_comps = [] + self.used_media = [] + self.get_source_code = False + self.source_code_sources = [] + + # location of the sources + self.default_server = "" + self.main_server = "" + self.nearest_server = "" + self.used_servers = [] + + # find the distro template + for template in self.sourceslist.matcher.templates: + if self.is_codename(template.name) and template.distribution == self.id: + # print "yeah! found a template for %s" % self.description + # print template.description, template.base_uri, \ + # template.components + self.source_template = template + break + if self.source_template is None: + raise NoDistroTemplateException( + "Error: could not find a distribution template for %s/%s" + % (self.id, self.codename) + ) + + # find main and child sources + media = [] + comps = [] + cdrom_comps = [] + enabled_comps = [] + # source_code = [] + for source in self.sourceslist.exploded_list(): + if ( + not source.invalid + and self.is_codename(source.dist) + and source.template + and source.template.official + and self.is_codename(source.template.name) + ): + # print "yeah! found a distro repo: %s" % source.line + # cdroms need do be handled differently + if source.uri.startswith("cdrom:") and not source.disabled: + self.cdrom_sources.append(source) + cdrom_comps.extend(source.comps) + elif source.uri.startswith("cdrom:") and source.disabled: + self.cdrom_sources.append(source) + elif source.type == self.binary_type and not source.disabled: + self.main_sources.append(source) + comps.extend(source.comps) + media.append(source.uri) + elif source.type == self.binary_type and source.disabled: + self.disabled_sources.append(source) + elif source.type == self.source_type and not source.disabled: + self.source_code_sources.append(source) + elif source.type == self.source_type and source.disabled: + self.disabled_sources.append(source) + if not source.invalid and source.template in self.source_template.children: + if not source.disabled and source.type == self.binary_type: + self.child_sources.append(source) + elif not source.disabled and source.type == self.source_type: + self.source_code_sources.append(source) + else: + self.disabled_sources.append(source) + self.download_comps = set(comps) + self.cdrom_comps = set(cdrom_comps) + enabled_comps.extend(comps) + enabled_comps.extend(cdrom_comps) + self.enabled_comps = set(enabled_comps) + self.used_media = set(media) + self.get_mirrors() + + def get_mirrors(self, mirror_template=None): + """ + Provide a set of mirrors where you can get the distribution from + """ + # the main server is stored in the template + self.main_server = self.source_template.base_uri + + # other used servers + for medium in self.used_media: + if not medium.startswith("cdrom:"): + # seems to be a network source + self.used_servers.append(medium) + + if len(self.main_sources) == 0: + self.default_server = self.main_server + else: + self.default_server = self.main_sources[0].uri + + # get a list of country codes and real names + self.countries = {} + fname = "/usr/share/xml/iso-codes/iso_3166.xml" + if os.path.exists(fname): + et = ElementTree(file=fname) + # python2.6 compat, the next two lines can get removed + # once we do not use py2.6 anymore + if getattr(et, "iter", None) is None: + et.iter = et.getiterator + it = et.iter("iso_3166_entry") + for elm in it: + try: + descr = elm.attrib["common_name"] + except KeyError: + descr = elm.attrib["name"] + try: + code = elm.attrib["alpha_2_code"] + except KeyError: + code = elm.attrib["alpha_3_code"] + self.countries[code.lower()] = gettext.dgettext("iso_3166", descr) + + # try to guess the nearest mirror from the locale + self.country = None + self.country_code = None + locale = os.getenv("LANG", default="en_UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a + 1 : z].lower() + + if mirror_template: + self.nearest_server = mirror_template % country_code + + if country_code in self.countries: + self.country = self.countries[country_code] + self.country_code = country_code + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Customize for different distributions""" + country = None + i = server.find("://") + li = server.find(".archive.ubuntu.com") + if i != -1 and li != -1: + country = server[i + len("://") : li] + if country in self.countries: + # TRANSLATORS: %s is a country + return _("Server for %s") % self.countries[country] + else: + return "%s" % server.rstrip("/ ") + + def get_server_list(self): + """Return a list of used and suggested servers""" + + def compare_mirrors(mir1, mir2): + """Helper function that handles comaprision of mirror urls + that could contain trailing slashes""" + return re.match(mir1.strip("/ "), mir2.rstrip("/ ")) + + # Store all available servers: + # Name, URI, active + mirrors = [] + if len(self.used_servers) < 1 or ( + len(self.used_servers) == 1 + and compare_mirrors(self.used_servers[0], self.main_server) + ): + mirrors.append([_("Main server"), self.main_server, True]) + if self.nearest_server: + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + elif len(self.used_servers) == 1 and not compare_mirrors( + self.used_servers[0], self.main_server + ): + mirrors.append([_("Main server"), self.main_server, False]) + # Only one server is used + server = self.used_servers[0] + + # Append the nearest server if it's not already used + if self.nearest_server: + if not compare_mirrors(server, self.nearest_server): + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + if server: + mirrors.append([self._get_mirror_name(server), server, True]) + + elif len(self.used_servers) > 1: + # More than one server is used. Since we don't handle this case + # in the user interface we set "custom servers" to true and + # append a list of all used servers + mirrors.append([_("Main server"), self.main_server, False]) + if self.nearest_server: + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + mirrors.append([_("Custom servers"), None, True]) + for server in self.used_servers: + mirror_entry = [self._get_mirror_name(server), server, False] + if compare_mirrors(server, self.nearest_server) or compare_mirrors( + server, self.main_server + ): + continue + elif mirror_entry not in mirrors: + mirrors.append(mirror_entry) + + return mirrors + + def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""): + """ + Add distribution specific sources + """ + if uri is None: + # FIXME: Add support for the server selector + uri = self.default_server + if dist is None: + dist = self.codename + if comps is None: + comps = list(self.enabled_comps) + if type is None: + type = self.binary_type + + parent = None + file = None + for parent in reversed(self.child_sources) or reversed(self.main_sources): + file = parent.file + break + + new_source = self.sourceslist.add( + type, uri, dist, comps, comment, parent=parent, file=file + ) + # if source code is enabled add a deb-src line after the new + # source + if self.get_source_code and type == self.binary_type: + self.sourceslist.add( + self.source_type, + uri, + dist, + comps, + comment, + file=new_source.file, + parent=new_source, + pos=self.sourceslist.list.index(new_source) + 1, + ) + + def enable_component(self, comp): + """ + Enable a component in all main, child and source code sources + (excluding cdrom based sources) + + comp: the component that should be enabled + """ + comps = list([comp]) + # look for parent components that we may have to add + for source in self.main_sources: + for c in source.template.components: + if c.name == comp and c.parent_component: + if c.parent_component not in comps: + comps.append(c.parent_component) + for c in comps: + self._enable_component(c) + + def _enable_component(self, comp): + def add_component_only_once(source, comps_per_dist): + """ + Check if we already added the component to the repository, since + a repository could be splitted into different apt lines. If not + add the component + """ + # if we don't have that distro, just return (can happen for e.g. + # dapper-update only in deb-src + if source.dist not in comps_per_dist: + return + # if we have seen this component already for this distro, + # return (nothing to do) + if comp in comps_per_dist[source.dist]: + return + # add it + source.comps = source.comps + [comp] + comps_per_dist[source.dist].add(comp) + + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + # store what comps are enabled already per distro (where distro is + # e.g. "dapper", "dapper-updates") + comps_per_dist = {} + comps_per_sdist = {} + for s in sources: + if s.type == self.binary_type: + if s.dist not in comps_per_dist: + comps_per_dist[s.dist] = set() + for c in s.comps: + comps_per_dist[s.dist].add(c) + for s in self.source_code_sources: + if s.type == self.source_type: + if s.dist not in comps_per_sdist: + comps_per_sdist[s.dist] = set() + for c in s.comps: + comps_per_sdist[s.dist].add(c) + + # check if there is a main source at all + if len(self.main_sources) < 1: + # create a new main source + self.add_source(comps=["%s" % comp]) + else: + # add the comp to all main, child and source code sources + for source in sources: + add_component_only_once(source, comps_per_dist) + + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + # check if there is a main source code source at all + if self.get_source_code: + if len(self.source_code_sources) < 1: + # create a new main source + self.add_source(type=self.source_type, comps=["%s" % comp]) + else: + # add the comp to all main, child and source code sources + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + def disable_component(self, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + """ + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + if comp in self.cdrom_comps: + sources = [] + sources.extend(self.main_sources) + for source in sources: + if comp in source.comps: + comps = source.comps + comps.remove(comp) + source.comps = comps + if len(source.comps) < 1: + self.sourceslist.remove(source) + + def change_server(self, uri): + """Change the server of all distro specific sources to + a given host""" + + def change_server_of_source(source, uri, seen): + # Avoid creating duplicate entries + source.uri = uri + for comp in source.comps: + if [source.uri, source.dist, comp] in seen: + source.comps.remove(comp) + else: + seen.append([source.uri, source.dist, comp]) + if len(source.comps) < 1: + self.sourceslist.remove(source) + + seen_binary = [] + seen_source = [] + self.default_server = uri + for source in self.main_sources: + change_server_of_source(source, uri, seen_binary) + for source in self.child_sources: + # Do not change the forces server of a child source + if ( + source.template.base_uri is None + or source.template.base_uri != source.uri + ): + change_server_of_source(source, uri, seen_binary) + for source in self.source_code_sources: + change_server_of_source(source, uri, seen_source) + + def is_codename(self, name): + """Compare a given name with the release codename.""" + if name == self.codename: + return True + else: + return False + + +class DebianDistribution(Distribution): + """Class to support specific Debian features""" + + def is_codename(self, name): + """Compare a given name with the release codename and check if + if it can be used as a synonym for a development releases""" + if name == self.codename or self.release in ("testing", "unstable"): + return True + else: + return False + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Debian specific""" + country = None + i = server.find("://ftp.") + li = server.find(".debian.org") + if i != -1 and li != -1: + country = server[i + len("://ftp.") : li] + if country in self.countries: + # TRANSLATORS: %s is a country + return ( + _("Server for %s") + % gettext.dgettext( + "iso_3166", self.countries[country].rstrip() + ).rstrip() + ) + else: + return "%s" % server.rstrip("/ ") + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://ftp.%s.debian.org/debian/" + ) + + +class UbuntuDistribution(Distribution): + """Class to support specific Ubuntu features""" + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/" + ) + + +class UbuntuRTMDistribution(UbuntuDistribution): + """Class to support specific Ubuntu RTM features""" + + def get_mirrors(self): + self.main_server = self.source_template.base_uri + + +def _lsb_release(): + """Call lsb_release --idrc and return a mapping.""" + import errno + from subprocess import PIPE, Popen + + result = { + "Codename": "sid", + "Distributor ID": "Debian", + "Description": "Debian GNU/Linux unstable (sid)", + "Release": "unstable", + } + try: + out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0] + # Convert to unicode string, needed for Python 3.1 + out = out.decode("utf-8") + result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line) + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("lsb_release failed, using defaults:" % exc) + return result + + +def _system_image_channel(): + """Get the current channel from system-image-cli -i if possible.""" + import errno + from subprocess import DEVNULL, PIPE, Popen + + try: + out = Popen( + ["system-image-cli", "-i"], + stdout=PIPE, + stderr=DEVNULL, + universal_newlines=True, + ).communicate()[0] + for line in out.splitlines(): + if line.startswith("channel: "): + return line.split(": ", 1)[1] + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("system-image-cli failed, using defaults: %s" % exc) + return None + + +class _OSRelease: + DEFAULT_OS_RELEASE_FILE = "/etc/os-release" + OS_RELEASE_FILE = "/etc/os-release" + + def __init__(self, lsb_compat=True): + self.result = {} + self.valid = False + self.file = _OSRelease.OS_RELEASE_FILE + + if not os.path.isfile(self.file): + return + + self.parse() + self.valid = True + + if lsb_compat: + self.inject_lsb_compat() + + def inject_lsb_compat(self): + self.result["Distributor ID"] = self.result["ID"] + self.result["Description"] = self.result["PRETTY_NAME"] + # Optionals as per os-release spec. + self.result["Codename"] = self.result.get("VERSION_CODENAME") + if not self.result["Codename"]: + # Transient Ubuntu 16.04 field (LP: #1598212) + self.result["Codename"] = self.result.get("UBUNTU_CODENAME") + self.result["Release"] = self.result.get("VERSION_ID") + + def parse(self): + f = open(self.file) + for line in f: + line = line.strip() + if not line: + continue + self.parse_entry(*line.split("=", 1)) + f.close() + + def parse_entry(self, key, value): + value = self.parse_value(value) # Values can be shell strings... + if key == "ID_LIKE" and isinstance(value, str): + # ID_LIKE is specified as quoted space-separated list. This will + # be parsed as string that we need to split manually. + value = value.split(" ") + self.result[key] = value + + def parse_value(self, value): + values = shlex.split(value) + if len(values) == 1: + return values[0] + return values + + +def get_distro(id=None, codename=None, description=None, release=None, is_like=[]): + """ + Check the currently used distribution and return the corresponding + distriubtion class that supports distro specific features. + + If no paramter are given the distro will be auto detected via + a call to lsb-release + """ + # make testing easier + if not (id and codename and description and release): + if id or codename or description or release: + warnings.warn( + "Provided only a subset of arguments", DeprecationWarning, stacklevel=2 + ) + os_release = _OSRelease() + os_result = [] + lsb_result = _lsb_release() + if os_release.valid: + os_result = os_release.result + # TODO: We cannot presently use os-release to fully replace lsb_release + # because os-release's ID, VERSION_ID and VERSION_CODENAME fields + # are specified as lowercase. In lsb_release they can be upcase + # or captizalized. So, switching to os-release would consitute + # a behavior break a which point lsb_release support should be + # fully removed. + # This in particular is a problem for template matching, as this + # matches against Distribution objects and depends on string + # case. + lsb_result = _lsb_release() + id = lsb_result["Distributor ID"] + codename = lsb_result["Codename"] + description = lsb_result["Description"] + release = lsb_result["Release"] + # Not available with LSB, use get directly. + is_like = os_result.get("ID_LIKE", []) + if id == "Ubuntu": + channel = _system_image_channel() + if channel is not None and "ubuntu-rtm/" in channel: + id = "Ubuntu-RTM" + codename = channel.rsplit("/", 1)[1].split("-", 1)[0] + description = codename + release = codename + if id == "Ubuntu": + return UbuntuDistribution(id, codename, description, release, is_like) + if id == "Ubuntu-RTM": + return UbuntuRTMDistribution(id, codename, description, release, is_like) + elif id == "Debian": + return DebianDistribution(id, codename, description, release, is_like) + else: + return Distribution(id, codename, description, release, is_like) diff --git a/aptsources/sourceslist.py b/aptsources/sourceslist.py new file mode 100644 index 0000000..b227690 --- /dev/null +++ b/aptsources/sourceslist.py @@ -0,0 +1,1083 @@ +# sourceslist.py - Provide an abstraction of the sources.list +# +# Copyright (c) 2004-2023 Canonical Ltd. +# Copyright (c) 2004 Michiel Sikkes +# Copyright (c) 2006-2007 Sebastian Heinlein +# +# Authors: Michiel Sikkes <michiel@eyesopened.nl> +# Michael Vogt <mvo@debian.org> +# Sebastian Heinlein <glatzor@ubuntu.com> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import builtins +import glob +import io +import logging +import os.path +import re +import shutil +import time +import weakref +from collections.abc import Callable, Iterable, Iterator +from typing import Any, Generic, Optional, TypeVar, Union + +import apt_pkg + +from . import _deb822 +from .distinfo import DistInfo, Template + +# from apt_pkg import gettext as _ + +T = TypeVar("T") + +# some global helpers + +__all__ = [ + "is_mirror", + "Deb822SourceEntry", + "SourceEntry", + "NullMatcher", + "SourcesList", + "SourceEntryMatcher", +] + + +def is_mirror(master_uri: str, compare_uri: str) -> bool: + """check if the given add_url is idential or a mirror of orig_uri e.g.: + master_uri = archive.ubuntu.com + compare_uri = de.archive.ubuntu.com + -> True + """ + # remove traling spaces and "/" + compare_uri = compare_uri.rstrip("/ ") + master_uri = master_uri.rstrip("/ ") + # uri is identical + if compare_uri == master_uri: + # print "Identical" + return True + # add uri is a master site and orig_uri has the from "XX.mastersite" + # (e.g. de.archive.ubuntu.com) + try: + compare_srv = compare_uri.split("//")[1] + master_srv = master_uri.split("//")[1] + # print "%s == %s " % (add_srv, orig_srv) + except IndexError: # ok, somethings wrong here + # print "IndexError" + return False + # remove the leading "<country>." (if any) and see if that helps + if "." in compare_srv and compare_srv[compare_srv.index(".") + 1 :] == master_srv: + # print "Mirror" + return True + return False + + +def uniq(s: Iterable[T]) -> list[T]: + """simple and efficient way to return uniq collection + + This is not intended for use with a SourceList. It is provided + for internal use only. It does not have a leading underscore to + not break any old code that uses it; but it should not be used + in new code (and is not listed in __all__).""" + return list(set(s)) + + +class SingleValueProperty(property): + def __init__(self, key: str, doc: str): + self.key = key + self.__doc__ = doc + + def __get__( + self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None + ) -> str | None: + if obj is None: + return self # type: ignore + return obj.section.get(self.key, None) + + def __set__(self, obj: "Deb822SourceEntry", value: str | None) -> None: + if value is None: + del obj.section[self.key] + else: + obj.section[self.key] = value + + +class MultiValueProperty(property): + def __init__(self, key: str, doc: str): + self.key = key + self.__doc__ = doc + + def __get__( + self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None + ) -> list[str]: + if obj is None: + return self # type: ignore + return SourceEntry.mysplit(obj.section.get(self.key, "")) + + def __set__(self, obj: "Deb822SourceEntry", values: list[str]) -> None: + obj.section[self.key] = " ".join(values) + + +class ExplodedEntryProperty(property, Generic[T]): + def __init__(self, parent: T): + self.parent = parent + + def __get__( + self, obj: Optional["ExplodedDeb822SourceEntry"], objtype: type | None = None + ) -> T: + if obj is None: + return self # type: ignore + return self.parent.__get__(obj.parent) # type: ignore + + def __set__(self, obj: "ExplodedDeb822SourceEntry", value: T) -> None: + obj.split_out() + self.parent.__set__(obj.parent, value) # type: ignore + + +def DeprecatedProperty(prop: T) -> T: + return prop + + +def _null_weakref() -> None: + """Behaves like an expired weakref.ref, returning None""" + return None + + +class Deb822SourceEntry: + def __init__( + self, + section: _deb822.Section | str | None, + file: str, + list: Optional["SourcesList"] = None, + ): + if section is None: + self.section = _deb822.Section("") + elif isinstance(section, str): + self.section = _deb822.Section(section) + else: + self.section = section + + self._line = str(self.section) + self.file = file + self.template: Template | None = None # type DistInfo.Suite + self.may_merge = False + self._children = weakref.WeakSet["ExplodedDeb822SourceEntry"]() + + if list: + self._list: Callable[[], SourcesList | None] = weakref.ref(list) + else: + self._list = _null_weakref + + def __eq__(self, other: Any) -> Any: + # FIXME: Implement plurals more correctly + """equal operator for two sources.list entries""" + return ( + self.disabled == other.disabled + and self.type == other.type + and self.uri + and self.uri.rstrip("/") == other.uri.rstrip("/") + and self.dist == other.dist + and self.comps == other.comps + ) + + architectures = MultiValueProperty("Architectures", "The list of architectures") + types = MultiValueProperty("Types", "The list of types") + type = DeprecatedProperty(SingleValueProperty("Types", "The list of types")) + uris = MultiValueProperty("URIs", "URIs in the source") + uri = DeprecatedProperty(SingleValueProperty("URIs", "URIs in the source")) + suites = MultiValueProperty("Suites", "Suites in the source") + dist = DeprecatedProperty(SingleValueProperty("Suites", "Suites in the source")) + comps = MultiValueProperty("Components", "Components in the source") + + @property + def comment(self) -> str: + """Legacy attribute describing the paragraph header.""" + return self.section.header + + @comment.setter + def comment(self, comment: str) -> None: + """Legacy attribute describing the paragraph header.""" + self.section.header = comment + + @property + def trusted(self) -> bool | None: + """Return the value of the Trusted field""" + try: + return apt_pkg.string_to_bool(self.section["Trusted"]) + except KeyError: + return None + + @trusted.setter + def trusted(self, value: bool | None) -> None: + if value is None: + try: + del self.section["Trusted"] + except KeyError: + pass + else: + self.section["Trusted"] = "yes" if value else "no" + + @property + def disabled(self) -> bool: + """Check if Enabled: no is set.""" + return not apt_pkg.string_to_bool(self.section.get("Enabled", "yes")) + + @disabled.setter + def disabled(self, value: bool) -> None: + if value: + self.section["Enabled"] = "no" + else: + try: + del self.section["Enabled"] + except KeyError: + pass + + @property + def invalid(self) -> bool: + """A section is invalid if it doesn't have proper entries.""" + return not self.section + + @property + def line(self) -> str: + """The entire (original) paragraph.""" + return self._line + + def __str__(self) -> str: + return self.str().strip() + + def str(self) -> str: + """Section as a string, newline terminated.""" + return str(self.section) + + def set_enabled(self, enabled: bool) -> None: + """Deprecated (for deb822) accessor for .disabled""" + self.disabled = not enabled + + def merge(self, other: "AnySourceEntry") -> bool: + """Merge the two entries if they are compatible.""" + if ( + not self.may_merge + and self.template is None + and not all(child.template for child in self._children) + ): + return False + if self.file != other.file: + return False + if not isinstance(other, Deb822SourceEntry): + return False + if self.comment != other.comment and not any( + "Added by software-properties" in c for c in (self.comment, other.comment) + ): + return False + + for tag in set(list(self.section.tags) + list(other.section.tags)): + if tag.lower() in ( + "types", + "uris", + "suites", + "components", + "architectures", + "signed-by", + ): + continue + in_self = self.section.get(tag, None) + in_other = other.section.get(tag, None) + if in_self != in_other: + return False + + if ( + sum( + [ + set(self.types) != set(other.types), + set(self.uris) != set(other.uris), + set(self.suites) != set(other.suites), + set(self.comps) != set(other.comps), + set(self.architectures) != set(other.architectures), + ] + ) + > 1 + ): + return False + + for typ in other.types: + if typ not in self.types: + self.types += [typ] + + for uri in other.uris: + if uri not in self.uris: + self.uris += [uri] + + for suite in other.suites: + if suite not in self.suites: + self.suites += [suite] + + for component in other.comps: + if component not in self.comps: + self.comps += [component] + + for arch in other.architectures: + if arch not in self.architectures: + self.architectures += [arch] + + return True + + def _reparent_children(self, to: "Deb822SourceEntry") -> None: + """If we end up being split, check if any of our children need to be reparented to the new parent.""" + for child in self._children: + for typ in to.types: + for uri in to.uris: + for suite in to.suites: + if (child._type, child._uri, child._suite) == (typ, uri, suite): + assert child.parent == self + child._parent = weakref.ref(to) + + +class ExplodedDeb822SourceEntry: + """This represents a bit of a deb822 paragraph corresponding to a legacy sources.list entry""" + + # Mostly we use slots to prevent accidentally assigning unproxied attributes + __slots__ = ["_parent", "_type", "_uri", "_suite", "template", "__weakref__"] + + def __init__(self, parent: Deb822SourceEntry, typ: str, uri: str, suite: str): + self._parent = weakref.ref(parent) + self._type = typ + self._uri = uri + self._suite = suite + self.template = parent.template + parent._children.add(self) + + @property + def parent(self) -> Deb822SourceEntry: + if self._parent is not None: + if (parent := self._parent()) is not None: + return parent + raise ValueError("The parent entry is no longer valid") + + @property + def uri(self) -> str: + self.__check_valid() + return self._uri + + @uri.setter + def uri(self, uri: str) -> None: + self.split_out() + self.parent.uris = [u if u != self._uri else uri for u in self.parent.uris] + self._uri = uri + + @property + def types(self) -> list[str]: + return [self.type] + + @property + def suites(self) -> list[str]: + return [self.dist] + + @property + def uris(self) -> list[str]: + return [self.uri] + + @property + def type(self) -> str: + self.__check_valid() + return self._type + + @type.setter + def type(self, typ: str) -> None: + self.split_out() + self.parent.types = [typ] + self._type = typ + self.__check_valid() + assert self._type == typ + assert self.parent.types == [self._type] + + @property + def dist(self) -> str: + self.__check_valid() + return self._suite + + @dist.setter + def dist(self, suite: str) -> None: + self.split_out() + self.parent.suites = [suite] + self._suite = suite + self.__check_valid() + assert self._suite == suite + assert self.parent.suites == [self._suite] + + def __check_valid(self) -> None: + if self.parent._list() is None: + raise ValueError("The parent entry is dead") + for type in self.parent.types: + for uri in self.parent.uris: + for suite in self.parent.suites: + if (type, uri, suite) == (self._type, self._uri, self._suite): + return + raise ValueError(f"Could not find parent of {self}") + + def split_out(self) -> None: + parent = self.parent + if (parent.types, parent.uris, parent.suites) == ( + [self._type], + [self._uri], + [self._suite], + ): + return + sources_list = parent._list() + if sources_list is None: + raise ValueError("The parent entry is dead") + + try: + index = sources_list.list.index(parent) + except ValueError as e: + raise ValueError( + f"Parent entry for partial deb822 {self} no longer valid" + ) from e + + sources_list.remove(parent) + + reparented = False + for type in reversed(parent.types): + for uri in reversed(parent.uris): + for suite in reversed(parent.suites): + new = Deb822SourceEntry( + section=_deb822.Section(parent.section), + file=parent.file, + list=sources_list, + ) + new.types = [type] + new.uris = [uri] + new.suites = [suite] + new.may_merge = True + + parent._reparent_children(new) + sources_list.list.insert(index, new) + if (type, uri, suite) == (self._type, self._uri, self._suite): + self._parent = weakref.ref(new) + reparented = True + if not reparented: + raise ValueError(f"Could not find parent of {self}") + + def __repr__(self) -> str: + return f"<child {self._type} {self._uri} {self._suite} of {self._parent}" + + architectures = ExplodedEntryProperty(Deb822SourceEntry.architectures) + comps = ExplodedEntryProperty(Deb822SourceEntry.comps) + invalid = ExplodedEntryProperty(Deb822SourceEntry.invalid) + disabled = ExplodedEntryProperty[bool](Deb822SourceEntry.disabled) # type: ignore + trusted = ExplodedEntryProperty(Deb822SourceEntry.trusted) + comment = ExplodedEntryProperty(Deb822SourceEntry.comment) + + def set_enabled(self, enabled: bool) -> None: + """Set the source to enabled.""" + self.disabled = not enabled + + @property + def file(self) -> str: + """Return the file.""" + return self.parent.file + + +class SourceEntry: + """single sources.list entry""" + + def __init__(self, line: str, file: str | None = None): + self.invalid = False # is the source entry valid + self.disabled = False # is it disabled ('#' in front) + self.type = "" # what type (deb, deb-src) + self.architectures: list[str] = [] # architectures + self.trusted: bool | None = None # Trusted + self.uri = "" # base-uri + self.dist = "" # distribution (dapper, edgy, etc) + self.comps: list[str] = [] # list of available componetns (may empty) + self.comment = "" # (optional) comment + self.line = line # the original sources.list line + if file is None: + file = apt_pkg.config.find_file("Dir::Etc::sourcelist") + if file.endswith(".sources"): + raise ValueError("Classic SourceEntry cannot be written to .sources file") + self.file = file # the file that the entry is located in + self.parse(line) + self.template: Template | None = None # type DistInfo.Suite + self.children: list[SourceEntry] = [] + + def __eq__(self, other: Any) -> Any: + """equal operator for two sources.list entries""" + return ( + self.disabled == other.disabled + and self.type == other.type + and self.uri.rstrip("/") == other.uri.rstrip("/") + and self.dist == other.dist + and self.comps == other.comps + ) + + @staticmethod + def mysplit(line: str) -> list[str]: + """a split() implementation that understands the sources.list + format better and takes [] into account (for e.g. cdroms)""" + line = line.strip() + pieces = [] + tmp = "" + # we are inside a [..] block + p_found = False + space_found = False + for i in range(len(line)): + if line[i] == "[": + if space_found: + space_found = False + p_found = True + pieces.append(tmp) + tmp = line[i] + else: + p_found = True + tmp += line[i] + elif line[i] == "]": + p_found = False + tmp += line[i] + elif space_found and not line[i].isspace(): + # we skip one or more space + space_found = False + pieces.append(tmp) + tmp = line[i] + elif line[i].isspace() and not p_found: + # found a whitespace + space_found = True + else: + tmp += line[i] + # append last piece + if len(tmp) > 0: + pieces.append(tmp) + return pieces + + def parse(self, line: str) -> None: + """parse a given sources.list (textual) line and break it up + into the field we have""" + self.line = line + line = line.strip() + # check if the source is enabled/disabled + if line == "" or line == "#": # empty line + self.invalid = True + return + if line[0] == "#": + self.disabled = True + pieces = line[1:].strip().split() + # if it looks not like a disabled deb line return + if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"): + self.invalid = True + return + else: + line = line[1:] + # check for another "#" in the line (this is treated as a comment) + i = line.find("#") + if i > 0: + self.comment = line[i + 1 :] + line = line[:i] + # source is ok, split it and see what we have + pieces = self.mysplit(line) + # Sanity check + if len(pieces) < 3: + self.invalid = True + return + # Type, deb or deb-src + self.type = pieces[0].strip() + # Sanity check + if self.type not in ("deb", "deb-src", "rpm", "rpm-src"): + self.invalid = True + return + + if pieces[1].strip()[0] == "[": + options = pieces.pop(1).strip("[]").split() + for option in options: + try: + key, value = option.split("=", 1) + except Exception: + self.invalid = True + else: + if key == "arch": + self.architectures = value.split(",") + elif key == "trusted": + self.trusted = apt_pkg.string_to_bool(value) + else: + self.invalid = True + + # URI + self.uri = pieces[1].strip() + if len(self.uri) < 1: + self.invalid = True + # distro and components (optional) + # Directory or distro + self.dist = pieces[2].strip() + if len(pieces) > 3: + # List of components + self.comps = pieces[3:] + else: + self.comps = [] + + def set_enabled(self, new_value: bool) -> None: + """set a line to enabled or disabled""" + self.disabled = not new_value + # enable, remove all "#" from the start of the line + if new_value: + self.line = self.line.lstrip().lstrip("#") + else: + # disabled, add a "#" + if self.line.strip()[0] != "#": + self.line = "#" + self.line + + def __str__(self) -> str: + """debug helper""" + return self.str().strip() + + def str(self) -> str: + """return the current line as string""" + if self.invalid: + return self.line + line = "" + if self.disabled: + line = "# " + + line += self.type + + if self.architectures and self.trusted is not None: + line += " [arch={} trusted={}]".format( + ",".join(self.architectures), + "yes" if self.trusted else "no", + ) + elif self.trusted is not None: + line += " [trusted=%s]" % ("yes" if self.trusted else "no") + elif self.architectures: + line += " [arch=%s]" % ",".join(self.architectures) + line += f" {self.uri} {self.dist}" + if len(self.comps) > 0: + line += " " + " ".join(self.comps) + if self.comment != "": + line += " #" + self.comment + line += "\n" + return line + + @property + def types(self) -> list[builtins.str]: + """deb822 compatible accessor for the type""" + return [self.type] + + @property + def uris(self) -> list[builtins.str]: + """deb822 compatible accessor for the uri""" + return [self.uri] + + @property + def suites(self) -> list[builtins.str]: + """deb822 compatible accessor for the suite""" + return [self.dist] + + +AnySourceEntry = Union[SourceEntry, Deb822SourceEntry] +AnyExplodedSourceEntry = Union[ + SourceEntry, Deb822SourceEntry, ExplodedDeb822SourceEntry +] + + +class NullMatcher: + """a Matcher that does nothing""" + + def match(self, s: AnyExplodedSourceEntry) -> bool: + return True + + +class SourcesList: + """represents the full sources.list + sources.list.d file""" + + def __init__( + self, + withMatcher: bool = True, + matcherPath: str = "/usr/share/python-apt/templates/", + *, + deb822: bool = False, + ): + self.list: list[AnySourceEntry] = [] # the actual SourceEntries Type + self.matcher: NullMatcher | SourceEntryMatcher + if withMatcher: + self.matcher = SourceEntryMatcher(matcherPath) + else: + self.matcher = NullMatcher() + self.deb822 = deb822 + self.refresh() + + def refresh(self) -> None: + """update the list of known entries""" + self.list = [] + # read sources.list + file = apt_pkg.config.find_file("Dir::Etc::sourcelist") + if file != "/dev/null" and os.path.exists(file): + self.load(file) + # read sources.list.d + partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts") + if partsdir != "/dev/null" and os.path.exists(partsdir): + for file in os.listdir(partsdir): + if (self.deb822 and file.endswith(".sources")) or file.endswith( + ".list" + ): + self.load(os.path.join(partsdir, file)) + # check if the source item fits a predefined template + for source in self.list: + if not source.invalid: + self.matcher.match(source) + + def __iter__(self) -> Iterator[AnySourceEntry]: + """simple iterator to go over self.list, returns SourceEntry + types""" + yield from self.list + + def __find( + self, *predicates: Callable[[AnyExplodedSourceEntry], bool], **attrs: Any + ) -> Iterator[AnyExplodedSourceEntry]: + uri = attrs.pop("uri", None) + for source in self.exploded_list(): + if uri and source.uri and uri.rstrip("/") != source.uri.rstrip("/"): + continue + if all(getattr(source, key) == attrs[key] for key in attrs) and all( + predicate(source) for predicate in predicates + ): + yield source + + def add( + self, + type: str, + uri: str, + dist: str, + orig_comps: list[str], + comment: str = "", + pos: int = -1, + file: str | None = None, + architectures: Iterable[str] = [], + parent: AnyExplodedSourceEntry | None = None, + ) -> AnyExplodedSourceEntry: + """ + Add a new source to the sources.list. + The method will search for existing matching repos and will try to + reuse them as far as possible + """ + + type = type.strip() + disabled = type.startswith("#") + if disabled: + type = type[1:].lstrip() + architectures = set(architectures) + # create a working copy of the component list so that + # we can modify it later + comps = orig_comps[:] + sources = self.__find( + lambda s: set(s.architectures) == architectures, + disabled=disabled, + invalid=False, + type=type, + uri=uri, + dist=dist, + ) + # check if we have this source already in the sources.list + for source in sources: + for new_comp in comps: + if new_comp in source.comps: + # we have this component already, delete it + # from the new_comps list + del comps[comps.index(new_comp)] + if len(comps) == 0: + return source + + sources = self.__find( + lambda s: set(s.architectures) == architectures, + invalid=False, + type=type, + uri=uri, + dist=dist, + ) + for source in sources: + if source.disabled == disabled: + # if there is a repo with the same (disabled, type, uri, dist) + # just add the components + if set(source.comps) != set(comps): + source.comps = uniq(source.comps + comps) + return source + elif source.disabled and not disabled: + # enable any matching (type, uri, dist), but disabled repo + if set(source.comps) == set(comps): + source.disabled = False + return source + + new_entry: AnySourceEntry + if file is None: + file = apt_pkg.config.find_file("Dir::Etc::sourcelist") + if file.endswith(".sources"): + new_entry = Deb822SourceEntry(None, file=file, list=self) + if parent: + parent = getattr(parent, "parent", parent) + assert isinstance(parent, Deb822SourceEntry) + for k in parent.section.tags: + new_entry.section[k] = parent.section[k] + new_entry.types = [type] + new_entry.uris = [uri] + new_entry.suites = [dist] + new_entry.comps = comps + if architectures: + new_entry.architectures = list(architectures) + new_entry.section.header = comment + new_entry.disabled = disabled + else: + # there isn't any matching source, so create a new line and parse it + parts = [ + "#" if disabled else "", + type, + ("[arch=%s]" % ",".join(architectures)) if architectures else "", + uri, + dist, + ] + parts.extend(comps) + if comment: + parts.append("#" + comment) + line = " ".join(part for part in parts if part) + "\n" + + new_entry = SourceEntry(line) + if file is not None: + new_entry.file = file + + self.matcher.match(new_entry) + if pos < 0: + self.list.append(new_entry) + else: + self.list.insert(pos, new_entry) + return new_entry + + def remove(self, source_entry: AnyExplodedSourceEntry) -> None: + """remove the specified entry from the sources.list""" + if isinstance(source_entry, ExplodedDeb822SourceEntry): + source_entry.split_out() + source_entry = source_entry.parent + self.list.remove(source_entry) + + def restore_backup(self, backup_ext: str) -> None: + "restore sources.list files based on the backup extension" + file = apt_pkg.config.find_file("Dir::Etc::sourcelist") + if os.path.exists(file + backup_ext) and os.path.exists(file): + shutil.copy(file + backup_ext, file) + # now sources.list.d + partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts") + for file in glob.glob("%s/*" % partsdir): + if os.path.exists(file + backup_ext): + shutil.copy(file + backup_ext, file) + + def backup(self, backup_ext: str | None = None) -> str: + """make a backup of the current source files, if no backup extension + is given, the current date/time is used (and returned)""" + already_backuped: Iterable[str] = set() + if backup_ext is None: + backup_ext = time.strftime("%y%m%d.%H%M") + for source in self.list: + if source.file not in already_backuped and os.path.exists(source.file): + shutil.copy(source.file, f"{source.file}{backup_ext}") + return backup_ext + + def load(self, file: str) -> None: + """(re)load the current sources""" + try: + with open(file) as f: + if file.endswith(".sources"): + for section in _deb822.File(f): + self.list.append(Deb822SourceEntry(section, file, list=self)) + else: + for line in f: + source = SourceEntry(line, file) + self.list.append(source) + except Exception as exc: + logging.warning(f"could not open file '{file}': {exc}\n") + + def index(self, entry: AnyExplodedSourceEntry) -> int: + if isinstance(entry, ExplodedDeb822SourceEntry): + return self.list.index(entry.parent) + return self.list.index(entry) + + def merge(self) -> None: + """Merge consecutive entries that have been split back together.""" + merged = True + while merged: + i = 0 + merged = False + while i + 1 < len(self.list): + entry = self.list[i] + if isinstance(entry, Deb822SourceEntry): + j = i + 1 + while j < len(self.list): + if entry.merge(self.list[j]): + del self.list[j] + merged = True + else: + j += 1 + i += 1 + + def save(self) -> None: + """save the current sources""" + files: dict[str, io.TextIOWrapper] = {} + # write an empty default config file if there aren't any sources + if len(self.list) == 0: + path = apt_pkg.config.find_file("Dir::Etc::sourcelist") + header = ( + "## See sources.list(5) for more information, especialy\n" + "# Remember that you can only use http, ftp or file URIs\n" + "# CDROMs are managed through the apt-cdrom tool.\n" + ) + + with open(path, "w") as f: + f.write(header) + return + + self.merge() + try: + for source in self.list: + if source.file not in files: + files[source.file] = open(source.file, "w") + elif isinstance(source, Deb822SourceEntry): + files[source.file].write("\n") + files[source.file].write(source.str()) + finally: + for f in files.values(): + f.close() + + def check_for_relations( + self, sources_list: Iterable[AnySourceEntry] + ) -> tuple[list[AnySourceEntry], dict[Template, list[AnySourceEntry]]]: + """get all parent and child channels in the sources list""" + parents = [] + used_child_templates: dict[Template, list[AnySourceEntry]] = {} + for source in sources_list: + # try to avoid checking uninterressting sources + if source.template is None: + continue + # set up a dict with all used child templates and corresponding + # source entries + if source.template.child: + key = source.template + if key not in used_child_templates: + used_child_templates[key] = [] + temp = used_child_templates[key] + temp.append(source) + else: + # store each source with children aka. a parent :) + if len(source.template.children) > 0: + parents.append(source) + # print self.used_child_templates + # print self.parents + return (parents, used_child_templates) + + def exploded_list(self) -> list[AnyExplodedSourceEntry]: + """Present an exploded view of the list where each entry corresponds exactly to a Release file. + + A release file is uniquely identified by the triplet (type, uri, suite). Old style entries + always referred to a single release file, but deb822 entries allow multiple values for each + of those fields. + """ + res: list[AnyExplodedSourceEntry] = [] + for entry in self.list: + if isinstance(entry, SourceEntry): + res.append(entry) + elif ( + len(entry.types) == 1 + and len(entry.uris) == 1 + and len(entry.suites) == 1 + ): + res.append(entry) + else: + for typ in entry.types: + for uri in entry.uris: + for sui in entry.suites: + res.append(ExplodedDeb822SourceEntry(entry, typ, uri, sui)) + self.matcher.match(res[-1]) + + return res + + +class SourceEntryMatcher: + """matcher class to make a source entry look nice + lots of predefined matchers to make it i18n/gettext friendly + """ + + def __init__(self, matcherPath: str): + self.templates: list[Template] = [] + # Get the human readable channel and comp names from the channel .infos + spec_files = glob.glob("%s/*.info" % matcherPath) + for f in spec_files: + f = os.path.basename(f) + i = f.find(".info") + f = f[0:i] + dist = DistInfo(f, base_dir=matcherPath) + for template in dist.templates: + if template.match_uri is not None: + self.templates.append(template) + return + + def match(self, source: AnyExplodedSourceEntry) -> bool: + """Add a matching template to the source""" + found = False + for template in self.templates: + if source.uri is None or source.dist is None: + continue + if ( + template.match_uri is not None + and template.match_name is not None + and source.uri is not None + and source.dist is not None + and re.search(template.match_uri, source.uri) + and re.match(template.match_name, source.dist) + and + # deb is a valid fallback for deb-src (if that is not + # definied, see #760035 + (source.type == template.type or template.type == "deb") + ): + found = True + source.template = template + break + elif ( + template.is_mirror(source.uri) + and template.match_name is not None + and source.dist is not None + and re.match(template.match_name, source.dist) + ): + found = True + source.template = template + break + return found + + +# some simple tests +if __name__ == "__main__": + apt_pkg.init_config() + sources = SourcesList() + + for entry in sources: + logging.info("entry %s" % entry.str()) + # print entry.uri + + mirror = is_mirror( + "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu/" + ) + logging.info("is_mirror(): %s" % mirror) + + logging.info( + is_mirror( + "http://archive.ubuntu.com/ubuntu", "http://de.archive.ubuntu.com/ubuntu/" + ) + ) + logging.info( + is_mirror( + "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu" + ) + ) |