diff options
Diffstat (limited to 'aptsources/distro.py')
-rw-r--r-- | aptsources/distro.py | 648 |
1 files changed, 648 insertions, 0 deletions
diff --git a/aptsources/distro.py b/aptsources/distro.py new file mode 100644 index 0000000..546d0e7 --- /dev/null +++ b/aptsources/distro.py @@ -0,0 +1,648 @@ +# distro.py - Provide a distro abstraction of the sources.list +# +# Copyright (c) 2004-2009 Canonical Ltd. +# Copyright (c) 2006-2007 Sebastian Heinlein +# Copyright (c) 2016 Harald Sitter +# +# Authors: Sebastian Heinlein <glatzor@ubuntu.com> +# Michael Vogt <mvo@debian.org> +# Harald Sitter <sitter@kde.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA + +import gettext +import logging +import os +import re +import shlex +import warnings +from xml.etree.ElementTree import ElementTree + +from apt_pkg import gettext as _ + + +class NoDistroTemplateException(Exception): + pass + + +class Distribution: + def __init__(self, id, codename, description, release, is_like=[]): + """Container for distribution specific informations""" + # LSB information + self.id = id + self.codename = codename + self.description = description + self.release = release + self.is_like = is_like + + self.binary_type = "deb" + self.source_type = "deb-src" + + def get_sources(self, sourceslist): + """ + Find the corresponding template, main and child sources + for the distribution + """ + + self.sourceslist = sourceslist + # corresponding sources + self.source_template = None + self.child_sources = [] + self.main_sources = [] + self.disabled_sources = [] + self.cdrom_sources = [] + self.download_comps = [] + self.enabled_comps = [] + self.cdrom_comps = [] + self.used_media = [] + self.get_source_code = False + self.source_code_sources = [] + + # location of the sources + self.default_server = "" + self.main_server = "" + self.nearest_server = "" + self.used_servers = [] + + # find the distro template + for template in self.sourceslist.matcher.templates: + if self.is_codename(template.name) and template.distribution == self.id: + # print "yeah! found a template for %s" % self.description + # print template.description, template.base_uri, \ + # template.components + self.source_template = template + break + if self.source_template is None: + raise NoDistroTemplateException( + "Error: could not find a distribution template for %s/%s" + % (self.id, self.codename) + ) + + # find main and child sources + media = [] + comps = [] + cdrom_comps = [] + enabled_comps = [] + # source_code = [] + for source in self.sourceslist.exploded_list(): + if ( + not source.invalid + and self.is_codename(source.dist) + and source.template + and source.template.official + and self.is_codename(source.template.name) + ): + # print "yeah! found a distro repo: %s" % source.line + # cdroms need do be handled differently + if source.uri.startswith("cdrom:") and not source.disabled: + self.cdrom_sources.append(source) + cdrom_comps.extend(source.comps) + elif source.uri.startswith("cdrom:") and source.disabled: + self.cdrom_sources.append(source) + elif source.type == self.binary_type and not source.disabled: + self.main_sources.append(source) + comps.extend(source.comps) + media.append(source.uri) + elif source.type == self.binary_type and source.disabled: + self.disabled_sources.append(source) + elif source.type == self.source_type and not source.disabled: + self.source_code_sources.append(source) + elif source.type == self.source_type and source.disabled: + self.disabled_sources.append(source) + if not source.invalid and source.template in self.source_template.children: + if not source.disabled and source.type == self.binary_type: + self.child_sources.append(source) + elif not source.disabled and source.type == self.source_type: + self.source_code_sources.append(source) + else: + self.disabled_sources.append(source) + self.download_comps = set(comps) + self.cdrom_comps = set(cdrom_comps) + enabled_comps.extend(comps) + enabled_comps.extend(cdrom_comps) + self.enabled_comps = set(enabled_comps) + self.used_media = set(media) + self.get_mirrors() + + def get_mirrors(self, mirror_template=None): + """ + Provide a set of mirrors where you can get the distribution from + """ + # the main server is stored in the template + self.main_server = self.source_template.base_uri + + # other used servers + for medium in self.used_media: + if not medium.startswith("cdrom:"): + # seems to be a network source + self.used_servers.append(medium) + + if len(self.main_sources) == 0: + self.default_server = self.main_server + else: + self.default_server = self.main_sources[0].uri + + # get a list of country codes and real names + self.countries = {} + fname = "/usr/share/xml/iso-codes/iso_3166.xml" + if os.path.exists(fname): + et = ElementTree(file=fname) + # python2.6 compat, the next two lines can get removed + # once we do not use py2.6 anymore + if getattr(et, "iter", None) is None: + et.iter = et.getiterator + it = et.iter("iso_3166_entry") + for elm in it: + try: + descr = elm.attrib["common_name"] + except KeyError: + descr = elm.attrib["name"] + try: + code = elm.attrib["alpha_2_code"] + except KeyError: + code = elm.attrib["alpha_3_code"] + self.countries[code.lower()] = gettext.dgettext("iso_3166", descr) + + # try to guess the nearest mirror from the locale + self.country = None + self.country_code = None + locale = os.getenv("LANG", default="en_UK") + a = locale.find("_") + z = locale.find(".") + if z == -1: + z = len(locale) + country_code = locale[a + 1 : z].lower() + + if mirror_template: + self.nearest_server = mirror_template % country_code + + if country_code in self.countries: + self.country = self.countries[country_code] + self.country_code = country_code + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Customize for different distributions""" + country = None + i = server.find("://") + li = server.find(".archive.ubuntu.com") + if i != -1 and li != -1: + country = server[i + len("://") : li] + if country in self.countries: + # TRANSLATORS: %s is a country + return _("Server for %s") % self.countries[country] + else: + return "%s" % server.rstrip("/ ") + + def get_server_list(self): + """Return a list of used and suggested servers""" + + def compare_mirrors(mir1, mir2): + """Helper function that handles comaprision of mirror urls + that could contain trailing slashes""" + return re.match(mir1.strip("/ "), mir2.rstrip("/ ")) + + # Store all available servers: + # Name, URI, active + mirrors = [] + if len(self.used_servers) < 1 or ( + len(self.used_servers) == 1 + and compare_mirrors(self.used_servers[0], self.main_server) + ): + mirrors.append([_("Main server"), self.main_server, True]) + if self.nearest_server: + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + elif len(self.used_servers) == 1 and not compare_mirrors( + self.used_servers[0], self.main_server + ): + mirrors.append([_("Main server"), self.main_server, False]) + # Only one server is used + server = self.used_servers[0] + + # Append the nearest server if it's not already used + if self.nearest_server: + if not compare_mirrors(server, self.nearest_server): + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + if server: + mirrors.append([self._get_mirror_name(server), server, True]) + + elif len(self.used_servers) > 1: + # More than one server is used. Since we don't handle this case + # in the user interface we set "custom servers" to true and + # append a list of all used servers + mirrors.append([_("Main server"), self.main_server, False]) + if self.nearest_server: + mirrors.append( + [ + self._get_mirror_name(self.nearest_server), + self.nearest_server, + False, + ] + ) + mirrors.append([_("Custom servers"), None, True]) + for server in self.used_servers: + mirror_entry = [self._get_mirror_name(server), server, False] + if compare_mirrors(server, self.nearest_server) or compare_mirrors( + server, self.main_server + ): + continue + elif mirror_entry not in mirrors: + mirrors.append(mirror_entry) + + return mirrors + + def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""): + """ + Add distribution specific sources + """ + if uri is None: + # FIXME: Add support for the server selector + uri = self.default_server + if dist is None: + dist = self.codename + if comps is None: + comps = list(self.enabled_comps) + if type is None: + type = self.binary_type + + parent = None + file = None + for parent in reversed(self.child_sources) or reversed(self.main_sources): + file = parent.file + break + + new_source = self.sourceslist.add( + type, uri, dist, comps, comment, parent=parent, file=file + ) + # if source code is enabled add a deb-src line after the new + # source + if self.get_source_code and type == self.binary_type: + self.sourceslist.add( + self.source_type, + uri, + dist, + comps, + comment, + file=new_source.file, + parent=new_source, + pos=self.sourceslist.list.index(new_source) + 1, + ) + + def enable_component(self, comp): + """ + Enable a component in all main, child and source code sources + (excluding cdrom based sources) + + comp: the component that should be enabled + """ + comps = list([comp]) + # look for parent components that we may have to add + for source in self.main_sources: + for c in source.template.components: + if c.name == comp and c.parent_component: + if c.parent_component not in comps: + comps.append(c.parent_component) + for c in comps: + self._enable_component(c) + + def _enable_component(self, comp): + def add_component_only_once(source, comps_per_dist): + """ + Check if we already added the component to the repository, since + a repository could be splitted into different apt lines. If not + add the component + """ + # if we don't have that distro, just return (can happen for e.g. + # dapper-update only in deb-src + if source.dist not in comps_per_dist: + return + # if we have seen this component already for this distro, + # return (nothing to do) + if comp in comps_per_dist[source.dist]: + return + # add it + source.comps = source.comps + [comp] + comps_per_dist[source.dist].add(comp) + + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + # store what comps are enabled already per distro (where distro is + # e.g. "dapper", "dapper-updates") + comps_per_dist = {} + comps_per_sdist = {} + for s in sources: + if s.type == self.binary_type: + if s.dist not in comps_per_dist: + comps_per_dist[s.dist] = set() + for c in s.comps: + comps_per_dist[s.dist].add(c) + for s in self.source_code_sources: + if s.type == self.source_type: + if s.dist not in comps_per_sdist: + comps_per_sdist[s.dist] = set() + for c in s.comps: + comps_per_sdist[s.dist].add(c) + + # check if there is a main source at all + if len(self.main_sources) < 1: + # create a new main source + self.add_source(comps=["%s" % comp]) + else: + # add the comp to all main, child and source code sources + for source in sources: + add_component_only_once(source, comps_per_dist) + + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + # check if there is a main source code source at all + if self.get_source_code: + if len(self.source_code_sources) < 1: + # create a new main source + self.add_source(type=self.source_type, comps=["%s" % comp]) + else: + # add the comp to all main, child and source code sources + for source in self.source_code_sources: + add_component_only_once(source, comps_per_sdist) + + def disable_component(self, comp): + """ + Disable a component in all main, child and source code sources + (excluding cdrom based sources) + """ + sources = [] + sources.extend(self.main_sources) + sources.extend(self.child_sources) + sources.extend(self.source_code_sources) + if comp in self.cdrom_comps: + sources = [] + sources.extend(self.main_sources) + for source in sources: + if comp in source.comps: + comps = source.comps + comps.remove(comp) + source.comps = comps + if len(source.comps) < 1: + self.sourceslist.remove(source) + + def change_server(self, uri): + """Change the server of all distro specific sources to + a given host""" + + def change_server_of_source(source, uri, seen): + # Avoid creating duplicate entries + source.uri = uri + for comp in source.comps: + if [source.uri, source.dist, comp] in seen: + source.comps.remove(comp) + else: + seen.append([source.uri, source.dist, comp]) + if len(source.comps) < 1: + self.sourceslist.remove(source) + + seen_binary = [] + seen_source = [] + self.default_server = uri + for source in self.main_sources: + change_server_of_source(source, uri, seen_binary) + for source in self.child_sources: + # Do not change the forces server of a child source + if ( + source.template.base_uri is None + or source.template.base_uri != source.uri + ): + change_server_of_source(source, uri, seen_binary) + for source in self.source_code_sources: + change_server_of_source(source, uri, seen_source) + + def is_codename(self, name): + """Compare a given name with the release codename.""" + if name == self.codename: + return True + else: + return False + + +class DebianDistribution(Distribution): + """Class to support specific Debian features""" + + def is_codename(self, name): + """Compare a given name with the release codename and check if + if it can be used as a synonym for a development releases""" + if name == self.codename or self.release in ("testing", "unstable"): + return True + else: + return False + + def _get_mirror_name(self, server): + """Try to get a human readable name for the main mirror of a country + Debian specific""" + country = None + i = server.find("://ftp.") + li = server.find(".debian.org") + if i != -1 and li != -1: + country = server[i + len("://ftp.") : li] + if country in self.countries: + # TRANSLATORS: %s is a country + return ( + _("Server for %s") + % gettext.dgettext( + "iso_3166", self.countries[country].rstrip() + ).rstrip() + ) + else: + return "%s" % server.rstrip("/ ") + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://ftp.%s.debian.org/debian/" + ) + + +class UbuntuDistribution(Distribution): + """Class to support specific Ubuntu features""" + + def get_mirrors(self): + Distribution.get_mirrors( + self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/" + ) + + +class UbuntuRTMDistribution(UbuntuDistribution): + """Class to support specific Ubuntu RTM features""" + + def get_mirrors(self): + self.main_server = self.source_template.base_uri + + +def _lsb_release(): + """Call lsb_release --idrc and return a mapping.""" + import errno + from subprocess import PIPE, Popen + + result = { + "Codename": "sid", + "Distributor ID": "Debian", + "Description": "Debian GNU/Linux unstable (sid)", + "Release": "unstable", + } + try: + out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0] + # Convert to unicode string, needed for Python 3.1 + out = out.decode("utf-8") + result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line) + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("lsb_release failed, using defaults:" % exc) + return result + + +def _system_image_channel(): + """Get the current channel from system-image-cli -i if possible.""" + import errno + from subprocess import DEVNULL, PIPE, Popen + + try: + out = Popen( + ["system-image-cli", "-i"], + stdout=PIPE, + stderr=DEVNULL, + universal_newlines=True, + ).communicate()[0] + for line in out.splitlines(): + if line.startswith("channel: "): + return line.split(": ", 1)[1] + except OSError as exc: + if exc.errno != errno.ENOENT: + logging.warning("system-image-cli failed, using defaults: %s" % exc) + return None + + +class _OSRelease: + DEFAULT_OS_RELEASE_FILE = "/etc/os-release" + OS_RELEASE_FILE = "/etc/os-release" + + def __init__(self, lsb_compat=True): + self.result = {} + self.valid = False + self.file = _OSRelease.OS_RELEASE_FILE + + if not os.path.isfile(self.file): + return + + self.parse() + self.valid = True + + if lsb_compat: + self.inject_lsb_compat() + + def inject_lsb_compat(self): + self.result["Distributor ID"] = self.result["ID"] + self.result["Description"] = self.result["PRETTY_NAME"] + # Optionals as per os-release spec. + self.result["Codename"] = self.result.get("VERSION_CODENAME") + if not self.result["Codename"]: + # Transient Ubuntu 16.04 field (LP: #1598212) + self.result["Codename"] = self.result.get("UBUNTU_CODENAME") + self.result["Release"] = self.result.get("VERSION_ID") + + def parse(self): + f = open(self.file) + for line in f: + line = line.strip() + if not line: + continue + self.parse_entry(*line.split("=", 1)) + f.close() + + def parse_entry(self, key, value): + value = self.parse_value(value) # Values can be shell strings... + if key == "ID_LIKE" and isinstance(value, str): + # ID_LIKE is specified as quoted space-separated list. This will + # be parsed as string that we need to split manually. + value = value.split(" ") + self.result[key] = value + + def parse_value(self, value): + values = shlex.split(value) + if len(values) == 1: + return values[0] + return values + + +def get_distro(id=None, codename=None, description=None, release=None, is_like=[]): + """ + Check the currently used distribution and return the corresponding + distriubtion class that supports distro specific features. + + If no paramter are given the distro will be auto detected via + a call to lsb-release + """ + # make testing easier + if not (id and codename and description and release): + if id or codename or description or release: + warnings.warn( + "Provided only a subset of arguments", DeprecationWarning, stacklevel=2 + ) + os_release = _OSRelease() + os_result = [] + lsb_result = _lsb_release() + if os_release.valid: + os_result = os_release.result + # TODO: We cannot presently use os-release to fully replace lsb_release + # because os-release's ID, VERSION_ID and VERSION_CODENAME fields + # are specified as lowercase. In lsb_release they can be upcase + # or captizalized. So, switching to os-release would consitute + # a behavior break a which point lsb_release support should be + # fully removed. + # This in particular is a problem for template matching, as this + # matches against Distribution objects and depends on string + # case. + lsb_result = _lsb_release() + id = lsb_result["Distributor ID"] + codename = lsb_result["Codename"] + description = lsb_result["Description"] + release = lsb_result["Release"] + # Not available with LSB, use get directly. + is_like = os_result.get("ID_LIKE", []) + if id == "Ubuntu": + channel = _system_image_channel() + if channel is not None and "ubuntu-rtm/" in channel: + id = "Ubuntu-RTM" + codename = channel.rsplit("/", 1)[1].split("-", 1)[0] + description = codename + release = codename + if id == "Ubuntu": + return UbuntuDistribution(id, codename, description, release, is_like) + if id == "Ubuntu-RTM": + return UbuntuRTMDistribution(id, codename, description, release, is_like) + elif id == "Debian": + return DebianDistribution(id, codename, description, release, is_like) + else: + return Distribution(id, codename, description, release, is_like) |