summaryrefslogtreecommitdiffstats
path: root/aptsources
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 18:07:41 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-15 18:07:41 +0000
commit76926159194e180003aa78de97e5f287bf4325a5 (patch)
tree2cea7245cdc3f66355900c820c145eba90598766 /aptsources
parentInitial commit. (diff)
downloadpython-apt-76926159194e180003aa78de97e5f287bf4325a5.tar.xz
python-apt-76926159194e180003aa78de97e5f287bf4325a5.zip
Adding upstream version 2.7.6.upstream/2.7.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'aptsources')
-rw-r--r--aptsources/__init__.py6
-rw-r--r--aptsources/_deb822.py144
-rw-r--r--aptsources/distinfo.py415
-rw-r--r--aptsources/distro.py648
-rw-r--r--aptsources/sourceslist.py1083
5 files changed, 2296 insertions, 0 deletions
diff --git a/aptsources/__init__.py b/aptsources/__init__.py
new file mode 100644
index 0000000..2ccf4fc
--- /dev/null
+++ b/aptsources/__init__.py
@@ -0,0 +1,6 @@
+import apt_pkg
+
+# init the package system, but do not re-initialize config
+if "APT" not in apt_pkg.config:
+ apt_pkg.init_config()
+apt_pkg.init_system()
diff --git a/aptsources/_deb822.py b/aptsources/_deb822.py
new file mode 100644
index 0000000..d3b3a7c
--- /dev/null
+++ b/aptsources/_deb822.py
@@ -0,0 +1,144 @@
+#!/usr/bin/python3
+#
+# Copyright (C) Canonical Ltd
+#
+# SPDX-License-Identifier: GPL-2.0+
+
+"""deb822 parser with support for comment headers and footers."""
+
+import collections
+import io
+import typing
+
+import apt_pkg
+
+T = typing.TypeVar("T")
+
+
+class Section:
+ """A single deb822 section, possibly with comments.
+
+ This represents a single deb822 section.
+ """
+
+ tags: collections.OrderedDict[str, str]
+ _case_mapping: dict[str, str]
+ header: str
+ footer: str
+
+ def __init__(self, section: typing.Union[str, "Section"]):
+ if isinstance(section, Section):
+ self.tags = collections.OrderedDict(section.tags)
+ self._case_mapping = {k.casefold(): k for k in self.tags}
+ self.header = section.header
+ self.footer = section.footer
+ return
+
+ comments = ["", ""]
+ in_section = False
+ trimmed_section = ""
+
+ for line in section.split("\n"):
+ if line.startswith("#"):
+ # remove the leading #
+ line = line[1:]
+ comments[in_section] += line + "\n"
+ continue
+
+ in_section = True
+ trimmed_section += line + "\n"
+
+ self.tags = collections.OrderedDict(apt_pkg.TagSection(trimmed_section))
+ self._case_mapping = {k.casefold(): k for k in self.tags}
+ self.header, self.footer = comments
+
+ def __getitem__(self, key: str) -> str:
+ """Get the value of a field."""
+ return self.tags[self._case_mapping.get(key.casefold(), key)]
+
+ def __delitem__(self, key: str) -> None:
+ """Delete a field"""
+ del self.tags[self._case_mapping.get(key.casefold(), key)]
+
+ def __setitem__(self, key: str, val: str) -> None:
+ """Set the value of a field."""
+ if key.casefold() not in self._case_mapping:
+ self._case_mapping[key.casefold()] = key
+ self.tags[self._case_mapping[key.casefold()]] = val
+
+ def __bool__(self) -> bool:
+ return bool(self.tags)
+
+ @typing.overload
+ def get(self, key: str) -> str | None:
+ ...
+
+ @typing.overload
+ def get(self, key: str, default: T) -> T | str:
+ ...
+
+ def get(self, key: str, default: T | None = None) -> T | None | str:
+ try:
+ return self[key]
+ except KeyError:
+ return default
+
+ @staticmethod
+ def __comment_lines(content: str) -> str:
+ return (
+ "\n".join("#" + line for line in content.splitlines()) + "\n"
+ if content
+ else ""
+ )
+
+ def __str__(self) -> str:
+ """Canonical string rendering of this section."""
+ return (
+ self.__comment_lines(self.header)
+ + "".join(f"{k}: {v}\n" for k, v in self.tags.items())
+ + self.__comment_lines(self.footer)
+ )
+
+
+class File:
+ """
+ Parse a given file object into a list of Section objects.
+ """
+
+ def __init__(self, fobj: io.TextIOBase):
+ self.sections = []
+ section = ""
+ for line in fobj:
+ if not line.isspace():
+ # A line is part of the section if it has non-whitespace characters
+ section += line
+ elif section:
+ # Our line is just whitespace and we have gathered section content, so let's write out the section
+ self.sections.append(Section(section))
+ section = ""
+
+ # The final section may not be terminated by an empty line
+ if section:
+ self.sections.append(Section(section))
+
+ def __iter__(self) -> typing.Iterator[Section]:
+ return iter(self.sections)
+
+ def __str__(self) -> str:
+ return "\n".join(str(s) for s in self.sections)
+
+
+if __name__ == "__main__":
+ st = """# Header
+# More header
+K1: V1
+# Inline
+K2: V2
+ # not a comment
+# Footer
+# More footer
+"""
+
+ s = Section(st)
+
+ print(s)
diff --git a/aptsources/distinfo.py b/aptsources/distinfo.py
new file mode 100644
index 0000000..bd30f81
--- /dev/null
+++ b/aptsources/distinfo.py
@@ -0,0 +1,415 @@
+# distinfo.py - provide meta information for distro repositories
+#
+# Copyright (c) 2005 Gustavo Noronha Silva <kov@debian.org>
+# Copyright (c) 2006-2007 Sebastian Heinlein <glatzor@ubuntu.com>
+#
+# Authors: Gustavo Noronha Silva <kov@debian.org>
+# Sebastian Heinlein <glatzor@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+import csv
+import errno
+import logging
+import os
+import re
+from collections.abc import Iterator
+from subprocess import PIPE, Popen
+from typing import cast
+
+import apt_pkg
+from apt_pkg import gettext as _
+
+
+def _expand_template(template: str, csv_path: str) -> Iterator[str]:
+ """Expand the given template.
+
+ A template file consists of a header, followed by paragraphs
+ of templated suites, followed by a footer. A templated suite
+ is any paragraph where the Suite field contains {.
+
+ This function expands all templated suites using the information
+ found in the CSV file supplied by distro-info-data.
+
+ It yields lines of template info.
+ """
+
+ known_suites = set()
+
+ # Copy out any header, and gather all hardcoded suites
+ with apt_pkg.TagFile(template) as tmpl:
+ for section in tmpl:
+ if "X-Exclude-Suites" in section:
+ known_suites.update(section["X-Exclude-Suites"].split(", "))
+ if "Suite" in section:
+ if "{" in section["Suite"]:
+ break
+
+ known_suites.add(section["Suite"])
+
+ yield from str(section).splitlines()
+ else:
+ # We did not break, so we did copy all of them
+ return
+
+ for section in tmpl:
+ if "Suite" in section:
+ known_suites.add(section["Suite"])
+
+ with open(csv_path) as csv_object:
+ releases = reversed(list(csv.DictReader(csv_object)))
+
+ # Perform template substitution on the middle of the list
+ for rel in releases:
+ if rel["series"] in known_suites:
+ continue
+ yield ""
+ rel["version"] = rel["version"].replace(" LTS", "")
+ with apt_pkg.TagFile(template) as tmpl:
+ for section in tmpl:
+ # Only work on template sections, this skips head and tails
+ if "Suite" not in section or "{" not in section["Suite"]:
+ continue
+ if "X-Version" in section:
+ # Version requirements. Maybe should be made nicer
+ ver = rel["version"]
+ if any(
+ (
+ field.startswith("le")
+ and apt_pkg.version_compare(field[3:], ver) < 0
+ )
+ or (
+ field.startswith("ge")
+ and apt_pkg.version_compare(field[3:], ver) > 0
+ )
+ for field in section["X-Version"].split(", ")
+ ):
+ continue
+
+ for line in str(section).format(**rel).splitlines():
+ if line.startswith("X-Version"):
+ continue
+ yield line
+
+ # Copy out remaining suites
+ with apt_pkg.TagFile(template) as tmpl:
+ # Skip the head again, we don't want to copy it twice
+ for section in tmpl:
+ if "Suite" in section and "{" in section["Suite"]:
+ break
+
+ for section in tmpl:
+ # Ignore any template parts and copy the rest out,
+ # this is the inverse of the template substitution loop
+ if "Suite" in section and "{" in section["Suite"]:
+ continue
+
+ yield from str(section).splitlines()
+
+
+class Template:
+ def __init__(self) -> None:
+ self.name: str | None = None
+ self.child = False
+ self.parents: list[Template] = [] # ref to parent template(s)
+ self.match_name: str | None = None
+ self.description: str | None = None
+ self.base_uri: str | None = None
+ self.type: str | None = None
+ self.components: list[Component] = []
+ self.children: list[Template] = []
+ self.match_uri: str | None = None
+ self.mirror_set: dict[str, Mirror] = {}
+ self.distribution: str | None = None
+ self.available = True
+ self.official = True
+
+ def has_component(self, comp: str) -> bool:
+ """Check if the distribution provides the given component"""
+ return comp in (c.name for c in self.components)
+
+ def is_mirror(self, url: str) -> bool:
+ """Check if a given url of a repository is a valid mirror"""
+ proto, hostname, dir = split_url(url)
+ if hostname in self.mirror_set:
+ return self.mirror_set[hostname].has_repository(proto, dir)
+ else:
+ return False
+
+
+class Component:
+ def __init__(
+ self,
+ name: str,
+ desc: str | None = None,
+ long_desc: str | None = None,
+ parent_component: str | None = None,
+ ):
+ self.name = name
+ self.description = desc
+ self.description_long = long_desc
+ self.parent_component = parent_component
+
+ def get_parent_component(self) -> str | None:
+ return self.parent_component
+
+ def set_parent_component(self, parent: str) -> None:
+ self.parent_component = parent
+
+ def get_description(self) -> str | None:
+ if self.description_long is not None:
+ return self.description_long
+ elif self.description is not None:
+ return self.description
+ else:
+ return None
+
+ def set_description(self, desc: str) -> None:
+ self.description = desc
+
+ def set_description_long(self, desc: str) -> None:
+ self.description_long = desc
+
+ def get_description_long(self) -> str | None:
+ return self.description_long
+
+
+class Mirror:
+ """Storage for mirror related information"""
+
+ def __init__(
+ self, proto: str, hostname: str, dir: str, location: str | None = None
+ ):
+ self.hostname = hostname
+ self.repositories: list[Repository] = []
+ self.add_repository(proto, dir)
+ self.location = location
+
+ def add_repository(self, proto: str, dir: str) -> None:
+ self.repositories.append(Repository(proto, dir))
+
+ def get_repositories_for_proto(self, proto: str) -> list["Repository"]:
+ return [r for r in self.repositories if r.proto == proto]
+
+ def has_repository(self, proto: str, dir: str) -> bool:
+ if dir is None:
+ return False
+ for r in self.repositories:
+ if r.proto == proto and dir in r.dir:
+ return True
+ return False
+
+ def get_repo_urls(self) -> list[str]:
+ return [r.get_url(self.hostname) for r in self.repositories]
+
+ def get_location(self) -> str | None:
+ return self.location
+
+ def set_location(self, location: str) -> None:
+ self.location = location
+
+
+class Repository:
+ def __init__(self, proto: str, dir: str) -> None:
+ self.proto = proto
+ self.dir = dir
+
+ def get_info(self) -> tuple[str, str]:
+ return self.proto, self.dir
+
+ def get_url(self, hostname: str) -> str:
+ return f"{self.proto}://{hostname}/{self.dir}"
+
+
+def split_url(url: str) -> list[str]:
+ """split a given URL into the protocoll, the hostname and the dir part"""
+ split = re.split(":*\\/+", url, maxsplit=2)
+ while len(split) < 3:
+ split.append(None)
+ return split
+
+
+class DistInfo:
+ def __init__(
+ self,
+ dist: str | None = None,
+ base_dir: str = "/usr/share/python-apt/templates",
+ ):
+ self.metarelease_uri = ""
+ self.templates: list[Template] = []
+ self.arch = apt_pkg.config.find("APT::Architecture")
+
+ location = None
+ match_loc = re.compile(r"^#LOC:(.+)$")
+ match_mirror_line = re.compile(
+ r"^(#LOC:.+)|(((http)|(ftp)|(rsync)|(file)|(mirror)|(https))://"
+ r"[A-Za-z0-9/\.:\-_@]+)$"
+ )
+ # match_mirror_line = re.compile(r".+")
+
+ if not dist:
+ try:
+ dist = (
+ Popen(
+ ["lsb_release", "-i", "-s"],
+ universal_newlines=True,
+ stdout=PIPE,
+ )
+ .communicate()[0]
+ .strip()
+ )
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ logging.warning("lsb_release failed, using defaults: %s" % exc)
+ dist = "Debian"
+
+ self.dist = dist
+
+ map_mirror_sets = {}
+
+ dist_fname = f"{base_dir}/{dist}.info"
+ csv_fname = f"/usr/share/distro-info/{dist.lower()}.csv"
+
+ # FIXME: Logic doesn't work with types.
+ template = cast(Template, None)
+ component = cast(Component, None)
+ for line in _expand_template(dist_fname, csv_fname):
+ tokens = line.split(":", 1)
+ if len(tokens) < 2:
+ continue
+ field = tokens[0].strip()
+ value = tokens[1].strip()
+ if field == "ChangelogURI":
+ self.changelogs_uri = _(value)
+ elif field == "MetaReleaseURI":
+ self.metarelease_uri = value
+ elif field == "Suite":
+ self.finish_template(template, component)
+ component = cast(Component, None) # FIXME
+ template = Template()
+ template.name = value
+ template.distribution = dist
+ template.match_name = "^%s$" % value
+ elif field == "MatchName":
+ template.match_name = value
+ elif field == "ParentSuite":
+ template.child = True
+ for nanny in self.templates:
+ # look for parent and add back ref to it
+ if nanny.name == value:
+ template.parents.append(nanny)
+ nanny.children.append(template)
+ elif field == "Available":
+ template.available = apt_pkg.string_to_bool(value)
+ elif field == "Official":
+ template.official = apt_pkg.string_to_bool(value)
+ elif field == "RepositoryType":
+ template.type = value
+ elif field == "BaseURI" and not template.base_uri:
+ template.base_uri = value
+ elif field == "BaseURI-%s" % self.arch:
+ template.base_uri = value
+ elif field == "MatchURI" and not template.match_uri:
+ template.match_uri = value
+ elif field == "MatchURI-%s" % self.arch:
+ template.match_uri = value
+ elif field == "MirrorsFile" or field == "MirrorsFile-%s" % self.arch:
+ # Make the path absolute.
+ value = (
+ os.path.isabs(value)
+ and value
+ or os.path.abspath(os.path.join(base_dir, value))
+ )
+ if value not in map_mirror_sets:
+ mirror_set: dict[str, Mirror] = {}
+ try:
+ with open(value) as value_f:
+ mirror_data = list(
+ filter(
+ match_mirror_line.match,
+ [x.strip() for x in value_f],
+ )
+ )
+ except Exception:
+ print(f"WARNING: Failed to read mirror file {value}")
+ mirror_data = []
+ for line in mirror_data:
+ if line.startswith("#LOC:"):
+ location = match_loc.sub(r"\1", line)
+ continue
+ (proto, hostname, dir) = split_url(line)
+ if hostname in mirror_set:
+ mirror_set[hostname].add_repository(proto, dir)
+ else:
+ mirror_set[hostname] = Mirror(
+ proto, hostname, dir, location
+ )
+ map_mirror_sets[value] = mirror_set
+ template.mirror_set = map_mirror_sets[value]
+ elif field == "Description":
+ template.description = _(value)
+ elif field == "Component":
+ if component and not template.has_component(component.name):
+ template.components.append(component)
+ component = Component(value)
+ elif field == "CompDescription":
+ component.set_description(_(value))
+ elif field == "CompDescriptionLong":
+ component.set_description_long(_(value))
+ elif field == "ParentComponent":
+ component.set_parent_component(value)
+ self.finish_template(template, component)
+ template = cast(Template, None)
+ component = cast(Component, None)
+
+ def finish_template(self, template: Template, component: Component | None) -> None:
+ "finish the current tempalte"
+ if not template:
+ return
+ # reuse some properties of the parent template
+ if template.match_uri is None and template.child:
+ for t in template.parents:
+ if t.match_uri:
+ template.match_uri = t.match_uri
+ break
+ if template.mirror_set == {} and template.child:
+ for t in template.parents:
+ if t.match_uri:
+ template.mirror_set = t.mirror_set
+ break
+ if component and not template.has_component(component.name):
+ template.components.append(component)
+ component = None
+ # the official attribute is inherited
+ for t in template.parents:
+ template.official = t.official
+ self.templates.append(template)
+
+
+if __name__ == "__main__":
+ d = DistInfo("Ubuntu", "/usr/share/python-apt/templates")
+ logging.info(d.changelogs_uri)
+ for template in d.templates:
+ logging.info("\nSuite: %s" % template.name)
+ logging.info("Desc: %s" % template.description)
+ logging.info("BaseURI: %s" % template.base_uri)
+ logging.info("MatchURI: %s" % template.match_uri)
+ if template.mirror_set != {}:
+ logging.info("Mirrors: %s" % list(template.mirror_set.keys()))
+ for comp in template.components:
+ logging.info(f" {comp.name} -{comp.description} -{comp.description_long}")
+ for child in template.children:
+ logging.info(" %s" % child.description)
diff --git a/aptsources/distro.py b/aptsources/distro.py
new file mode 100644
index 0000000..546d0e7
--- /dev/null
+++ b/aptsources/distro.py
@@ -0,0 +1,648 @@
+# distro.py - Provide a distro abstraction of the sources.list
+#
+# Copyright (c) 2004-2009 Canonical Ltd.
+# Copyright (c) 2006-2007 Sebastian Heinlein
+# Copyright (c) 2016 Harald Sitter
+#
+# Authors: Sebastian Heinlein <glatzor@ubuntu.com>
+# Michael Vogt <mvo@debian.org>
+# Harald Sitter <sitter@kde.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+import gettext
+import logging
+import os
+import re
+import shlex
+import warnings
+from xml.etree.ElementTree import ElementTree
+
+from apt_pkg import gettext as _
+
+
+class NoDistroTemplateException(Exception):
+ pass
+
+
+class Distribution:
+ def __init__(self, id, codename, description, release, is_like=[]):
+ """Container for distribution specific informations"""
+ # LSB information
+ self.id = id
+ self.codename = codename
+ self.description = description
+ self.release = release
+ self.is_like = is_like
+
+ self.binary_type = "deb"
+ self.source_type = "deb-src"
+
+ def get_sources(self, sourceslist):
+ """
+ Find the corresponding template, main and child sources
+ for the distribution
+ """
+
+ self.sourceslist = sourceslist
+ # corresponding sources
+ self.source_template = None
+ self.child_sources = []
+ self.main_sources = []
+ self.disabled_sources = []
+ self.cdrom_sources = []
+ self.download_comps = []
+ self.enabled_comps = []
+ self.cdrom_comps = []
+ self.used_media = []
+ self.get_source_code = False
+ self.source_code_sources = []
+
+ # location of the sources
+ self.default_server = ""
+ self.main_server = ""
+ self.nearest_server = ""
+ self.used_servers = []
+
+ # find the distro template
+ for template in self.sourceslist.matcher.templates:
+ if self.is_codename(template.name) and template.distribution == self.id:
+ # print "yeah! found a template for %s" % self.description
+ # print template.description, template.base_uri, \
+ # template.components
+ self.source_template = template
+ break
+ if self.source_template is None:
+ raise NoDistroTemplateException(
+ "Error: could not find a distribution template for %s/%s"
+ % (self.id, self.codename)
+ )
+
+ # find main and child sources
+ media = []
+ comps = []
+ cdrom_comps = []
+ enabled_comps = []
+ # source_code = []
+ for source in self.sourceslist.exploded_list():
+ if (
+ not source.invalid
+ and self.is_codename(source.dist)
+ and source.template
+ and source.template.official
+ and self.is_codename(source.template.name)
+ ):
+ # print "yeah! found a distro repo: %s" % source.line
+ # cdroms need do be handled differently
+ if source.uri.startswith("cdrom:") and not source.disabled:
+ self.cdrom_sources.append(source)
+ cdrom_comps.extend(source.comps)
+ elif source.uri.startswith("cdrom:") and source.disabled:
+ self.cdrom_sources.append(source)
+ elif source.type == self.binary_type and not source.disabled:
+ self.main_sources.append(source)
+ comps.extend(source.comps)
+ media.append(source.uri)
+ elif source.type == self.binary_type and source.disabled:
+ self.disabled_sources.append(source)
+ elif source.type == self.source_type and not source.disabled:
+ self.source_code_sources.append(source)
+ elif source.type == self.source_type and source.disabled:
+ self.disabled_sources.append(source)
+ if not source.invalid and source.template in self.source_template.children:
+ if not source.disabled and source.type == self.binary_type:
+ self.child_sources.append(source)
+ elif not source.disabled and source.type == self.source_type:
+ self.source_code_sources.append(source)
+ else:
+ self.disabled_sources.append(source)
+ self.download_comps = set(comps)
+ self.cdrom_comps = set(cdrom_comps)
+ enabled_comps.extend(comps)
+ enabled_comps.extend(cdrom_comps)
+ self.enabled_comps = set(enabled_comps)
+ self.used_media = set(media)
+ self.get_mirrors()
+
+ def get_mirrors(self, mirror_template=None):
+ """
+ Provide a set of mirrors where you can get the distribution from
+ """
+ # the main server is stored in the template
+ self.main_server = self.source_template.base_uri
+
+ # other used servers
+ for medium in self.used_media:
+ if not medium.startswith("cdrom:"):
+ # seems to be a network source
+ self.used_servers.append(medium)
+
+ if len(self.main_sources) == 0:
+ self.default_server = self.main_server
+ else:
+ self.default_server = self.main_sources[0].uri
+
+ # get a list of country codes and real names
+ self.countries = {}
+ fname = "/usr/share/xml/iso-codes/iso_3166.xml"
+ if os.path.exists(fname):
+ et = ElementTree(file=fname)
+ # python2.6 compat, the next two lines can get removed
+ # once we do not use py2.6 anymore
+ if getattr(et, "iter", None) is None:
+ et.iter = et.getiterator
+ it = et.iter("iso_3166_entry")
+ for elm in it:
+ try:
+ descr = elm.attrib["common_name"]
+ except KeyError:
+ descr = elm.attrib["name"]
+ try:
+ code = elm.attrib["alpha_2_code"]
+ except KeyError:
+ code = elm.attrib["alpha_3_code"]
+ self.countries[code.lower()] = gettext.dgettext("iso_3166", descr)
+
+ # try to guess the nearest mirror from the locale
+ self.country = None
+ self.country_code = None
+ locale = os.getenv("LANG", default="en_UK")
+ a = locale.find("_")
+ z = locale.find(".")
+ if z == -1:
+ z = len(locale)
+ country_code = locale[a + 1 : z].lower()
+
+ if mirror_template:
+ self.nearest_server = mirror_template % country_code
+
+ if country_code in self.countries:
+ self.country = self.countries[country_code]
+ self.country_code = country_code
+
+ def _get_mirror_name(self, server):
+ """Try to get a human readable name for the main mirror of a country
+ Customize for different distributions"""
+ country = None
+ i = server.find("://")
+ li = server.find(".archive.ubuntu.com")
+ if i != -1 and li != -1:
+ country = server[i + len("://") : li]
+ if country in self.countries:
+ # TRANSLATORS: %s is a country
+ return _("Server for %s") % self.countries[country]
+ else:
+ return "%s" % server.rstrip("/ ")
+
+ def get_server_list(self):
+ """Return a list of used and suggested servers"""
+
+ def compare_mirrors(mir1, mir2):
+ """Helper function that handles comaprision of mirror urls
+ that could contain trailing slashes"""
+ return re.match(mir1.strip("/ "), mir2.rstrip("/ "))
+
+ # Store all available servers:
+ # Name, URI, active
+ mirrors = []
+ if len(self.used_servers) < 1 or (
+ len(self.used_servers) == 1
+ and compare_mirrors(self.used_servers[0], self.main_server)
+ ):
+ mirrors.append([_("Main server"), self.main_server, True])
+ if self.nearest_server:
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ elif len(self.used_servers) == 1 and not compare_mirrors(
+ self.used_servers[0], self.main_server
+ ):
+ mirrors.append([_("Main server"), self.main_server, False])
+ # Only one server is used
+ server = self.used_servers[0]
+
+ # Append the nearest server if it's not already used
+ if self.nearest_server:
+ if not compare_mirrors(server, self.nearest_server):
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ if server:
+ mirrors.append([self._get_mirror_name(server), server, True])
+
+ elif len(self.used_servers) > 1:
+ # More than one server is used. Since we don't handle this case
+ # in the user interface we set "custom servers" to true and
+ # append a list of all used servers
+ mirrors.append([_("Main server"), self.main_server, False])
+ if self.nearest_server:
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ mirrors.append([_("Custom servers"), None, True])
+ for server in self.used_servers:
+ mirror_entry = [self._get_mirror_name(server), server, False]
+ if compare_mirrors(server, self.nearest_server) or compare_mirrors(
+ server, self.main_server
+ ):
+ continue
+ elif mirror_entry not in mirrors:
+ mirrors.append(mirror_entry)
+
+ return mirrors
+
+ def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""):
+ """
+ Add distribution specific sources
+ """
+ if uri is None:
+ # FIXME: Add support for the server selector
+ uri = self.default_server
+ if dist is None:
+ dist = self.codename
+ if comps is None:
+ comps = list(self.enabled_comps)
+ if type is None:
+ type = self.binary_type
+
+ parent = None
+ file = None
+ for parent in reversed(self.child_sources) or reversed(self.main_sources):
+ file = parent.file
+ break
+
+ new_source = self.sourceslist.add(
+ type, uri, dist, comps, comment, parent=parent, file=file
+ )
+ # if source code is enabled add a deb-src line after the new
+ # source
+ if self.get_source_code and type == self.binary_type:
+ self.sourceslist.add(
+ self.source_type,
+ uri,
+ dist,
+ comps,
+ comment,
+ file=new_source.file,
+ parent=new_source,
+ pos=self.sourceslist.list.index(new_source) + 1,
+ )
+
+ def enable_component(self, comp):
+ """
+ Enable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+
+ comp: the component that should be enabled
+ """
+ comps = list([comp])
+ # look for parent components that we may have to add
+ for source in self.main_sources:
+ for c in source.template.components:
+ if c.name == comp and c.parent_component:
+ if c.parent_component not in comps:
+ comps.append(c.parent_component)
+ for c in comps:
+ self._enable_component(c)
+
+ def _enable_component(self, comp):
+ def add_component_only_once(source, comps_per_dist):
+ """
+ Check if we already added the component to the repository, since
+ a repository could be splitted into different apt lines. If not
+ add the component
+ """
+ # if we don't have that distro, just return (can happen for e.g.
+ # dapper-update only in deb-src
+ if source.dist not in comps_per_dist:
+ return
+ # if we have seen this component already for this distro,
+ # return (nothing to do)
+ if comp in comps_per_dist[source.dist]:
+ return
+ # add it
+ source.comps = source.comps + [comp]
+ comps_per_dist[source.dist].add(comp)
+
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ # store what comps are enabled already per distro (where distro is
+ # e.g. "dapper", "dapper-updates")
+ comps_per_dist = {}
+ comps_per_sdist = {}
+ for s in sources:
+ if s.type == self.binary_type:
+ if s.dist not in comps_per_dist:
+ comps_per_dist[s.dist] = set()
+ for c in s.comps:
+ comps_per_dist[s.dist].add(c)
+ for s in self.source_code_sources:
+ if s.type == self.source_type:
+ if s.dist not in comps_per_sdist:
+ comps_per_sdist[s.dist] = set()
+ for c in s.comps:
+ comps_per_sdist[s.dist].add(c)
+
+ # check if there is a main source at all
+ if len(self.main_sources) < 1:
+ # create a new main source
+ self.add_source(comps=["%s" % comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in sources:
+ add_component_only_once(source, comps_per_dist)
+
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ # check if there is a main source code source at all
+ if self.get_source_code:
+ if len(self.source_code_sources) < 1:
+ # create a new main source
+ self.add_source(type=self.source_type, comps=["%s" % comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ def disable_component(self, comp):
+ """
+ Disable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+ """
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ sources.extend(self.source_code_sources)
+ if comp in self.cdrom_comps:
+ sources = []
+ sources.extend(self.main_sources)
+ for source in sources:
+ if comp in source.comps:
+ comps = source.comps
+ comps.remove(comp)
+ source.comps = comps
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ def change_server(self, uri):
+ """Change the server of all distro specific sources to
+ a given host"""
+
+ def change_server_of_source(source, uri, seen):
+ # Avoid creating duplicate entries
+ source.uri = uri
+ for comp in source.comps:
+ if [source.uri, source.dist, comp] in seen:
+ source.comps.remove(comp)
+ else:
+ seen.append([source.uri, source.dist, comp])
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ seen_binary = []
+ seen_source = []
+ self.default_server = uri
+ for source in self.main_sources:
+ change_server_of_source(source, uri, seen_binary)
+ for source in self.child_sources:
+ # Do not change the forces server of a child source
+ if (
+ source.template.base_uri is None
+ or source.template.base_uri != source.uri
+ ):
+ change_server_of_source(source, uri, seen_binary)
+ for source in self.source_code_sources:
+ change_server_of_source(source, uri, seen_source)
+
+ def is_codename(self, name):
+ """Compare a given name with the release codename."""
+ if name == self.codename:
+ return True
+ else:
+ return False
+
+
+class DebianDistribution(Distribution):
+ """Class to support specific Debian features"""
+
+ def is_codename(self, name):
+ """Compare a given name with the release codename and check if
+ if it can be used as a synonym for a development releases"""
+ if name == self.codename or self.release in ("testing", "unstable"):
+ return True
+ else:
+ return False
+
+ def _get_mirror_name(self, server):
+ """Try to get a human readable name for the main mirror of a country
+ Debian specific"""
+ country = None
+ i = server.find("://ftp.")
+ li = server.find(".debian.org")
+ if i != -1 and li != -1:
+ country = server[i + len("://ftp.") : li]
+ if country in self.countries:
+ # TRANSLATORS: %s is a country
+ return (
+ _("Server for %s")
+ % gettext.dgettext(
+ "iso_3166", self.countries[country].rstrip()
+ ).rstrip()
+ )
+ else:
+ return "%s" % server.rstrip("/ ")
+
+ def get_mirrors(self):
+ Distribution.get_mirrors(
+ self, mirror_template="http://ftp.%s.debian.org/debian/"
+ )
+
+
+class UbuntuDistribution(Distribution):
+ """Class to support specific Ubuntu features"""
+
+ def get_mirrors(self):
+ Distribution.get_mirrors(
+ self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/"
+ )
+
+
+class UbuntuRTMDistribution(UbuntuDistribution):
+ """Class to support specific Ubuntu RTM features"""
+
+ def get_mirrors(self):
+ self.main_server = self.source_template.base_uri
+
+
+def _lsb_release():
+ """Call lsb_release --idrc and return a mapping."""
+ import errno
+ from subprocess import PIPE, Popen
+
+ result = {
+ "Codename": "sid",
+ "Distributor ID": "Debian",
+ "Description": "Debian GNU/Linux unstable (sid)",
+ "Release": "unstable",
+ }
+ try:
+ out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0]
+ # Convert to unicode string, needed for Python 3.1
+ out = out.decode("utf-8")
+ result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ logging.warning("lsb_release failed, using defaults:" % exc)
+ return result
+
+
+def _system_image_channel():
+ """Get the current channel from system-image-cli -i if possible."""
+ import errno
+ from subprocess import DEVNULL, PIPE, Popen
+
+ try:
+ out = Popen(
+ ["system-image-cli", "-i"],
+ stdout=PIPE,
+ stderr=DEVNULL,
+ universal_newlines=True,
+ ).communicate()[0]
+ for line in out.splitlines():
+ if line.startswith("channel: "):
+ return line.split(": ", 1)[1]
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ logging.warning("system-image-cli failed, using defaults: %s" % exc)
+ return None
+
+
+class _OSRelease:
+ DEFAULT_OS_RELEASE_FILE = "/etc/os-release"
+ OS_RELEASE_FILE = "/etc/os-release"
+
+ def __init__(self, lsb_compat=True):
+ self.result = {}
+ self.valid = False
+ self.file = _OSRelease.OS_RELEASE_FILE
+
+ if not os.path.isfile(self.file):
+ return
+
+ self.parse()
+ self.valid = True
+
+ if lsb_compat:
+ self.inject_lsb_compat()
+
+ def inject_lsb_compat(self):
+ self.result["Distributor ID"] = self.result["ID"]
+ self.result["Description"] = self.result["PRETTY_NAME"]
+ # Optionals as per os-release spec.
+ self.result["Codename"] = self.result.get("VERSION_CODENAME")
+ if not self.result["Codename"]:
+ # Transient Ubuntu 16.04 field (LP: #1598212)
+ self.result["Codename"] = self.result.get("UBUNTU_CODENAME")
+ self.result["Release"] = self.result.get("VERSION_ID")
+
+ def parse(self):
+ f = open(self.file)
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ self.parse_entry(*line.split("=", 1))
+ f.close()
+
+ def parse_entry(self, key, value):
+ value = self.parse_value(value) # Values can be shell strings...
+ if key == "ID_LIKE" and isinstance(value, str):
+ # ID_LIKE is specified as quoted space-separated list. This will
+ # be parsed as string that we need to split manually.
+ value = value.split(" ")
+ self.result[key] = value
+
+ def parse_value(self, value):
+ values = shlex.split(value)
+ if len(values) == 1:
+ return values[0]
+ return values
+
+
+def get_distro(id=None, codename=None, description=None, release=None, is_like=[]):
+ """
+ Check the currently used distribution and return the corresponding
+ distriubtion class that supports distro specific features.
+
+ If no paramter are given the distro will be auto detected via
+ a call to lsb-release
+ """
+ # make testing easier
+ if not (id and codename and description and release):
+ if id or codename or description or release:
+ warnings.warn(
+ "Provided only a subset of arguments", DeprecationWarning, stacklevel=2
+ )
+ os_release = _OSRelease()
+ os_result = []
+ lsb_result = _lsb_release()
+ if os_release.valid:
+ os_result = os_release.result
+ # TODO: We cannot presently use os-release to fully replace lsb_release
+ # because os-release's ID, VERSION_ID and VERSION_CODENAME fields
+ # are specified as lowercase. In lsb_release they can be upcase
+ # or captizalized. So, switching to os-release would consitute
+ # a behavior break a which point lsb_release support should be
+ # fully removed.
+ # This in particular is a problem for template matching, as this
+ # matches against Distribution objects and depends on string
+ # case.
+ lsb_result = _lsb_release()
+ id = lsb_result["Distributor ID"]
+ codename = lsb_result["Codename"]
+ description = lsb_result["Description"]
+ release = lsb_result["Release"]
+ # Not available with LSB, use get directly.
+ is_like = os_result.get("ID_LIKE", [])
+ if id == "Ubuntu":
+ channel = _system_image_channel()
+ if channel is not None and "ubuntu-rtm/" in channel:
+ id = "Ubuntu-RTM"
+ codename = channel.rsplit("/", 1)[1].split("-", 1)[0]
+ description = codename
+ release = codename
+ if id == "Ubuntu":
+ return UbuntuDistribution(id, codename, description, release, is_like)
+ if id == "Ubuntu-RTM":
+ return UbuntuRTMDistribution(id, codename, description, release, is_like)
+ elif id == "Debian":
+ return DebianDistribution(id, codename, description, release, is_like)
+ else:
+ return Distribution(id, codename, description, release, is_like)
diff --git a/aptsources/sourceslist.py b/aptsources/sourceslist.py
new file mode 100644
index 0000000..b227690
--- /dev/null
+++ b/aptsources/sourceslist.py
@@ -0,0 +1,1083 @@
+# sourceslist.py - Provide an abstraction of the sources.list
+#
+# Copyright (c) 2004-2023 Canonical Ltd.
+# Copyright (c) 2004 Michiel Sikkes
+# Copyright (c) 2006-2007 Sebastian Heinlein
+#
+# Authors: Michiel Sikkes <michiel@eyesopened.nl>
+# Michael Vogt <mvo@debian.org>
+# Sebastian Heinlein <glatzor@ubuntu.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+import builtins
+import glob
+import io
+import logging
+import os.path
+import re
+import shutil
+import time
+import weakref
+from collections.abc import Callable, Iterable, Iterator
+from typing import Any, Generic, Optional, TypeVar, Union
+
+import apt_pkg
+
+from . import _deb822
+from .distinfo import DistInfo, Template
+
+# from apt_pkg import gettext as _
+
+T = TypeVar("T")
+
+# some global helpers
+
+__all__ = [
+ "is_mirror",
+ "Deb822SourceEntry",
+ "SourceEntry",
+ "NullMatcher",
+ "SourcesList",
+ "SourceEntryMatcher",
+]
+
+
+def is_mirror(master_uri: str, compare_uri: str) -> bool:
+ """check if the given add_url is idential or a mirror of orig_uri e.g.:
+ master_uri = archive.ubuntu.com
+ compare_uri = de.archive.ubuntu.com
+ -> True
+ """
+ # remove traling spaces and "/"
+ compare_uri = compare_uri.rstrip("/ ")
+ master_uri = master_uri.rstrip("/ ")
+ # uri is identical
+ if compare_uri == master_uri:
+ # print "Identical"
+ return True
+ # add uri is a master site and orig_uri has the from "XX.mastersite"
+ # (e.g. de.archive.ubuntu.com)
+ try:
+ compare_srv = compare_uri.split("//")[1]
+ master_srv = master_uri.split("//")[1]
+ # print "%s == %s " % (add_srv, orig_srv)
+ except IndexError: # ok, somethings wrong here
+ # print "IndexError"
+ return False
+ # remove the leading "<country>." (if any) and see if that helps
+ if "." in compare_srv and compare_srv[compare_srv.index(".") + 1 :] == master_srv:
+ # print "Mirror"
+ return True
+ return False
+
+
+def uniq(s: Iterable[T]) -> list[T]:
+ """simple and efficient way to return uniq collection
+
+ This is not intended for use with a SourceList. It is provided
+ for internal use only. It does not have a leading underscore to
+ not break any old code that uses it; but it should not be used
+ in new code (and is not listed in __all__)."""
+ return list(set(s))
+
+
+class SingleValueProperty(property):
+ def __init__(self, key: str, doc: str):
+ self.key = key
+ self.__doc__ = doc
+
+ def __get__(
+ self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None
+ ) -> str | None:
+ if obj is None:
+ return self # type: ignore
+ return obj.section.get(self.key, None)
+
+ def __set__(self, obj: "Deb822SourceEntry", value: str | None) -> None:
+ if value is None:
+ del obj.section[self.key]
+ else:
+ obj.section[self.key] = value
+
+
+class MultiValueProperty(property):
+ def __init__(self, key: str, doc: str):
+ self.key = key
+ self.__doc__ = doc
+
+ def __get__(
+ self, obj: Optional["Deb822SourceEntry"], objtype: type | None = None
+ ) -> list[str]:
+ if obj is None:
+ return self # type: ignore
+ return SourceEntry.mysplit(obj.section.get(self.key, ""))
+
+ def __set__(self, obj: "Deb822SourceEntry", values: list[str]) -> None:
+ obj.section[self.key] = " ".join(values)
+
+
+class ExplodedEntryProperty(property, Generic[T]):
+ def __init__(self, parent: T):
+ self.parent = parent
+
+ def __get__(
+ self, obj: Optional["ExplodedDeb822SourceEntry"], objtype: type | None = None
+ ) -> T:
+ if obj is None:
+ return self # type: ignore
+ return self.parent.__get__(obj.parent) # type: ignore
+
+ def __set__(self, obj: "ExplodedDeb822SourceEntry", value: T) -> None:
+ obj.split_out()
+ self.parent.__set__(obj.parent, value) # type: ignore
+
+
+def DeprecatedProperty(prop: T) -> T:
+ return prop
+
+
+def _null_weakref() -> None:
+ """Behaves like an expired weakref.ref, returning None"""
+ return None
+
+
+class Deb822SourceEntry:
+ def __init__(
+ self,
+ section: _deb822.Section | str | None,
+ file: str,
+ list: Optional["SourcesList"] = None,
+ ):
+ if section is None:
+ self.section = _deb822.Section("")
+ elif isinstance(section, str):
+ self.section = _deb822.Section(section)
+ else:
+ self.section = section
+
+ self._line = str(self.section)
+ self.file = file
+ self.template: Template | None = None # type DistInfo.Suite
+ self.may_merge = False
+ self._children = weakref.WeakSet["ExplodedDeb822SourceEntry"]()
+
+ if list:
+ self._list: Callable[[], SourcesList | None] = weakref.ref(list)
+ else:
+ self._list = _null_weakref
+
+ def __eq__(self, other: Any) -> Any:
+ # FIXME: Implement plurals more correctly
+ """equal operator for two sources.list entries"""
+ return (
+ self.disabled == other.disabled
+ and self.type == other.type
+ and self.uri
+ and self.uri.rstrip("/") == other.uri.rstrip("/")
+ and self.dist == other.dist
+ and self.comps == other.comps
+ )
+
+ architectures = MultiValueProperty("Architectures", "The list of architectures")
+ types = MultiValueProperty("Types", "The list of types")
+ type = DeprecatedProperty(SingleValueProperty("Types", "The list of types"))
+ uris = MultiValueProperty("URIs", "URIs in the source")
+ uri = DeprecatedProperty(SingleValueProperty("URIs", "URIs in the source"))
+ suites = MultiValueProperty("Suites", "Suites in the source")
+ dist = DeprecatedProperty(SingleValueProperty("Suites", "Suites in the source"))
+ comps = MultiValueProperty("Components", "Components in the source")
+
+ @property
+ def comment(self) -> str:
+ """Legacy attribute describing the paragraph header."""
+ return self.section.header
+
+ @comment.setter
+ def comment(self, comment: str) -> None:
+ """Legacy attribute describing the paragraph header."""
+ self.section.header = comment
+
+ @property
+ def trusted(self) -> bool | None:
+ """Return the value of the Trusted field"""
+ try:
+ return apt_pkg.string_to_bool(self.section["Trusted"])
+ except KeyError:
+ return None
+
+ @trusted.setter
+ def trusted(self, value: bool | None) -> None:
+ if value is None:
+ try:
+ del self.section["Trusted"]
+ except KeyError:
+ pass
+ else:
+ self.section["Trusted"] = "yes" if value else "no"
+
+ @property
+ def disabled(self) -> bool:
+ """Check if Enabled: no is set."""
+ return not apt_pkg.string_to_bool(self.section.get("Enabled", "yes"))
+
+ @disabled.setter
+ def disabled(self, value: bool) -> None:
+ if value:
+ self.section["Enabled"] = "no"
+ else:
+ try:
+ del self.section["Enabled"]
+ except KeyError:
+ pass
+
+ @property
+ def invalid(self) -> bool:
+ """A section is invalid if it doesn't have proper entries."""
+ return not self.section
+
+ @property
+ def line(self) -> str:
+ """The entire (original) paragraph."""
+ return self._line
+
+ def __str__(self) -> str:
+ return self.str().strip()
+
+ def str(self) -> str:
+ """Section as a string, newline terminated."""
+ return str(self.section)
+
+ def set_enabled(self, enabled: bool) -> None:
+ """Deprecated (for deb822) accessor for .disabled"""
+ self.disabled = not enabled
+
+ def merge(self, other: "AnySourceEntry") -> bool:
+ """Merge the two entries if they are compatible."""
+ if (
+ not self.may_merge
+ and self.template is None
+ and not all(child.template for child in self._children)
+ ):
+ return False
+ if self.file != other.file:
+ return False
+ if not isinstance(other, Deb822SourceEntry):
+ return False
+ if self.comment != other.comment and not any(
+ "Added by software-properties" in c for c in (self.comment, other.comment)
+ ):
+ return False
+
+ for tag in set(list(self.section.tags) + list(other.section.tags)):
+ if tag.lower() in (
+ "types",
+ "uris",
+ "suites",
+ "components",
+ "architectures",
+ "signed-by",
+ ):
+ continue
+ in_self = self.section.get(tag, None)
+ in_other = other.section.get(tag, None)
+ if in_self != in_other:
+ return False
+
+ if (
+ sum(
+ [
+ set(self.types) != set(other.types),
+ set(self.uris) != set(other.uris),
+ set(self.suites) != set(other.suites),
+ set(self.comps) != set(other.comps),
+ set(self.architectures) != set(other.architectures),
+ ]
+ )
+ > 1
+ ):
+ return False
+
+ for typ in other.types:
+ if typ not in self.types:
+ self.types += [typ]
+
+ for uri in other.uris:
+ if uri not in self.uris:
+ self.uris += [uri]
+
+ for suite in other.suites:
+ if suite not in self.suites:
+ self.suites += [suite]
+
+ for component in other.comps:
+ if component not in self.comps:
+ self.comps += [component]
+
+ for arch in other.architectures:
+ if arch not in self.architectures:
+ self.architectures += [arch]
+
+ return True
+
+ def _reparent_children(self, to: "Deb822SourceEntry") -> None:
+ """If we end up being split, check if any of our children need to be reparented to the new parent."""
+ for child in self._children:
+ for typ in to.types:
+ for uri in to.uris:
+ for suite in to.suites:
+ if (child._type, child._uri, child._suite) == (typ, uri, suite):
+ assert child.parent == self
+ child._parent = weakref.ref(to)
+
+
+class ExplodedDeb822SourceEntry:
+ """This represents a bit of a deb822 paragraph corresponding to a legacy sources.list entry"""
+
+ # Mostly we use slots to prevent accidentally assigning unproxied attributes
+ __slots__ = ["_parent", "_type", "_uri", "_suite", "template", "__weakref__"]
+
+ def __init__(self, parent: Deb822SourceEntry, typ: str, uri: str, suite: str):
+ self._parent = weakref.ref(parent)
+ self._type = typ
+ self._uri = uri
+ self._suite = suite
+ self.template = parent.template
+ parent._children.add(self)
+
+ @property
+ def parent(self) -> Deb822SourceEntry:
+ if self._parent is not None:
+ if (parent := self._parent()) is not None:
+ return parent
+ raise ValueError("The parent entry is no longer valid")
+
+ @property
+ def uri(self) -> str:
+ self.__check_valid()
+ return self._uri
+
+ @uri.setter
+ def uri(self, uri: str) -> None:
+ self.split_out()
+ self.parent.uris = [u if u != self._uri else uri for u in self.parent.uris]
+ self._uri = uri
+
+ @property
+ def types(self) -> list[str]:
+ return [self.type]
+
+ @property
+ def suites(self) -> list[str]:
+ return [self.dist]
+
+ @property
+ def uris(self) -> list[str]:
+ return [self.uri]
+
+ @property
+ def type(self) -> str:
+ self.__check_valid()
+ return self._type
+
+ @type.setter
+ def type(self, typ: str) -> None:
+ self.split_out()
+ self.parent.types = [typ]
+ self._type = typ
+ self.__check_valid()
+ assert self._type == typ
+ assert self.parent.types == [self._type]
+
+ @property
+ def dist(self) -> str:
+ self.__check_valid()
+ return self._suite
+
+ @dist.setter
+ def dist(self, suite: str) -> None:
+ self.split_out()
+ self.parent.suites = [suite]
+ self._suite = suite
+ self.__check_valid()
+ assert self._suite == suite
+ assert self.parent.suites == [self._suite]
+
+ def __check_valid(self) -> None:
+ if self.parent._list() is None:
+ raise ValueError("The parent entry is dead")
+ for type in self.parent.types:
+ for uri in self.parent.uris:
+ for suite in self.parent.suites:
+ if (type, uri, suite) == (self._type, self._uri, self._suite):
+ return
+ raise ValueError(f"Could not find parent of {self}")
+
+ def split_out(self) -> None:
+ parent = self.parent
+ if (parent.types, parent.uris, parent.suites) == (
+ [self._type],
+ [self._uri],
+ [self._suite],
+ ):
+ return
+ sources_list = parent._list()
+ if sources_list is None:
+ raise ValueError("The parent entry is dead")
+
+ try:
+ index = sources_list.list.index(parent)
+ except ValueError as e:
+ raise ValueError(
+ f"Parent entry for partial deb822 {self} no longer valid"
+ ) from e
+
+ sources_list.remove(parent)
+
+ reparented = False
+ for type in reversed(parent.types):
+ for uri in reversed(parent.uris):
+ for suite in reversed(parent.suites):
+ new = Deb822SourceEntry(
+ section=_deb822.Section(parent.section),
+ file=parent.file,
+ list=sources_list,
+ )
+ new.types = [type]
+ new.uris = [uri]
+ new.suites = [suite]
+ new.may_merge = True
+
+ parent._reparent_children(new)
+ sources_list.list.insert(index, new)
+ if (type, uri, suite) == (self._type, self._uri, self._suite):
+ self._parent = weakref.ref(new)
+ reparented = True
+ if not reparented:
+ raise ValueError(f"Could not find parent of {self}")
+
+ def __repr__(self) -> str:
+ return f"<child {self._type} {self._uri} {self._suite} of {self._parent}"
+
+ architectures = ExplodedEntryProperty(Deb822SourceEntry.architectures)
+ comps = ExplodedEntryProperty(Deb822SourceEntry.comps)
+ invalid = ExplodedEntryProperty(Deb822SourceEntry.invalid)
+ disabled = ExplodedEntryProperty[bool](Deb822SourceEntry.disabled) # type: ignore
+ trusted = ExplodedEntryProperty(Deb822SourceEntry.trusted)
+ comment = ExplodedEntryProperty(Deb822SourceEntry.comment)
+
+ def set_enabled(self, enabled: bool) -> None:
+ """Set the source to enabled."""
+ self.disabled = not enabled
+
+ @property
+ def file(self) -> str:
+ """Return the file."""
+ return self.parent.file
+
+
+class SourceEntry:
+ """single sources.list entry"""
+
+ def __init__(self, line: str, file: str | None = None):
+ self.invalid = False # is the source entry valid
+ self.disabled = False # is it disabled ('#' in front)
+ self.type = "" # what type (deb, deb-src)
+ self.architectures: list[str] = [] # architectures
+ self.trusted: bool | None = None # Trusted
+ self.uri = "" # base-uri
+ self.dist = "" # distribution (dapper, edgy, etc)
+ self.comps: list[str] = [] # list of available componetns (may empty)
+ self.comment = "" # (optional) comment
+ self.line = line # the original sources.list line
+ if file is None:
+ file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
+ if file.endswith(".sources"):
+ raise ValueError("Classic SourceEntry cannot be written to .sources file")
+ self.file = file # the file that the entry is located in
+ self.parse(line)
+ self.template: Template | None = None # type DistInfo.Suite
+ self.children: list[SourceEntry] = []
+
+ def __eq__(self, other: Any) -> Any:
+ """equal operator for two sources.list entries"""
+ return (
+ self.disabled == other.disabled
+ and self.type == other.type
+ and self.uri.rstrip("/") == other.uri.rstrip("/")
+ and self.dist == other.dist
+ and self.comps == other.comps
+ )
+
+ @staticmethod
+ def mysplit(line: str) -> list[str]:
+ """a split() implementation that understands the sources.list
+ format better and takes [] into account (for e.g. cdroms)"""
+ line = line.strip()
+ pieces = []
+ tmp = ""
+ # we are inside a [..] block
+ p_found = False
+ space_found = False
+ for i in range(len(line)):
+ if line[i] == "[":
+ if space_found:
+ space_found = False
+ p_found = True
+ pieces.append(tmp)
+ tmp = line[i]
+ else:
+ p_found = True
+ tmp += line[i]
+ elif line[i] == "]":
+ p_found = False
+ tmp += line[i]
+ elif space_found and not line[i].isspace():
+ # we skip one or more space
+ space_found = False
+ pieces.append(tmp)
+ tmp = line[i]
+ elif line[i].isspace() and not p_found:
+ # found a whitespace
+ space_found = True
+ else:
+ tmp += line[i]
+ # append last piece
+ if len(tmp) > 0:
+ pieces.append(tmp)
+ return pieces
+
+ def parse(self, line: str) -> None:
+ """parse a given sources.list (textual) line and break it up
+ into the field we have"""
+ self.line = line
+ line = line.strip()
+ # check if the source is enabled/disabled
+ if line == "" or line == "#": # empty line
+ self.invalid = True
+ return
+ if line[0] == "#":
+ self.disabled = True
+ pieces = line[1:].strip().split()
+ # if it looks not like a disabled deb line return
+ if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"):
+ self.invalid = True
+ return
+ else:
+ line = line[1:]
+ # check for another "#" in the line (this is treated as a comment)
+ i = line.find("#")
+ if i > 0:
+ self.comment = line[i + 1 :]
+ line = line[:i]
+ # source is ok, split it and see what we have
+ pieces = self.mysplit(line)
+ # Sanity check
+ if len(pieces) < 3:
+ self.invalid = True
+ return
+ # Type, deb or deb-src
+ self.type = pieces[0].strip()
+ # Sanity check
+ if self.type not in ("deb", "deb-src", "rpm", "rpm-src"):
+ self.invalid = True
+ return
+
+ if pieces[1].strip()[0] == "[":
+ options = pieces.pop(1).strip("[]").split()
+ for option in options:
+ try:
+ key, value = option.split("=", 1)
+ except Exception:
+ self.invalid = True
+ else:
+ if key == "arch":
+ self.architectures = value.split(",")
+ elif key == "trusted":
+ self.trusted = apt_pkg.string_to_bool(value)
+ else:
+ self.invalid = True
+
+ # URI
+ self.uri = pieces[1].strip()
+ if len(self.uri) < 1:
+ self.invalid = True
+ # distro and components (optional)
+ # Directory or distro
+ self.dist = pieces[2].strip()
+ if len(pieces) > 3:
+ # List of components
+ self.comps = pieces[3:]
+ else:
+ self.comps = []
+
+ def set_enabled(self, new_value: bool) -> None:
+ """set a line to enabled or disabled"""
+ self.disabled = not new_value
+ # enable, remove all "#" from the start of the line
+ if new_value:
+ self.line = self.line.lstrip().lstrip("#")
+ else:
+ # disabled, add a "#"
+ if self.line.strip()[0] != "#":
+ self.line = "#" + self.line
+
+ def __str__(self) -> str:
+ """debug helper"""
+ return self.str().strip()
+
+ def str(self) -> str:
+ """return the current line as string"""
+ if self.invalid:
+ return self.line
+ line = ""
+ if self.disabled:
+ line = "# "
+
+ line += self.type
+
+ if self.architectures and self.trusted is not None:
+ line += " [arch={} trusted={}]".format(
+ ",".join(self.architectures),
+ "yes" if self.trusted else "no",
+ )
+ elif self.trusted is not None:
+ line += " [trusted=%s]" % ("yes" if self.trusted else "no")
+ elif self.architectures:
+ line += " [arch=%s]" % ",".join(self.architectures)
+ line += f" {self.uri} {self.dist}"
+ if len(self.comps) > 0:
+ line += " " + " ".join(self.comps)
+ if self.comment != "":
+ line += " #" + self.comment
+ line += "\n"
+ return line
+
+ @property
+ def types(self) -> list[builtins.str]:
+ """deb822 compatible accessor for the type"""
+ return [self.type]
+
+ @property
+ def uris(self) -> list[builtins.str]:
+ """deb822 compatible accessor for the uri"""
+ return [self.uri]
+
+ @property
+ def suites(self) -> list[builtins.str]:
+ """deb822 compatible accessor for the suite"""
+ return [self.dist]
+
+
+AnySourceEntry = Union[SourceEntry, Deb822SourceEntry]
+AnyExplodedSourceEntry = Union[
+ SourceEntry, Deb822SourceEntry, ExplodedDeb822SourceEntry
+]
+
+
+class NullMatcher:
+ """a Matcher that does nothing"""
+
+ def match(self, s: AnyExplodedSourceEntry) -> bool:
+ return True
+
+
+class SourcesList:
+ """represents the full sources.list + sources.list.d file"""
+
+ def __init__(
+ self,
+ withMatcher: bool = True,
+ matcherPath: str = "/usr/share/python-apt/templates/",
+ *,
+ deb822: bool = False,
+ ):
+ self.list: list[AnySourceEntry] = [] # the actual SourceEntries Type
+ self.matcher: NullMatcher | SourceEntryMatcher
+ if withMatcher:
+ self.matcher = SourceEntryMatcher(matcherPath)
+ else:
+ self.matcher = NullMatcher()
+ self.deb822 = deb822
+ self.refresh()
+
+ def refresh(self) -> None:
+ """update the list of known entries"""
+ self.list = []
+ # read sources.list
+ file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
+ if file != "/dev/null" and os.path.exists(file):
+ self.load(file)
+ # read sources.list.d
+ partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts")
+ if partsdir != "/dev/null" and os.path.exists(partsdir):
+ for file in os.listdir(partsdir):
+ if (self.deb822 and file.endswith(".sources")) or file.endswith(
+ ".list"
+ ):
+ self.load(os.path.join(partsdir, file))
+ # check if the source item fits a predefined template
+ for source in self.list:
+ if not source.invalid:
+ self.matcher.match(source)
+
+ def __iter__(self) -> Iterator[AnySourceEntry]:
+ """simple iterator to go over self.list, returns SourceEntry
+ types"""
+ yield from self.list
+
+ def __find(
+ self, *predicates: Callable[[AnyExplodedSourceEntry], bool], **attrs: Any
+ ) -> Iterator[AnyExplodedSourceEntry]:
+ uri = attrs.pop("uri", None)
+ for source in self.exploded_list():
+ if uri and source.uri and uri.rstrip("/") != source.uri.rstrip("/"):
+ continue
+ if all(getattr(source, key) == attrs[key] for key in attrs) and all(
+ predicate(source) for predicate in predicates
+ ):
+ yield source
+
+ def add(
+ self,
+ type: str,
+ uri: str,
+ dist: str,
+ orig_comps: list[str],
+ comment: str = "",
+ pos: int = -1,
+ file: str | None = None,
+ architectures: Iterable[str] = [],
+ parent: AnyExplodedSourceEntry | None = None,
+ ) -> AnyExplodedSourceEntry:
+ """
+ Add a new source to the sources.list.
+ The method will search for existing matching repos and will try to
+ reuse them as far as possible
+ """
+
+ type = type.strip()
+ disabled = type.startswith("#")
+ if disabled:
+ type = type[1:].lstrip()
+ architectures = set(architectures)
+ # create a working copy of the component list so that
+ # we can modify it later
+ comps = orig_comps[:]
+ sources = self.__find(
+ lambda s: set(s.architectures) == architectures,
+ disabled=disabled,
+ invalid=False,
+ type=type,
+ uri=uri,
+ dist=dist,
+ )
+ # check if we have this source already in the sources.list
+ for source in sources:
+ for new_comp in comps:
+ if new_comp in source.comps:
+ # we have this component already, delete it
+ # from the new_comps list
+ del comps[comps.index(new_comp)]
+ if len(comps) == 0:
+ return source
+
+ sources = self.__find(
+ lambda s: set(s.architectures) == architectures,
+ invalid=False,
+ type=type,
+ uri=uri,
+ dist=dist,
+ )
+ for source in sources:
+ if source.disabled == disabled:
+ # if there is a repo with the same (disabled, type, uri, dist)
+ # just add the components
+ if set(source.comps) != set(comps):
+ source.comps = uniq(source.comps + comps)
+ return source
+ elif source.disabled and not disabled:
+ # enable any matching (type, uri, dist), but disabled repo
+ if set(source.comps) == set(comps):
+ source.disabled = False
+ return source
+
+ new_entry: AnySourceEntry
+ if file is None:
+ file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
+ if file.endswith(".sources"):
+ new_entry = Deb822SourceEntry(None, file=file, list=self)
+ if parent:
+ parent = getattr(parent, "parent", parent)
+ assert isinstance(parent, Deb822SourceEntry)
+ for k in parent.section.tags:
+ new_entry.section[k] = parent.section[k]
+ new_entry.types = [type]
+ new_entry.uris = [uri]
+ new_entry.suites = [dist]
+ new_entry.comps = comps
+ if architectures:
+ new_entry.architectures = list(architectures)
+ new_entry.section.header = comment
+ new_entry.disabled = disabled
+ else:
+ # there isn't any matching source, so create a new line and parse it
+ parts = [
+ "#" if disabled else "",
+ type,
+ ("[arch=%s]" % ",".join(architectures)) if architectures else "",
+ uri,
+ dist,
+ ]
+ parts.extend(comps)
+ if comment:
+ parts.append("#" + comment)
+ line = " ".join(part for part in parts if part) + "\n"
+
+ new_entry = SourceEntry(line)
+ if file is not None:
+ new_entry.file = file
+
+ self.matcher.match(new_entry)
+ if pos < 0:
+ self.list.append(new_entry)
+ else:
+ self.list.insert(pos, new_entry)
+ return new_entry
+
+ def remove(self, source_entry: AnyExplodedSourceEntry) -> None:
+ """remove the specified entry from the sources.list"""
+ if isinstance(source_entry, ExplodedDeb822SourceEntry):
+ source_entry.split_out()
+ source_entry = source_entry.parent
+ self.list.remove(source_entry)
+
+ def restore_backup(self, backup_ext: str) -> None:
+ "restore sources.list files based on the backup extension"
+ file = apt_pkg.config.find_file("Dir::Etc::sourcelist")
+ if os.path.exists(file + backup_ext) and os.path.exists(file):
+ shutil.copy(file + backup_ext, file)
+ # now sources.list.d
+ partsdir = apt_pkg.config.find_dir("Dir::Etc::sourceparts")
+ for file in glob.glob("%s/*" % partsdir):
+ if os.path.exists(file + backup_ext):
+ shutil.copy(file + backup_ext, file)
+
+ def backup(self, backup_ext: str | None = None) -> str:
+ """make a backup of the current source files, if no backup extension
+ is given, the current date/time is used (and returned)"""
+ already_backuped: Iterable[str] = set()
+ if backup_ext is None:
+ backup_ext = time.strftime("%y%m%d.%H%M")
+ for source in self.list:
+ if source.file not in already_backuped and os.path.exists(source.file):
+ shutil.copy(source.file, f"{source.file}{backup_ext}")
+ return backup_ext
+
+ def load(self, file: str) -> None:
+ """(re)load the current sources"""
+ try:
+ with open(file) as f:
+ if file.endswith(".sources"):
+ for section in _deb822.File(f):
+ self.list.append(Deb822SourceEntry(section, file, list=self))
+ else:
+ for line in f:
+ source = SourceEntry(line, file)
+ self.list.append(source)
+ except Exception as exc:
+ logging.warning(f"could not open file '{file}': {exc}\n")
+
+ def index(self, entry: AnyExplodedSourceEntry) -> int:
+ if isinstance(entry, ExplodedDeb822SourceEntry):
+ return self.list.index(entry.parent)
+ return self.list.index(entry)
+
+ def merge(self) -> None:
+ """Merge consecutive entries that have been split back together."""
+ merged = True
+ while merged:
+ i = 0
+ merged = False
+ while i + 1 < len(self.list):
+ entry = self.list[i]
+ if isinstance(entry, Deb822SourceEntry):
+ j = i + 1
+ while j < len(self.list):
+ if entry.merge(self.list[j]):
+ del self.list[j]
+ merged = True
+ else:
+ j += 1
+ i += 1
+
+ def save(self) -> None:
+ """save the current sources"""
+ files: dict[str, io.TextIOWrapper] = {}
+ # write an empty default config file if there aren't any sources
+ if len(self.list) == 0:
+ path = apt_pkg.config.find_file("Dir::Etc::sourcelist")
+ header = (
+ "## See sources.list(5) for more information, especialy\n"
+ "# Remember that you can only use http, ftp or file URIs\n"
+ "# CDROMs are managed through the apt-cdrom tool.\n"
+ )
+
+ with open(path, "w") as f:
+ f.write(header)
+ return
+
+ self.merge()
+ try:
+ for source in self.list:
+ if source.file not in files:
+ files[source.file] = open(source.file, "w")
+ elif isinstance(source, Deb822SourceEntry):
+ files[source.file].write("\n")
+ files[source.file].write(source.str())
+ finally:
+ for f in files.values():
+ f.close()
+
+ def check_for_relations(
+ self, sources_list: Iterable[AnySourceEntry]
+ ) -> tuple[list[AnySourceEntry], dict[Template, list[AnySourceEntry]]]:
+ """get all parent and child channels in the sources list"""
+ parents = []
+ used_child_templates: dict[Template, list[AnySourceEntry]] = {}
+ for source in sources_list:
+ # try to avoid checking uninterressting sources
+ if source.template is None:
+ continue
+ # set up a dict with all used child templates and corresponding
+ # source entries
+ if source.template.child:
+ key = source.template
+ if key not in used_child_templates:
+ used_child_templates[key] = []
+ temp = used_child_templates[key]
+ temp.append(source)
+ else:
+ # store each source with children aka. a parent :)
+ if len(source.template.children) > 0:
+ parents.append(source)
+ # print self.used_child_templates
+ # print self.parents
+ return (parents, used_child_templates)
+
+ def exploded_list(self) -> list[AnyExplodedSourceEntry]:
+ """Present an exploded view of the list where each entry corresponds exactly to a Release file.
+
+ A release file is uniquely identified by the triplet (type, uri, suite). Old style entries
+ always referred to a single release file, but deb822 entries allow multiple values for each
+ of those fields.
+ """
+ res: list[AnyExplodedSourceEntry] = []
+ for entry in self.list:
+ if isinstance(entry, SourceEntry):
+ res.append(entry)
+ elif (
+ len(entry.types) == 1
+ and len(entry.uris) == 1
+ and len(entry.suites) == 1
+ ):
+ res.append(entry)
+ else:
+ for typ in entry.types:
+ for uri in entry.uris:
+ for sui in entry.suites:
+ res.append(ExplodedDeb822SourceEntry(entry, typ, uri, sui))
+ self.matcher.match(res[-1])
+
+ return res
+
+
+class SourceEntryMatcher:
+ """matcher class to make a source entry look nice
+ lots of predefined matchers to make it i18n/gettext friendly
+ """
+
+ def __init__(self, matcherPath: str):
+ self.templates: list[Template] = []
+ # Get the human readable channel and comp names from the channel .infos
+ spec_files = glob.glob("%s/*.info" % matcherPath)
+ for f in spec_files:
+ f = os.path.basename(f)
+ i = f.find(".info")
+ f = f[0:i]
+ dist = DistInfo(f, base_dir=matcherPath)
+ for template in dist.templates:
+ if template.match_uri is not None:
+ self.templates.append(template)
+ return
+
+ def match(self, source: AnyExplodedSourceEntry) -> bool:
+ """Add a matching template to the source"""
+ found = False
+ for template in self.templates:
+ if source.uri is None or source.dist is None:
+ continue
+ if (
+ template.match_uri is not None
+ and template.match_name is not None
+ and source.uri is not None
+ and source.dist is not None
+ and re.search(template.match_uri, source.uri)
+ and re.match(template.match_name, source.dist)
+ and
+ # deb is a valid fallback for deb-src (if that is not
+ # definied, see #760035
+ (source.type == template.type or template.type == "deb")
+ ):
+ found = True
+ source.template = template
+ break
+ elif (
+ template.is_mirror(source.uri)
+ and template.match_name is not None
+ and source.dist is not None
+ and re.match(template.match_name, source.dist)
+ ):
+ found = True
+ source.template = template
+ break
+ return found
+
+
+# some simple tests
+if __name__ == "__main__":
+ apt_pkg.init_config()
+ sources = SourcesList()
+
+ for entry in sources:
+ logging.info("entry %s" % entry.str())
+ # print entry.uri
+
+ mirror = is_mirror(
+ "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu/"
+ )
+ logging.info("is_mirror(): %s" % mirror)
+
+ logging.info(
+ is_mirror(
+ "http://archive.ubuntu.com/ubuntu", "http://de.archive.ubuntu.com/ubuntu/"
+ )
+ )
+ logging.info(
+ is_mirror(
+ "http://archive.ubuntu.com/ubuntu/", "http://de.archive.ubuntu.com/ubuntu"
+ )
+ )