# encoding: utf-8 # Copyright: (c) 2012, Matt Wright # Copyright: (c) 2013, Alexander Saltanov # Copyright: (c) 2014, Rutger Spiertz # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = ''' --- module: apt_repository short_description: Add and remove APT repositories description: - Add or remove an APT repositories in Ubuntu and Debian. extends_documentation_fragment: action_common_attributes attributes: check_mode: support: full diff_mode: support: full platform: platforms: debian notes: - This module supports Debian Squeeze (version 6) as well as its successors and derivatives. options: repo: description: - A source string for the repository. type: str required: true state: description: - A source string state. type: str choices: [ absent, present ] default: "present" mode: description: - The octal mode for newly created files in sources.list.d. - Default is what system uses (probably 0644). type: raw version_added: "1.6" update_cache: description: - Run the equivalent of C(apt-get update) when a change occurs. Cache updates are run after making changes. type: bool default: "yes" aliases: [ update-cache ] update_cache_retries: description: - Amount of retries if the cache update fails. Also see I(update_cache_retry_max_delay). type: int default: 5 version_added: '2.10' update_cache_retry_max_delay: description: - Use an exponential backoff delay for each retry (see I(update_cache_retries)) up to this max delay in seconds. type: int default: 12 version_added: '2.10' validate_certs: description: - If C(false), SSL certificates for the target repo will not be validated. This should only be used on personally controlled sites using self-signed certificates. type: bool default: 'yes' version_added: '1.8' filename: description: - Sets the name of the source list file in sources.list.d. Defaults to a file name based on the repository source url. The .list extension will be automatically added. type: str version_added: '2.1' codename: description: - Override the distribution codename to use for PPA repositories. Should usually only be set when working with a PPA on a non-Ubuntu target (for example, Debian or Mint). type: str version_added: '2.3' install_python_apt: description: - Whether to automatically try to install the Python apt library or not, if it is not already installed. Without this library, the module does not work. - Runs C(apt-get install python-apt) for Python 2, and C(apt-get install python3-apt) for Python 3. - Only works with the system Python 2 or Python 3. If you are using a Python on the remote that is not the system Python, set I(install_python_apt=false) and ensure that the Python apt library for your Python version is installed some other way. type: bool default: true author: - Alexander Saltanov (@sashka) version_added: "0.7" requirements: - python-apt (python 2) - python3-apt (python 3) - apt-key or gpg ''' EXAMPLES = ''' - name: Add specified repository into sources list ansible.builtin.apt_repository: repo: deb http://archive.canonical.com/ubuntu hardy partner state: present - name: Add specified repository into sources list using specified filename ansible.builtin.apt_repository: repo: deb http://dl.google.com/linux/chrome/deb/ stable main state: present filename: google-chrome - name: Add source repository into sources list ansible.builtin.apt_repository: repo: deb-src http://archive.canonical.com/ubuntu hardy partner state: present - name: Remove specified repository from sources list ansible.builtin.apt_repository: repo: deb http://archive.canonical.com/ubuntu hardy partner state: absent - name: Add nginx stable repository from PPA and install its signing key on Ubuntu target ansible.builtin.apt_repository: repo: ppa:nginx/stable - name: Add nginx stable repository from PPA and install its signing key on Debian target ansible.builtin.apt_repository: repo: 'ppa:nginx/stable' codename: trusty - name: One way to avoid apt_key once it is removed from your distro block: - name: somerepo |no apt key ansible.builtin.get_url: url: https://download.example.com/linux/ubuntu/gpg dest: /etc/apt/trusted.gpg.d/somerepo.asc - name: somerepo | apt source ansible.builtin.apt_repository: repo: "deb [arch=amd64 signed-by=/etc/apt/trusted.gpg.d/myrepo.asc] https://download.example.com/linux/ubuntu {{ ansible_distribution_release }} stable" state: present ''' RETURN = '''#''' import copy import glob import json import os import re import sys import tempfile import random import time from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.respawn import has_respawned, probe_interpreters_for_module, respawn_module from ansible.module_utils._text import to_native from ansible.module_utils.six import PY3 from ansible.module_utils.urls import fetch_url try: import apt import apt_pkg import aptsources.distro as aptsources_distro distro = aptsources_distro.get_distro() HAVE_PYTHON_APT = True except ImportError: apt = apt_pkg = aptsources_distro = distro = None HAVE_PYTHON_APT = False APT_KEY_DIRS = ['/etc/apt/keyrings', '/etc/apt/trusted.gpg.d', '/usr/share/keyrings'] DEFAULT_SOURCES_PERM = 0o0644 VALID_SOURCE_TYPES = ('deb', 'deb-src') def install_python_apt(module, apt_pkg_name): if not module.check_mode: apt_get_path = module.get_bin_path('apt-get') if apt_get_path: rc, so, se = module.run_command([apt_get_path, 'update']) if rc != 0: module.fail_json(msg="Failed to auto-install %s. Error was: '%s'" % (apt_pkg_name, se.strip())) rc, so, se = module.run_command([apt_get_path, 'install', apt_pkg_name, '-y', '-q']) if rc != 0: module.fail_json(msg="Failed to auto-install %s. Error was: '%s'" % (apt_pkg_name, se.strip())) else: module.fail_json(msg="%s must be installed to use check mode" % apt_pkg_name) class InvalidSource(Exception): pass # Simple version of aptsources.sourceslist.SourcesList. # No advanced logic and no backups inside. class SourcesList(object): def __init__(self, module): self.module = module self.files = {} # group sources by file # Repositories that we're adding -- used to implement mode param self.new_repos = set() self.default_file = self._apt_cfg_file('Dir::Etc::sourcelist') # read sources.list if it exists if os.path.isfile(self.default_file): self.load(self.default_file) # read sources.list.d for file in glob.iglob('%s/*.list' % self._apt_cfg_dir('Dir::Etc::sourceparts')): self.load(file) def __iter__(self): '''Simple iterator to go over all sources. Empty, non-source, and other not valid lines will be skipped.''' for file, sources in self.files.items(): for n, valid, enabled, source, comment in sources: if valid: yield file, n, enabled, source, comment def _expand_path(self, filename): if '/' in filename: return filename else: return os.path.abspath(os.path.join(self._apt_cfg_dir('Dir::Etc::sourceparts'), filename)) def _suggest_filename(self, line): def _cleanup_filename(s): filename = self.module.params['filename'] if filename is not None: return filename return '_'.join(re.sub('[^a-zA-Z0-9]', ' ', s).split()) def _strip_username_password(s): if '@' in s: s = s.split('@', 1) s = s[-1] return s # Drop options and protocols. line = re.sub(r'\[[^\]]+\]', '', line) line = re.sub(r'\w+://', '', line) # split line into valid keywords parts = [part for part in line.split() if part not in VALID_SOURCE_TYPES] # Drop usernames and passwords parts[0] = _strip_username_password(parts[0]) return '%s.list' % _cleanup_filename(' '.join(parts[:1])) def _parse(self, line, raise_if_invalid_or_disabled=False): valid = False enabled = True source = '' comment = '' line = line.strip() if line.startswith('#'): enabled = False line = line[1:] # Check for another "#" in the line and treat a part after it as a comment. i = line.find('#') if i > 0: comment = line[i + 1:].strip() line = line[:i] # Split a source into substring to make sure that it is source spec. # Duplicated whitespaces in a valid source spec will be removed. source = line.strip() if source: chunks = source.split() if chunks[0] in VALID_SOURCE_TYPES: valid = True source = ' '.join(chunks) if raise_if_invalid_or_disabled and (not valid or not enabled): raise InvalidSource(line) return valid, enabled, source, comment @staticmethod def _apt_cfg_file(filespec): ''' Wrapper for `apt_pkg` module for running with Python 2.5 ''' try: result = apt_pkg.config.find_file(filespec) except AttributeError: result = apt_pkg.Config.FindFile(filespec) return result @staticmethod def _apt_cfg_dir(dirspec): ''' Wrapper for `apt_pkg` module for running with Python 2.5 ''' try: result = apt_pkg.config.find_dir(dirspec) except AttributeError: result = apt_pkg.Config.FindDir(dirspec) return result def load(self, file): group = [] f = open(file, 'r') for n, line in enumerate(f): valid, enabled, source, comment = self._parse(line) group.append((n, valid, enabled, source, comment)) self.files[file] = group def save(self): for filename, sources in list(self.files.items()): if sources: d, fn = os.path.split(filename) try: os.makedirs(d) except OSError as ex: if not os.path.isdir(d): self.module.fail_json("Failed to create directory %s: %s" % (d, to_native(ex))) try: fd, tmp_path = tempfile.mkstemp(prefix=".%s-" % fn, dir=d) except (OSError, IOError) as e: self.module.fail_json(msg='Unable to create temp file at "%s" for apt source: %s' % (d, to_native(e))) f = os.fdopen(fd, 'w') for n, valid, enabled, source, comment in sources: chunks = [] if not enabled: chunks.append('# ') chunks.append(source) if comment: chunks.append(' # ') chunks.append(comment) chunks.append('\n') line = ''.join(chunks) try: f.write(line) except IOError as ex: self.module.fail_json(msg="Failed to write to file %s: %s" % (tmp_path, to_native(ex))) self.module.atomic_move(tmp_path, filename) # allow the user to override the default mode if filename in self.new_repos: this_mode = self.module.params.get('mode', DEFAULT_SOURCES_PERM) self.module.set_mode_if_different(filename, this_mode, False) else: del self.files[filename] if os.path.exists(filename): os.remove(filename) def dump(self): dumpstruct = {} for filename, sources in self.files.items(): if sources: lines = [] for n, valid, enabled, source, comment in sources: chunks = [] if not enabled: chunks.append('# ') chunks.append(source) if comment: chunks.append(' # ') chunks.append(comment) chunks.append('\n') lines.append(''.join(chunks)) dumpstruct[filename] = ''.join(lines) return dumpstruct def _choice(self, new, old): if new is None: return old return new def modify(self, file, n, enabled=None, source=None, comment=None): ''' This function to be used with iterator, so we don't care of invalid sources. If source, enabled, or comment is None, original value from line ``n`` will be preserved. ''' valid, enabled_old, source_old, comment_old = self.files[file][n][1:] self.files[file][n] = (n, valid, self._choice(enabled, enabled_old), self._choice(source, source_old), self._choice(comment, comment_old)) def _add_valid_source(self, source_new, comment_new, file): # We'll try to reuse disabled source if we have it. # If we have more than one entry, we will enable them all - no advanced logic, remember. self.module.log('ading source file: %s | %s | %s' % (source_new, comment_new, file)) found = False for filename, n, enabled, source, comment in self: if source == source_new: self.modify(filename, n, enabled=True) found = True if not found: if file is None: file = self.default_file else: file = self._expand_path(file) if file not in self.files: self.files[file] = [] files = self.files[file] files.append((len(files), True, True, source_new, comment_new)) self.new_repos.add(file) def add_source(self, line, comment='', file=None): source = self._parse(line, raise_if_invalid_or_disabled=True)[2] # Prefer separate files for new sources. self._add_valid_source(source, comment, file=file or self._suggest_filename(source)) def _remove_valid_source(self, source): # If we have more than one entry, we will remove them all (not comment, remove!) for filename, n, enabled, src, comment in self: if source == src and enabled: self.files[filename].pop(n) def remove_source(self, line): source = self._parse(line, raise_if_invalid_or_disabled=True)[2] self._remove_valid_source(source) class UbuntuSourcesList(SourcesList): LP_API = 'https://launchpad.net/api/1.0/~%s/+archive/%s' def __init__(self, module): self.module = module self.codename = module.params['codename'] or distro.codename super(UbuntuSourcesList, self).__init__(module) self.apt_key_bin = self.module.get_bin_path('apt-key', required=False) self.gpg_bin = self.module.get_bin_path('gpg', required=False) if not self.apt_key_bin and not self.gpg_bin: self.module.fail_json(msg='Either apt-key or gpg binary is required, but neither could be found') def __deepcopy__(self, memo=None): return UbuntuSourcesList(self.module) def _get_ppa_info(self, owner_name, ppa_name): lp_api = self.LP_API % (owner_name, ppa_name) headers = dict(Accept='application/json') response, info = fetch_url(self.module, lp_api, headers=headers) if info['status'] != 200: self.module.fail_json(msg="failed to fetch PPA information, error was: %s" % info['msg']) return json.loads(to_native(response.read())) def _expand_ppa(self, path): ppa = path.split(':')[1] ppa_owner = ppa.split('/')[0] try: ppa_name = ppa.split('/')[1] except IndexError: ppa_name = 'ppa' line = 'deb http://ppa.launchpad.net/%s/%s/ubuntu %s main' % (ppa_owner, ppa_name, self.codename) return line, ppa_owner, ppa_name def _key_already_exists(self, key_fingerprint): if self.apt_key_bin: rc, out, err = self.module.run_command([self.apt_key_bin, 'export', key_fingerprint], check_rc=True) found = len(err) == 0 else: found = self._gpg_key_exists(key_fingerprint) return found def _gpg_key_exists(self, key_fingerprint): found = False keyfiles = ['/etc/apt/trusted.gpg'] # main gpg repo for apt for other_dir in APT_KEY_DIRS: # add other known sources of gpg sigs for apt, skip hidden files keyfiles.extend([os.path.join(other_dir, x) for x in os.listdir(other_dir) if not x.startswith('.')]) for key_file in keyfiles: if os.path.exists(key_file): try: rc, out, err = self.module.run_command([self.gpg_bin, '--list-packets', key_file]) except (IOError, OSError) as e: self.debug("Could check key against file %s: %s" % (key_file, to_native(e))) continue if key_fingerprint in out: found = True break return found # https://www.linuxuprising.com/2021/01/apt-key-is-deprecated-how-to-add.html def add_source(self, line, comment='', file=None): if line.startswith('ppa:'): source, ppa_owner, ppa_name = self._expand_ppa(line) if source in self.repos_urls: # repository already exists return info = self._get_ppa_info(ppa_owner, ppa_name) # add gpg sig if needed if not self._key_already_exists(info['signing_key_fingerprint']): # TODO: report file that would have been added if not check_mode keyfile = '' if not self.module.check_mode: if self.apt_key_bin: command = [self.apt_key_bin, 'adv', '--recv-keys', '--no-tty', '--keyserver', 'hkp://keyserver.ubuntu.com:80', info['signing_key_fingerprint']] else: # use first available key dir, in order of preference for keydir in APT_KEY_DIRS: if os.path.exists(keydir): break else: self.module.fail_json("Unable to find any existing apt gpgp repo directories, tried the following: %s" % ', '.join(APT_KEY_DIRS)) keyfile = '%s/%s-%s-%s.gpg' % (keydir, os.path.basename(source).replace(' ', '-'), ppa_owner, ppa_name) command = [self.gpg_bin, '--no-tty', '--keyserver', 'hkp://keyserver.ubuntu.com:80', '--export', info['signing_key_fingerprint']] rc, stdout, stderr = self.module.run_command(command, check_rc=True, encoding=None) if keyfile: # using gpg we must write keyfile ourselves if len(stdout) == 0: self.module.fail_json(msg='Unable to get required signing key', rc=rc, stderr=stderr, command=command) try: with open(keyfile, 'wb') as f: f.write(stdout) self.module.log('Added repo key "%s" for apt to file "%s"' % (info['signing_key_fingerprint'], keyfile)) except (OSError, IOError) as e: self.module.fail_json(msg='Unable to add required signing key for%s ', rc=rc, stderr=stderr, error=to_native(e)) # apt source file file = file or self._suggest_filename('%s_%s' % (line, self.codename)) else: source = self._parse(line, raise_if_invalid_or_disabled=True)[2] file = file or self._suggest_filename(source) self._add_valid_source(source, comment, file) def remove_source(self, line): if line.startswith('ppa:'): source = self._expand_ppa(line)[0] else: source = self._parse(line, raise_if_invalid_or_disabled=True)[2] self._remove_valid_source(source) @property def repos_urls(self): _repositories = [] for parsed_repos in self.files.values(): for parsed_repo in parsed_repos: valid = parsed_repo[1] enabled = parsed_repo[2] source_line = parsed_repo[3] if not valid or not enabled: continue if source_line.startswith('ppa:'): source, ppa_owner, ppa_name = self._expand_ppa(source_line) _repositories.append(source) else: _repositories.append(source_line) return _repositories def revert_sources_list(sources_before, sources_after, sourceslist_before): '''Revert the sourcelist files to their previous state.''' # First remove any new files that were created: for filename in set(sources_after.keys()).difference(sources_before.keys()): if os.path.exists(filename): os.remove(filename) # Now revert the existing files to their former state: sourceslist_before.save() def main(): module = AnsibleModule( argument_spec=dict( repo=dict(type='str', required=True), state=dict(type='str', default='present', choices=['absent', 'present']), mode=dict(type='raw'), update_cache=dict(type='bool', default=True, aliases=['update-cache']), update_cache_retries=dict(type='int', default=5), update_cache_retry_max_delay=dict(type='int', default=12), filename=dict(type='str'), # This should not be needed, but exists as a failsafe install_python_apt=dict(type='bool', default=True), validate_certs=dict(type='bool', default=True), codename=dict(type='str'), ), supports_check_mode=True, ) params = module.params repo = module.params['repo'] state = module.params['state'] update_cache = module.params['update_cache'] # Note: mode is referenced in SourcesList class via the passed in module (self here) sourceslist = None if not HAVE_PYTHON_APT: # This interpreter can't see the apt Python library- we'll do the following to try and fix that: # 1) look in common locations for system-owned interpreters that can see it; if we find one, respawn under it # 2) finding none, try to install a matching python-apt package for the current interpreter version; # we limit to the current interpreter version to try and avoid installing a whole other Python just # for apt support # 3) if we installed a support package, try to respawn under what we think is the right interpreter (could be # the current interpreter again, but we'll let it respawn anyway for simplicity) # 4) if still not working, return an error and give up (some corner cases not covered, but this shouldn't be # made any more complex than it already is to try and cover more, eg, custom interpreters taking over # system locations) apt_pkg_name = 'python3-apt' if PY3 else 'python-apt' if has_respawned(): # this shouldn't be possible; short-circuit early if it happens... module.fail_json(msg="{0} must be installed and visible from {1}.".format(apt_pkg_name, sys.executable)) interpreters = ['/usr/bin/python3', '/usr/bin/python2', '/usr/bin/python'] interpreter = probe_interpreters_for_module(interpreters, 'apt') if interpreter: # found the Python bindings; respawn this module under the interpreter where we found them respawn_module(interpreter) # this is the end of the line for this process, it will exit here once the respawned module has completed # don't make changes if we're in check_mode if module.check_mode: module.fail_json(msg="%s must be installed to use check mode. " "If run normally this module can auto-install it." % apt_pkg_name) if params['install_python_apt']: install_python_apt(module, apt_pkg_name) else: module.fail_json(msg='%s is not installed, and install_python_apt is False' % apt_pkg_name) # try again to find the bindings in common places interpreter = probe_interpreters_for_module(interpreters, 'apt') if interpreter: # found the Python bindings; respawn this module under the interpreter where we found them # NB: respawn is somewhat wasteful if it's this interpreter, but simplifies the code respawn_module(interpreter) # this is the end of the line for this process, it will exit here once the respawned module has completed else: # we've done all we can do; just tell the user it's busted and get out module.fail_json(msg="{0} must be installed and visible from {1}.".format(apt_pkg_name, sys.executable)) if not repo: module.fail_json(msg='Please set argument \'repo\' to a non-empty value') if isinstance(distro, aptsources_distro.Distribution): sourceslist = UbuntuSourcesList(module) else: module.fail_json(msg='Module apt_repository is not supported on target.') sourceslist_before = copy.deepcopy(sourceslist) sources_before = sourceslist.dump() try: if state == 'present': sourceslist.add_source(repo) elif state == 'absent': sourceslist.remove_source(repo) except InvalidSource as ex: module.fail_json(msg='Invalid repository string: %s' % to_native(ex)) sources_after = sourceslist.dump() changed = sources_before != sources_after if changed and module._diff: diff = [] for filename in set(sources_before.keys()).union(sources_after.keys()): diff.append({'before': sources_before.get(filename, ''), 'after': sources_after.get(filename, ''), 'before_header': (filename, '/dev/null')[filename not in sources_before], 'after_header': (filename, '/dev/null')[filename not in sources_after]}) else: diff = {} if changed and not module.check_mode: try: sourceslist.save() if update_cache: err = '' update_cache_retries = module.params.get('update_cache_retries') update_cache_retry_max_delay = module.params.get('update_cache_retry_max_delay') randomize = random.randint(0, 1000) / 1000.0 for retry in range(update_cache_retries): try: cache = apt.Cache() cache.update() break except apt.cache.FetchFailedException as e: err = to_native(e) # Use exponential backoff with a max fail count, plus a little bit of randomness delay = 2 ** retry + randomize if delay > update_cache_retry_max_delay: delay = update_cache_retry_max_delay + randomize time.sleep(delay) else: revert_sources_list(sources_before, sources_after, sourceslist_before) module.fail_json(msg='Failed to update apt cache: %s' % (err if err else 'unknown reason')) except (OSError, IOError) as ex: revert_sources_list(sources_before, sources_after, sourceslist_before) module.fail_json(msg=to_native(ex)) module.exit_json(changed=changed, repo=repo, state=state, diff=diff) if __name__ == '__main__': main()