# distro.py - Provide a distro abstraction of the sources.list # # Copyright (c) 2004-2009 Canonical Ltd. # Copyright (c) 2006-2007 Sebastian Heinlein # Copyright (c) 2016 Harald Sitter # # Authors: Sebastian Heinlein # Michael Vogt # Harald Sitter # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA import gettext import logging import os import re import shlex import warnings from xml.etree.ElementTree import ElementTree from apt_pkg import gettext as _ class NoDistroTemplateException(Exception): pass class Distribution: def __init__(self, id, codename, description, release, is_like=[]): """Container for distribution specific informations""" # LSB information self.id = id self.codename = codename self.description = description self.release = release self.is_like = is_like self.binary_type = "deb" self.source_type = "deb-src" def get_sources(self, sourceslist): """ Find the corresponding template, main and child sources for the distribution """ self.sourceslist = sourceslist # corresponding sources self.source_template = None self.child_sources = [] self.main_sources = [] self.disabled_sources = [] self.cdrom_sources = [] self.download_comps = [] self.enabled_comps = [] self.cdrom_comps = [] self.used_media = [] self.get_source_code = False self.source_code_sources = [] # location of the sources self.default_server = "" self.main_server = "" self.nearest_server = "" self.used_servers = [] # find the distro template for template in self.sourceslist.matcher.templates: if self.is_codename(template.name) and template.distribution == self.id: # print "yeah! found a template for %s" % self.description # print template.description, template.base_uri, \ # template.components self.source_template = template break if self.source_template is None: raise NoDistroTemplateException( "Error: could not find a distribution template for %s/%s" % (self.id, self.codename) ) # find main and child sources media = [] comps = [] cdrom_comps = [] enabled_comps = [] # source_code = [] for source in self.sourceslist.exploded_list(): if ( not source.invalid and self.is_codename(source.dist) and source.template and source.template.official and self.is_codename(source.template.name) ): # print "yeah! found a distro repo: %s" % source.line # cdroms need do be handled differently if source.uri.startswith("cdrom:") and not source.disabled: self.cdrom_sources.append(source) cdrom_comps.extend(source.comps) elif source.uri.startswith("cdrom:") and source.disabled: self.cdrom_sources.append(source) elif source.type == self.binary_type and not source.disabled: self.main_sources.append(source) comps.extend(source.comps) media.append(source.uri) elif source.type == self.binary_type and source.disabled: self.disabled_sources.append(source) elif source.type == self.source_type and not source.disabled: self.source_code_sources.append(source) elif source.type == self.source_type and source.disabled: self.disabled_sources.append(source) if not source.invalid and source.template in self.source_template.children: if not source.disabled and source.type == self.binary_type: self.child_sources.append(source) elif not source.disabled and source.type == self.source_type: self.source_code_sources.append(source) else: self.disabled_sources.append(source) self.download_comps = set(comps) self.cdrom_comps = set(cdrom_comps) enabled_comps.extend(comps) enabled_comps.extend(cdrom_comps) self.enabled_comps = set(enabled_comps) self.used_media = set(media) self.get_mirrors() def get_mirrors(self, mirror_template=None): """ Provide a set of mirrors where you can get the distribution from """ # the main server is stored in the template self.main_server = self.source_template.base_uri # other used servers for medium in self.used_media: if not medium.startswith("cdrom:"): # seems to be a network source self.used_servers.append(medium) if len(self.main_sources) == 0: self.default_server = self.main_server else: self.default_server = self.main_sources[0].uri # get a list of country codes and real names self.countries = {} fname = "/usr/share/xml/iso-codes/iso_3166.xml" if os.path.exists(fname): et = ElementTree(file=fname) # python2.6 compat, the next two lines can get removed # once we do not use py2.6 anymore if getattr(et, "iter", None) is None: et.iter = et.getiterator it = et.iter("iso_3166_entry") for elm in it: try: descr = elm.attrib["common_name"] except KeyError: descr = elm.attrib["name"] try: code = elm.attrib["alpha_2_code"] except KeyError: code = elm.attrib["alpha_3_code"] self.countries[code.lower()] = gettext.dgettext("iso_3166", descr) # try to guess the nearest mirror from the locale self.country = None self.country_code = None locale = os.getenv("LANG", default="en_UK") a = locale.find("_") z = locale.find(".") if z == -1: z = len(locale) country_code = locale[a + 1 : z].lower() if mirror_template: self.nearest_server = mirror_template % country_code if country_code in self.countries: self.country = self.countries[country_code] self.country_code = country_code def _get_mirror_name(self, server): """Try to get a human readable name for the main mirror of a country Customize for different distributions""" country = None i = server.find("://") li = server.find(".archive.ubuntu.com") if i != -1 and li != -1: country = server[i + len("://") : li] if country in self.countries: # TRANSLATORS: %s is a country return _("Server for %s") % self.countries[country] else: return "%s" % server.rstrip("/ ") def get_server_list(self): """Return a list of used and suggested servers""" def compare_mirrors(mir1, mir2): """Helper function that handles comaprision of mirror urls that could contain trailing slashes""" return re.match(mir1.strip("/ "), mir2.rstrip("/ ")) # Store all available servers: # Name, URI, active mirrors = [] if len(self.used_servers) < 1 or ( len(self.used_servers) == 1 and compare_mirrors(self.used_servers[0], self.main_server) ): mirrors.append([_("Main server"), self.main_server, True]) if self.nearest_server: mirrors.append( [ self._get_mirror_name(self.nearest_server), self.nearest_server, False, ] ) elif len(self.used_servers) == 1 and not compare_mirrors( self.used_servers[0], self.main_server ): mirrors.append([_("Main server"), self.main_server, False]) # Only one server is used server = self.used_servers[0] # Append the nearest server if it's not already used if self.nearest_server: if not compare_mirrors(server, self.nearest_server): mirrors.append( [ self._get_mirror_name(self.nearest_server), self.nearest_server, False, ] ) if server: mirrors.append([self._get_mirror_name(server), server, True]) elif len(self.used_servers) > 1: # More than one server is used. Since we don't handle this case # in the user interface we set "custom servers" to true and # append a list of all used servers mirrors.append([_("Main server"), self.main_server, False]) if self.nearest_server: mirrors.append( [ self._get_mirror_name(self.nearest_server), self.nearest_server, False, ] ) mirrors.append([_("Custom servers"), None, True]) for server in self.used_servers: mirror_entry = [self._get_mirror_name(server), server, False] if compare_mirrors(server, self.nearest_server) or compare_mirrors( server, self.main_server ): continue elif mirror_entry not in mirrors: mirrors.append(mirror_entry) return mirrors def add_source(self, type=None, uri=None, dist=None, comps=None, comment=""): """ Add distribution specific sources """ if uri is None: # FIXME: Add support for the server selector uri = self.default_server if dist is None: dist = self.codename if comps is None: comps = list(self.enabled_comps) if type is None: type = self.binary_type parent = None file = None for parent in reversed(self.child_sources) or reversed(self.main_sources): file = parent.file break new_source = self.sourceslist.add( type, uri, dist, comps, comment, parent=parent, file=file ) # if source code is enabled add a deb-src line after the new # source if self.get_source_code and type == self.binary_type: self.sourceslist.add( self.source_type, uri, dist, comps, comment, file=new_source.file, parent=new_source, pos=self.sourceslist.list.index(new_source) + 1, ) def enable_component(self, comp): """ Enable a component in all main, child and source code sources (excluding cdrom based sources) comp: the component that should be enabled """ comps = list([comp]) # look for parent components that we may have to add for source in self.main_sources: for c in source.template.components: if c.name == comp and c.parent_component: if c.parent_component not in comps: comps.append(c.parent_component) for c in comps: self._enable_component(c) def _enable_component(self, comp): def add_component_only_once(source, comps_per_dist): """ Check if we already added the component to the repository, since a repository could be splitted into different apt lines. If not add the component """ # if we don't have that distro, just return (can happen for e.g. # dapper-update only in deb-src if source.dist not in comps_per_dist: return # if we have seen this component already for this distro, # return (nothing to do) if comp in comps_per_dist[source.dist]: return # add it source.comps = source.comps + [comp] comps_per_dist[source.dist].add(comp) sources = [] sources.extend(self.main_sources) sources.extend(self.child_sources) # store what comps are enabled already per distro (where distro is # e.g. "dapper", "dapper-updates") comps_per_dist = {} comps_per_sdist = {} for s in sources: if s.type == self.binary_type: if s.dist not in comps_per_dist: comps_per_dist[s.dist] = set() for c in s.comps: comps_per_dist[s.dist].add(c) for s in self.source_code_sources: if s.type == self.source_type: if s.dist not in comps_per_sdist: comps_per_sdist[s.dist] = set() for c in s.comps: comps_per_sdist[s.dist].add(c) # check if there is a main source at all if len(self.main_sources) < 1: # create a new main source self.add_source(comps=["%s" % comp]) else: # add the comp to all main, child and source code sources for source in sources: add_component_only_once(source, comps_per_dist) for source in self.source_code_sources: add_component_only_once(source, comps_per_sdist) # check if there is a main source code source at all if self.get_source_code: if len(self.source_code_sources) < 1: # create a new main source self.add_source(type=self.source_type, comps=["%s" % comp]) else: # add the comp to all main, child and source code sources for source in self.source_code_sources: add_component_only_once(source, comps_per_sdist) def disable_component(self, comp): """ Disable a component in all main, child and source code sources (excluding cdrom based sources) """ sources = [] sources.extend(self.main_sources) sources.extend(self.child_sources) sources.extend(self.source_code_sources) if comp in self.cdrom_comps: sources = [] sources.extend(self.main_sources) for source in sources: if comp in source.comps: comps = source.comps comps.remove(comp) source.comps = comps if len(source.comps) < 1: self.sourceslist.remove(source) def change_server(self, uri): """Change the server of all distro specific sources to a given host""" def change_server_of_source(source, uri, seen): # Avoid creating duplicate entries source.uri = uri for comp in source.comps: if [source.uri, source.dist, comp] in seen: source.comps.remove(comp) else: seen.append([source.uri, source.dist, comp]) if len(source.comps) < 1: self.sourceslist.remove(source) seen_binary = [] seen_source = [] self.default_server = uri for source in self.main_sources: change_server_of_source(source, uri, seen_binary) for source in self.child_sources: # Do not change the forces server of a child source if ( source.template.base_uri is None or source.template.base_uri != source.uri ): change_server_of_source(source, uri, seen_binary) for source in self.source_code_sources: change_server_of_source(source, uri, seen_source) def is_codename(self, name): """Compare a given name with the release codename.""" if name == self.codename: return True else: return False class DebianDistribution(Distribution): """Class to support specific Debian features""" def is_codename(self, name): """Compare a given name with the release codename and check if if it can be used as a synonym for a development releases""" if name == self.codename or self.release in ("testing", "unstable"): return True else: return False def _get_mirror_name(self, server): """Try to get a human readable name for the main mirror of a country Debian specific""" country = None i = server.find("://ftp.") li = server.find(".debian.org") if i != -1 and li != -1: country = server[i + len("://ftp.") : li] if country in self.countries: # TRANSLATORS: %s is a country return ( _("Server for %s") % gettext.dgettext( "iso_3166", self.countries[country].rstrip() ).rstrip() ) else: return "%s" % server.rstrip("/ ") def get_mirrors(self): Distribution.get_mirrors( self, mirror_template="http://ftp.%s.debian.org/debian/" ) class UbuntuDistribution(Distribution): """Class to support specific Ubuntu features""" def get_mirrors(self): Distribution.get_mirrors( self, mirror_template="http://%s.archive.ubuntu.com/ubuntu/" ) class UbuntuRTMDistribution(UbuntuDistribution): """Class to support specific Ubuntu RTM features""" def get_mirrors(self): self.main_server = self.source_template.base_uri def _lsb_release(): """Call lsb_release --idrc and return a mapping.""" import errno from subprocess import PIPE, Popen result = { "Codename": "sid", "Distributor ID": "Debian", "Description": "Debian GNU/Linux unstable (sid)", "Release": "unstable", } try: out = Popen(["lsb_release", "-idrc"], stdout=PIPE).communicate()[0] # Convert to unicode string, needed for Python 3.1 out = out.decode("utf-8") result.update(line.split(":\t") for line in out.split("\n") if ":\t" in line) except OSError as exc: if exc.errno != errno.ENOENT: logging.warning("lsb_release failed, using defaults:" % exc) return result def _system_image_channel(): """Get the current channel from system-image-cli -i if possible.""" import errno from subprocess import DEVNULL, PIPE, Popen try: out = Popen( ["system-image-cli", "-i"], stdout=PIPE, stderr=DEVNULL, universal_newlines=True, ).communicate()[0] for line in out.splitlines(): if line.startswith("channel: "): return line.split(": ", 1)[1] except OSError as exc: if exc.errno != errno.ENOENT: logging.warning("system-image-cli failed, using defaults: %s" % exc) return None class _OSRelease: DEFAULT_OS_RELEASE_FILE = "/etc/os-release" OS_RELEASE_FILE = "/etc/os-release" def __init__(self, lsb_compat=True): self.result = {} self.valid = False self.file = _OSRelease.OS_RELEASE_FILE if not os.path.isfile(self.file): return self.parse() self.valid = True if lsb_compat: self.inject_lsb_compat() def inject_lsb_compat(self): self.result["Distributor ID"] = self.result["ID"] self.result["Description"] = self.result["PRETTY_NAME"] # Optionals as per os-release spec. self.result["Codename"] = self.result.get("VERSION_CODENAME") if not self.result["Codename"]: # Transient Ubuntu 16.04 field (LP: #1598212) self.result["Codename"] = self.result.get("UBUNTU_CODENAME") self.result["Release"] = self.result.get("VERSION_ID") def parse(self): f = open(self.file) for line in f: line = line.strip() if not line: continue self.parse_entry(*line.split("=", 1)) f.close() def parse_entry(self, key, value): value = self.parse_value(value) # Values can be shell strings... if key == "ID_LIKE" and isinstance(value, str): # ID_LIKE is specified as quoted space-separated list. This will # be parsed as string that we need to split manually. value = value.split(" ") self.result[key] = value def parse_value(self, value): values = shlex.split(value) if len(values) == 1: return values[0] return values def get_distro(id=None, codename=None, description=None, release=None, is_like=[]): """ Check the currently used distribution and return the corresponding distriubtion class that supports distro specific features. If no paramter are given the distro will be auto detected via a call to lsb-release """ # make testing easier if not (id and codename and description and release): if id or codename or description or release: warnings.warn( "Provided only a subset of arguments", DeprecationWarning, stacklevel=2 ) os_release = _OSRelease() os_result = [] lsb_result = _lsb_release() if os_release.valid: os_result = os_release.result # TODO: We cannot presently use os-release to fully replace lsb_release # because os-release's ID, VERSION_ID and VERSION_CODENAME fields # are specified as lowercase. In lsb_release they can be upcase # or captizalized. So, switching to os-release would consitute # a behavior break a which point lsb_release support should be # fully removed. # This in particular is a problem for template matching, as this # matches against Distribution objects and depends on string # case. lsb_result = _lsb_release() id = lsb_result["Distributor ID"] codename = lsb_result["Codename"] description = lsb_result["Description"] release = lsb_result["Release"] # Not available with LSB, use get directly. is_like = os_result.get("ID_LIKE", []) if id == "Ubuntu": channel = _system_image_channel() if channel is not None and "ubuntu-rtm/" in channel: id = "Ubuntu-RTM" codename = channel.rsplit("/", 1)[1].split("-", 1)[0] description = codename release = codename if id == "Ubuntu": return UbuntuDistribution(id, codename, description, release, is_like) if id == "Ubuntu-RTM": return UbuntuRTMDistribution(id, codename, description, release, is_like) elif id == "Debian": return DebianDistribution(id, codename, description, release, is_like) else: return Distribution(id, codename, description, release, is_like)