summaryrefslogtreecommitdiffstats
path: root/aptsources/distro.py
diff options
context:
space:
mode:
Diffstat (limited to 'aptsources/distro.py')
-rw-r--r--aptsources/distro.py648
1 files changed, 648 insertions, 0 deletions
diff --git a/aptsources/distro.py b/aptsources/distro.py
new file mode 100644
index 0000000..546d0e7
--- /dev/null
+++ b/aptsources/distro.py
@@ -0,0 +1,648 @@
+# distro.py - Provide a distro abstraction of the sources.list
+#
+# Copyright (c) 2004-2009 Canonical Ltd.
+# Copyright (c) 2006-2007 Sebastian Heinlein
+# Copyright (c) 2016 Harald Sitter
+#
+# Authors: Sebastian Heinlein <glatzor@ubuntu.com>
+# Michael Vogt <mvo@debian.org>
+# Harald Sitter <sitter@kde.org>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+# USA
+
+import gettext
+import logging
+import os
+import re
+import shlex
+import warnings
+from xml.etree.ElementTree import ElementTree
+
+from apt_pkg import gettext as _
+
+
+class NoDistroTemplateException(Exception):
+ pass
+
+
+class Distribution:
+ def __init__(self, id, codename, description, release, is_like=[]):
+ """Container for distribution specific informations"""
+ # LSB information
+ self.id = id
+ self.codename = codename
+ self.description = description
+ self.release = release
+ self.is_like = is_like
+
+ self.binary_type = "deb"
+ self.source_type = "deb-src"
+
+ def get_sources(self, sourceslist):
+ """
+ Find the corresponding template, main and child sources
+ for the distribution
+ """
+
+ self.sourceslist = sourceslist
+ # corresponding sources
+ self.source_template = None
+ self.child_sources = []
+ self.main_sources = []
+ self.disabled_sources = []
+ self.cdrom_sources = []
+ self.download_comps = []
+ self.enabled_comps = []
+ self.cdrom_comps = []
+ self.used_media = []
+ self.get_source_code = False
+ self.source_code_sources = []
+
+ # location of the sources
+ self.default_server = ""
+ self.main_server = ""
+ self.nearest_server = ""
+ self.used_servers = []
+
+ # find the distro template
+ for template in self.sourceslist.matcher.templates:
+ if self.is_codename(template.name) and template.distribution == self.id:
+ # print "yeah! found a template for %s" % self.description
+ # print template.description, template.base_uri, \
+ # template.components
+ self.source_template = template
+ break
+ if self.source_template is None:
+ raise NoDistroTemplateException(
+ "Error: could not find a distribution template for %s/%s"
+ % (self.id, self.codename)
+ )
+
+ # find main and child sources
+ media = []
+ comps = []
+ cdrom_comps = []
+ enabled_comps = []
+ # source_code = []
+ for source in self.sourceslist.exploded_list():
+ if (
+ not source.invalid
+ and self.is_codename(source.dist)
+ and source.template
+ and source.template.official
+ and self.is_codename(source.template.name)
+ ):
+ # print "yeah! found a distro repo: %s" % source.line
+ # cdroms need do be handled differently
+ if source.uri.startswith("cdrom:") and not source.disabled:
+ self.cdrom_sources.append(source)
+ cdrom_comps.extend(source.comps)
+ elif source.uri.startswith("cdrom:") and source.disabled:
+ self.cdrom_sources.append(source)
+ elif source.type == self.binary_type and not source.disabled:
+ self.main_sources.append(source)
+ comps.extend(source.comps)
+ media.append(source.uri)
+ elif source.type == self.binary_type and source.disabled:
+ self.disabled_sources.append(source)
+ elif source.type == self.source_type and not source.disabled:
+ self.source_code_sources.append(source)
+ elif source.type == self.source_type and source.disabled:
+ self.disabled_sources.append(source)
+ if not source.invalid and source.template in self.source_template.children:
+ if not source.disabled and source.type == self.binary_type:
+ self.child_sources.append(source)
+ elif not source.disabled and source.type == self.source_type:
+ self.source_code_sources.append(source)
+ else:
+ self.disabled_sources.append(source)
+ self.download_comps = set(comps)
+ self.cdrom_comps = set(cdrom_comps)
+ enabled_comps.extend(comps)
+ enabled_comps.extend(cdrom_comps)
+ self.enabled_comps = set(enabled_comps)
+ self.used_media = set(media)
+ self.get_mirrors()
+
+ def get_mirrors(self, mirror_template=None):
+ """
+ Provide a set of mirrors where you can get the distribution from
+ """
+ # the main server is stored in the template
+ self.main_server = self.source_template.base_uri
+
+ # other used servers
+ for medium in self.used_media:
+ if not medium.startswith("cdrom:"):
+ # seems to be a network source
+ self.used_servers.append(medium)
+
+ if len(self.main_sources) == 0:
+ self.default_server = self.main_server
+ else:
+ self.default_server = self.main_sources[0].uri
+
+ # get a list of country codes and real names
+ self.countries = {}
+ fname = "/usr/share/xml/iso-codes/iso_3166.xml"
+ if os.path.exists(fname):
+ et = ElementTree(file=fname)
+ # python2.6 compat, the next two lines can get removed
+ # once we do not use py2.6 anymore
+ if getattr(et, "iter", None) is None:
+ et.iter = et.getiterator
+ it = et.iter("iso_3166_entry")
+ for elm in it:
+ try:
+ descr = elm.attrib["common_name"]
+ except KeyError:
+ descr = elm.attrib["name"]
+ try:
+ code = elm.attrib["alpha_2_code"]
+ except KeyError:
+ code = elm.attrib["alpha_3_code"]
+ self.countries[code.lower()] = gettext.dgettext("iso_3166", descr)
+
+ # try to guess the nearest mirror from the locale
+ self.country = None
+ self.country_code = None
+ locale = os.getenv("LANG", default="en_UK")
+ a = locale.find("_")
+ z = locale.find(".")
+ if z == -1:
+ z = len(locale)
+ country_code = locale[a + 1 : z].lower()
+
+ if mirror_template:
+ self.nearest_server = mirror_template % country_code
+
+ if country_code in self.countries:
+ self.country = self.countries[country_code]
+ self.country_code = country_code
+
+ def _get_mirror_name(self, server):
+ """Try to get a human readable name for the main mirror of a country
+ Customize for different distributions"""
+ country = None
+ i = server.find("://")
+ li = server.find(".archive.ubuntu.com")
+ if i != -1 and li != -1:
+ country = server[i + len("://") : li]
+ if country in self.countries:
+ # TRANSLATORS: %s is a country
+ return _("Server for %s") % self.countries[country]
+ else:
+ return "%s" % server.rstrip("/ ")
+
+ def get_server_list(self):
+ """Return a list of used and suggested servers"""
+
+ def compare_mirrors(mir1, mir2):
+ """Helper function that handles comaprision of mirror urls
+ that could contain trailing slashes"""
+ return re.match(mir1.strip("/ "), mir2.rstrip("/ "))
+
+ # Store all available servers:
+ # Name, URI, active
+ mirrors = []
+ if len(self.used_servers) < 1 or (
+ len(self.used_servers) == 1
+ and compare_mirrors(self.used_servers[0], self.main_server)
+ ):
+ mirrors.append([_("Main server"), self.main_server, True])
+ if self.nearest_server:
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ elif len(self.used_servers) == 1 and not compare_mirrors(
+ self.used_servers[0], self.main_server
+ ):
+ mirrors.append([_("Main server"), self.main_server, False])
+ # Only one server is used
+ server = self.used_servers[0]
+
+ # Append the nearest server if it's not already used
+ if self.nearest_server:
+ if not compare_mirrors(server, self.nearest_server):
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ if server:
+ mirrors.append([self._get_mirror_name(server), server, True])
+
+ elif len(self.used_servers) > 1:
+ # More than one server is used. Since we don't handle this case
+ # in the user interface we set "custom servers" to true and
+ # append a list of all used servers
+ mirrors.append([_("Main server"), self.main_server, False])
+ if self.nearest_server:
+ mirrors.append(
+ [
+ self._get_mirror_name(self.nearest_server),
+ self.nearest_server,
+ False,
+ ]
+ )
+ mirrors.append([_("Custom servers"), None, True])
+ for server in self.used_servers:
+ mirror_entry = [self._get_mirror_name(server), server, False]
+ if compare_mirrors(server, self.nearest_server) or compare_mirrors(
+ server, self.main_server
+ ):
+ continue
+ elif mirror_entry not in mirrors:
+ mirrors.append(mirror_entry)
+
+ return mirrors
+
+ def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""):
+ """
+ Add distribution specific sources
+ """
+ if uri is None:
+ # FIXME: Add support for the server selector
+ uri = self.default_server
+ if dist is None:
+ dist = self.codename
+ if comps is None:
+ comps = list(self.enabled_comps)
+ if type is None:
+ type = self.binary_type
+
+ parent = None
+ file = None
+ for parent in reversed(self.child_sources) or reversed(self.main_sources):
+ file = parent.file
+ break
+
+ new_source = self.sourceslist.add(
+ type, uri, dist, comps, comment, parent=parent, file=file
+ )
+ # if source code is enabled add a deb-src line after the new
+ # source
+ if self.get_source_code and type == self.binary_type:
+ self.sourceslist.add(
+ self.source_type,
+ uri,
+ dist,
+ comps,
+ comment,
+ file=new_source.file,
+ parent=new_source,
+ pos=self.sourceslist.list.index(new_source) + 1,
+ )
+
+ def enable_component(self, comp):
+ """
+ Enable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+
+ comp: the component that should be enabled
+ """
+ comps = list([comp])
+ # look for parent components that we may have to add
+ for source in self.main_sources:
+ for c in source.template.components:
+ if c.name == comp and c.parent_component:
+ if c.parent_component not in comps:
+ comps.append(c.parent_component)
+ for c in comps:
+ self._enable_component(c)
+
+ def _enable_component(self, comp):
+ def add_component_only_once(source, comps_per_dist):
+ """
+ Check if we already added the component to the repository, since
+ a repository could be splitted into different apt lines. If not
+ add the component
+ """
+ # if we don't have that distro, just return (can happen for e.g.
+ # dapper-update only in deb-src
+ if source.dist not in comps_per_dist:
+ return
+ # if we have seen this component already for this distro,
+ # return (nothing to do)
+ if comp in comps_per_dist[source.dist]:
+ return
+ # add it
+ source.comps = source.comps + [comp]
+ comps_per_dist[source.dist].add(comp)
+
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ # store what comps are enabled already per distro (where distro is
+ # e.g. "dapper", "dapper-updates")
+ comps_per_dist = {}
+ comps_per_sdist = {}
+ for s in sources:
+ if s.type == self.binary_type:
+ if s.dist not in comps_per_dist:
+ comps_per_dist[s.dist] = set()
+ for c in s.comps:
+ comps_per_dist[s.dist].add(c)
+ for s in self.source_code_sources:
+ if s.type == self.source_type:
+ if s.dist not in comps_per_sdist:
+ comps_per_sdist[s.dist] = set()
+ for c in s.comps:
+ comps_per_sdist[s.dist].add(c)
+
+ # check if there is a main source at all
+ if len(self.main_sources) < 1:
+ # create a new main source
+ self.add_source(comps=["%s" % comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in sources:
+ add_component_only_once(source, comps_per_dist)
+
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ # check if there is a main source code source at all
+ if self.get_source_code:
+ if len(self.source_code_sources) < 1:
+ # create a new main source
+ self.add_source(type=self.source_type, comps=["%s" % comp])
+ else:
+ # add the comp to all main, child and source code sources
+ for source in self.source_code_sources:
+ add_component_only_once(source, comps_per_sdist)
+
+ def disable_component(self, comp):
+ """
+ Disable a component in all main, child and source code sources
+ (excluding cdrom based sources)
+ """
+ sources = []
+ sources.extend(self.main_sources)
+ sources.extend(self.child_sources)
+ sources.extend(self.source_code_sources)
+ if comp in self.cdrom_comps:
+ sources = []
+ sources.extend(self.main_sources)
+ for source in sources:
+ if comp in source.comps:
+ comps = source.comps
+ comps.remove(comp)
+ source.comps = comps
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ def change_server(self, uri):
+ """Change the server of all distro specific sources to
+ a given host"""
+
+ def change_server_of_source(source, uri, seen):
+ # Avoid creating duplicate entries
+ source.uri = uri
+ for comp in source.comps:
+ if [source.uri, source.dist, comp] in seen:
+ source.comps.remove(comp)
+ else:
+ seen.append([source.uri, source.dist, comp])
+ if len(source.comps) < 1:
+ self.sourceslist.remove(source)
+
+ seen_binary = []
+ seen_source = []
+ self.default_server = uri
+ for source in self.main_sources:
+ change_server_of_source(source, uri, seen_binary)
+ for source in self.child_sources:
+ # Do not change the forces server of a child source
+ if (
+ source.template.base_uri is None
+ or source.template.base_uri != source.uri
+ ):
+ change_server_of_source(source, uri, seen_binary)
+ for source in self.source_code_sources:
+ change_server_of_source(source, uri, seen_source)
+
+ def is_codename(self, name):
+ """Compare a given name with the release codename."""
+ if name == self.codename:
+ return True
+ else:
+ return False
+
+
+class DebianDistribution(Distribution):
+ """Class to support specific Debian features"""
+
+ def is_codename(self, name):
+ """Compare a given name with the release codename and check if
+ if it can be used as a synonym for a development releases"""
+ if name == self.codename or self.release in ("testing", "unstable"):
+ return True
+ else:
+ return False
+
+ def _get_mirror_name(self, server):
+ """Try to get a human readable name for the main mirror of a country
+ Debian specific"""
+ country = None
+ i = server.find("://ftp.")
+ li = server.find(".debian.org")
+ if i != -1 and li != -1:
+ country = server[i + len("://ftp.") : li]
+ if country in self.countries:
+ # TRANSLATORS: %s is a country
+ return (
+ _("Server for %s")
+ % gettext.dgettext(
+ "iso_3166", self.countries[country].rstrip()
+ ).rstrip()
+ )
+ else:
+ return "%s" % server.rstrip("/ ")
+
+ def get_mirrors(self):
+ Distribution.get_mirrors(
+ self, mirror_template="http://ftp.%s.debian.org/debian/"
+ )
+
+
+class UbuntuDistribution(Distribution):
+ """Class to support specific Ubuntu features"""
+
+ def get_mirrors(self):
+ Distribution.get_mirrors(
+ self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/"
+ )
+
+
+class UbuntuRTMDistribution(UbuntuDistribution):
+ """Class to support specific Ubuntu RTM features"""
+
+ def get_mirrors(self):
+ self.main_server = self.source_template.base_uri
+
+
+def _lsb_release():
+ """Call lsb_release --idrc and return a mapping."""
+ import errno
+ from subprocess import PIPE, Popen
+
+ result = {
+ "Codename": "sid",
+ "Distributor ID": "Debian",
+ "Description": "Debian GNU/Linux unstable (sid)",
+ "Release": "unstable",
+ }
+ try:
+ out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0]
+ # Convert to unicode string, needed for Python 3.1
+ out = out.decode("utf-8")
+ result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ logging.warning("lsb_release failed, using defaults:" % exc)
+ return result
+
+
+def _system_image_channel():
+ """Get the current channel from system-image-cli -i if possible."""
+ import errno
+ from subprocess import DEVNULL, PIPE, Popen
+
+ try:
+ out = Popen(
+ ["system-image-cli", "-i"],
+ stdout=PIPE,
+ stderr=DEVNULL,
+ universal_newlines=True,
+ ).communicate()[0]
+ for line in out.splitlines():
+ if line.startswith("channel: "):
+ return line.split(": ", 1)[1]
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ logging.warning("system-image-cli failed, using defaults: %s" % exc)
+ return None
+
+
+class _OSRelease:
+ DEFAULT_OS_RELEASE_FILE = "/etc/os-release"
+ OS_RELEASE_FILE = "/etc/os-release"
+
+ def __init__(self, lsb_compat=True):
+ self.result = {}
+ self.valid = False
+ self.file = _OSRelease.OS_RELEASE_FILE
+
+ if not os.path.isfile(self.file):
+ return
+
+ self.parse()
+ self.valid = True
+
+ if lsb_compat:
+ self.inject_lsb_compat()
+
+ def inject_lsb_compat(self):
+ self.result["Distributor ID"] = self.result["ID"]
+ self.result["Description"] = self.result["PRETTY_NAME"]
+ # Optionals as per os-release spec.
+ self.result["Codename"] = self.result.get("VERSION_CODENAME")
+ if not self.result["Codename"]:
+ # Transient Ubuntu 16.04 field (LP: #1598212)
+ self.result["Codename"] = self.result.get("UBUNTU_CODENAME")
+ self.result["Release"] = self.result.get("VERSION_ID")
+
+ def parse(self):
+ f = open(self.file)
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ self.parse_entry(*line.split("=", 1))
+ f.close()
+
+ def parse_entry(self, key, value):
+ value = self.parse_value(value) # Values can be shell strings...
+ if key == "ID_LIKE" and isinstance(value, str):
+ # ID_LIKE is specified as quoted space-separated list. This will
+ # be parsed as string that we need to split manually.
+ value = value.split(" ")
+ self.result[key] = value
+
+ def parse_value(self, value):
+ values = shlex.split(value)
+ if len(values) == 1:
+ return values[0]
+ return values
+
+
+def get_distro(id=None, codename=None, description=None, release=None, is_like=[]):
+ """
+ Check the currently used distribution and return the corresponding
+ distriubtion class that supports distro specific features.
+
+ If no paramter are given the distro will be auto detected via
+ a call to lsb-release
+ """
+ # make testing easier
+ if not (id and codename and description and release):
+ if id or codename or description or release:
+ warnings.warn(
+ "Provided only a subset of arguments", DeprecationWarning, stacklevel=2
+ )
+ os_release = _OSRelease()
+ os_result = []
+ lsb_result = _lsb_release()
+ if os_release.valid:
+ os_result = os_release.result
+ # TODO: We cannot presently use os-release to fully replace lsb_release
+ # because os-release's ID, VERSION_ID and VERSION_CODENAME fields
+ # are specified as lowercase. In lsb_release they can be upcase
+ # or captizalized. So, switching to os-release would consitute
+ # a behavior break a which point lsb_release support should be
+ # fully removed.
+ # This in particular is a problem for template matching, as this
+ # matches against Distribution objects and depends on string
+ # case.
+ lsb_result = _lsb_release()
+ id = lsb_result["Distributor ID"]
+ codename = lsb_result["Codename"]
+ description = lsb_result["Description"]
+ release = lsb_result["Release"]
+ # Not available with LSB, use get directly.
+ is_like = os_result.get("ID_LIKE", [])
+ if id == "Ubuntu":
+ channel = _system_image_channel()
+ if channel is not None and "ubuntu-rtm/" in channel:
+ id = "Ubuntu-RTM"
+ codename = channel.rsplit("/", 1)[1].split("-", 1)[0]
+ description = codename
+ release = codename
+ if id == "Ubuntu":
+ return UbuntuDistribution(id, codename, description, release, is_like)
+ if id == "Ubuntu-RTM":
+ return UbuntuRTMDistribution(id, codename, description, release, is_like)
+ elif id == "Debian":
+ return DebianDistribution(id, codename, description, release, is_like)
+ else:
+ return Distribution(id, codename, description, release, is_like)