diff options
Diffstat (limited to 'ansible_collections/community/general/plugins/modules/snap.py')
-rw-r--r-- | ansible_collections/community/general/plugins/modules/snap.py | 254 |
1 files changed, 173 insertions, 81 deletions
diff --git a/ansible_collections/community/general/plugins/modules/snap.py b/ansible_collections/community/general/plugins/modules/snap.py index 4b798d6e2..fd1676480 100644 --- a/ansible_collections/community/general/plugins/modules/snap.py +++ b/ansible_collections/community/general/plugins/modules/snap.py @@ -17,7 +17,7 @@ DOCUMENTATION = ''' module: snap short_description: Manages snaps description: - - "Manages snaps packages." + - Manages snaps packages. extends_documentation_fragment: - community.general.attributes attributes: @@ -28,14 +28,20 @@ attributes: options: name: description: - - Name of the snaps. + - Name of the snaps to be installed. + - Any named snap accepted by the C(snap) command is valid. + - > + Notice that snap files might require O(dangerous=true) to ignore the error + "cannot find signatures with metadata for snap". required: true type: list elements: str state: description: - Desired state of the package. - required: false + - > + When O(state=present) the module will use C(snap install) if the snap is not installed, + and C(snap refresh) if it is installed but from a different channel. default: present choices: [ absent, present, enabled, disabled ] type: str @@ -43,7 +49,7 @@ options: description: - Confinement policy. The classic confinement allows a snap to have the same level of access to the system as "classic" packages, - like those managed by APT. This option corresponds to the --classic argument. + like those managed by APT. This option corresponds to the C(--classic) argument. This option can only be specified if there is a single snap in the task. type: bool required: false @@ -52,18 +58,29 @@ options: description: - Define which release of a snap is installed and tracked for updates. This option can only be specified if there is a single snap in the task. + - If not passed, the C(snap) command will default to V(stable). + - If the value passed does not contain the C(track), it will default to C(latest). + For example, if V(edge) is passed, the module will assume the channel to be V(latest/edge). + - See U(https://snapcraft.io/docs/channels) for more details about snap channels. type: str required: false - default: stable options: description: - Set options with pattern C(key=value) or C(snap:key=value). If a snap name is given, the option will be applied - to that snap only. If the snap name is omitted, the options will be applied to all snaps listed in I(name). Options will + to that snap only. If the snap name is omitted, the options will be applied to all snaps listed in O(name). Options will only be applied to active snaps. required: false type: list elements: str version_added: 4.4.0 + dangerous: + description: + - Install the given snap file even if there are no pre-acknowledged signatures for it, + meaning it was not verified and could be dangerous. + type: bool + required: false + default: false + version_added: 7.2.0 author: - Victor Carceler (@vcarceler) <vcarceler@iespuigcastellar.xeill.net> @@ -154,51 +171,29 @@ import numbers from ansible.module_utils.common.text.converters import to_native -from ansible_collections.community.general.plugins.module_utils.module_helper import ( - CmdStateModuleHelper, ArgFormat -) +from ansible_collections.community.general.plugins.module_utils.module_helper import StateModuleHelper +from ansible_collections.community.general.plugins.module_utils.snap import snap_runner -__state_map = dict( - present='install', - absent='remove', - enabled='enable', - disabled='disable', - info='info', # not public - list='list', # not public - set='set', # not public - get='get', # not public -) +class Snap(StateModuleHelper): + NOT_INSTALLED = 0 + CHANNEL_MISMATCH = 1 + INSTALLED = 2 - -def _state_map(value): - return [__state_map[value]] - - -class Snap(CmdStateModuleHelper): __disable_re = re.compile(r'(?:\S+\s+){5}(?P<notes>\S+)') __set_param_re = re.compile(r'(?P<snap_prefix>\S+:)?(?P<key>\S+)\s*=\s*(?P<value>.+)') + __list_re = re.compile(r'^(?P<name>\S+)\s+\S+\s+\S+\s+(?P<channel>\S+)') module = dict( argument_spec={ 'name': dict(type='list', elements='str', required=True), - 'state': dict(type='str', default='present', - choices=['absent', 'present', 'enabled', 'disabled']), + 'state': dict(type='str', default='present', choices=['absent', 'present', 'enabled', 'disabled']), 'classic': dict(type='bool', default=False), - 'channel': dict(type='str', default='stable'), + 'channel': dict(type='str'), 'options': dict(type='list', elements='str'), + 'dangerous': dict(type='bool', default=False), }, supports_check_mode=True, ) - command = "snap" - command_args_formats = dict( - actionable_snaps=dict(fmt=lambda v: v), - state=dict(fmt=_state_map), - classic=dict(fmt="--classic", style=ArgFormat.BOOLEAN), - channel=dict(fmt=lambda v: [] if v == 'stable' else ['--channel', '{0}'.format(v)]), - options=dict(fmt=list), - json_format=dict(fmt="-d", style=ArgFormat.BOOLEAN), - ) - check_rc = False @staticmethod def _first_non_zero(a): @@ -208,19 +203,63 @@ class Snap(CmdStateModuleHelper): return 0 - def _run_multiple_commands(self, commands): - outputs = [(c,) + self.run_command(params=c) for c in commands] - results = ([], [], [], []) - for output in outputs: - for i in range(4): - results[i].append(output[i]) - - return [ - '; '.join([to_native(x) for x in results[0]]), - self._first_non_zero(results[1]), - '\n'.join(results[2]), - '\n'.join(results[3]), - ] + def __init_module__(self): + self.runner = snap_runner(self.module) + # if state=present there might be file names passed in 'name', in + # which case they must be converted to their actual snap names, which + # is done using the names_from_snaps() method calling 'snap info'. + self.vars.set("snapinfo_run_info", [], output=(self.verbosity >= 4)) + self.vars.set("status_run_info", [], output=(self.verbosity >= 4)) + self.vars.set("status_out", None, output=(self.verbosity >= 4)) + self.vars.set("run_info", [], output=(self.verbosity >= 4)) + + if self.vars.state == "present": + self.vars.set("snap_names", self.names_from_snaps(self.vars.name)) + status_var = "snap_names" + else: + status_var = "name" + self.vars.set("status_var", status_var, output=False) + self.vars.set("snap_status", self.snap_status(self.vars[self.vars.status_var], self.vars.channel), output=False, change=True) + self.vars.set("snap_status_map", dict(zip(self.vars.name, self.vars.snap_status)), output=False, change=True) + + def __quit_module__(self): + self.vars.snap_status = self.snap_status(self.vars[self.vars.status_var], self.vars.channel) + if self.vars.channel is None: + self.vars.channel = "stable" + + def _run_multiple_commands(self, commands, actionable_names, bundle=True, refresh=False): + results_cmd = [] + results_rc = [] + results_out = [] + results_err = [] + results_run_info = [] + + state = "refresh" if refresh else self.vars.state + + with self.runner(commands + ["name"]) as ctx: + if bundle: + rc, out, err = ctx.run(state=state, name=actionable_names) + results_cmd.append(commands + actionable_names) + results_rc.append(rc) + results_out.append(out.strip()) + results_err.append(err.strip()) + results_run_info.append(ctx.run_info) + else: + for name in actionable_names: + rc, out, err = ctx.run(state=state, name=name) + results_cmd.append(commands + [name]) + results_rc.append(rc) + results_out.append(out.strip()) + results_err.append(err.strip()) + results_run_info.append(ctx.run_info) + + return ( + '; '.join([to_native(x) for x in results_cmd]), + self._first_non_zero(results_rc), + '\n'.join(results_out), + '\n'.join(results_err), + results_run_info, + ) def convert_json_subtree_to_map(self, json_subtree, prefix=None): option_map = {} @@ -234,7 +273,6 @@ class Snap(CmdStateModuleHelper): if isinstance(value, (str, float, bool, numbers.Integral)): option_map[full_key] = str(value) - else: option_map.update(self.convert_json_subtree_to_map(json_subtree=value, prefix=full_key)) @@ -245,8 +283,8 @@ class Snap(CmdStateModuleHelper): return self.convert_json_subtree_to_map(json_object) def retrieve_option_map(self, snap_name): - params = [{'state': 'get'}, {'name': snap_name}, {'json_format': True}] - rc, out, err = self.run_command(params=params) + with self.runner("get name") as ctx: + rc, out, err = ctx.run(name=snap_name) if rc != 0: return {} @@ -258,18 +296,73 @@ class Snap(CmdStateModuleHelper): try: option_map = self.convert_json_to_map(out) - + return option_map except Exception as e: self.do_raise( msg="Parsing option map returned by 'snap get {0}' triggers exception '{1}', output:\n'{2}'".format(snap_name, str(e), out)) - return option_map + def names_from_snaps(self, snaps): + def process_one(rc, out, err): + res = [line for line in out.split("\n") if line.startswith("name:")] + name = res[0].split()[1] + return [name] + + def process_many(rc, out, err): + # This needs to be "\n---" instead of just "---" because otherwise + # if a snap uses "---" in its description then that will incorrectly + # be interpreted as a separator between snaps in the output. + outputs = out.split("\n---") + res = [] + for sout in outputs: + res.extend(process_one(rc, sout, "")) + return res + + def process(rc, out, err): + if len(snaps) == 1: + check_error = err + process_ = process_one + else: + check_error = out + process_ = process_many + + if "warning: no snap found" in check_error: + self.do_raise("Snaps not found: {0}.".format([x.split()[-1] + for x in out.split('\n') + if x.startswith("warning: no snap found")])) + return process_(rc, out, err) + + names = [] + if snaps: + with self.runner("info name", output_process=process) as ctx: + try: + names = ctx.run(name=snaps) + finally: + self.vars.snapinfo_run_info.append(ctx.run_info) + return names + + def snap_status(self, snap_name, channel): + def _status_check(name, channel, installed): + match = [c for n, c in installed if n == name] + if not match: + return Snap.NOT_INSTALLED + if channel and match[0] not in (channel, "latest/{0}".format(channel)): + return Snap.CHANNEL_MISMATCH + else: + return Snap.INSTALLED + + with self.runner("_list") as ctx: + rc, out, err = ctx.run(check_rc=True) + list_out = out.split('\n')[1:] + list_out = [self.__list_re.match(x) for x in list_out] + list_out = [(m.group('name'), m.group('channel')) for m in list_out if m] + self.vars.status_out = list_out + self.vars.status_run_info = ctx.run_info - def is_snap_installed(self, snap_name): - return 0 == self.run_command(params=[{'state': 'list'}, {'name': snap_name}])[0] + return [_status_check(n, channel, list_out) for n in snap_name] def is_snap_enabled(self, snap_name): - rc, out, err = self.run_command(params=[{'state': 'list'}, {'name': snap_name}]) + with self.runner("_list name") as ctx: + rc, out, err = ctx.run(name=snap_name) if rc != 0: return None result = out.splitlines()[1] @@ -279,22 +372,22 @@ class Snap(CmdStateModuleHelper): notes = match.group('notes') return "disabled" not in notes.split(',') - def process_actionable_snaps(self, actionable_snaps): + def _present(self, actionable_snaps, refresh=False): self.changed = True self.vars.snaps_installed = actionable_snaps - if self.module.check_mode: + if self.check_mode: return - params = ['state', 'classic', 'channel'] # get base cmd parts + params = ['state', 'classic', 'channel', 'dangerous'] # get base cmd parts has_one_pkg_params = bool(self.vars.classic) or self.vars.channel != 'stable' has_multiple_snaps = len(actionable_snaps) > 1 if has_one_pkg_params and has_multiple_snaps: - commands = [params + [{'actionable_snaps': [s]}] for s in actionable_snaps] + self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps, bundle=False, refresh=refresh) else: - commands = [params + [{'actionable_snaps': actionable_snaps}]] - self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps, refresh=refresh) + self.vars.run_info = run_info if rc == 0: return @@ -314,10 +407,13 @@ class Snap(CmdStateModuleHelper): self.vars.meta('classic').set(output=True) self.vars.meta('channel').set(output=True) - actionable_snaps = [s for s in self.vars.name if not self.is_snap_installed(s)] - if actionable_snaps: - self.process_actionable_snaps(actionable_snaps) + actionable_refresh = [snap for snap in self.vars.name if self.vars.snap_status_map[snap] == Snap.CHANNEL_MISMATCH] + if actionable_refresh: + self._present(actionable_refresh, refresh=True) + actionable_install = [snap for snap in self.vars.name if self.vars.snap_status_map[snap] == Snap.NOT_INSTALLED] + if actionable_install: + self._present(actionable_install) self.set_options() @@ -325,7 +421,7 @@ class Snap(CmdStateModuleHelper): if self.vars.options is None: return - actionable_snaps = [s for s in self.vars.name if self.is_snap_installed(s)] + actionable_snaps = [s for s in self.vars.name if self.vars.snap_status_map[s] != Snap.NOT_INSTALLED] overall_options_changed = [] for snap_name in actionable_snaps: @@ -360,11 +456,9 @@ class Snap(CmdStateModuleHelper): if options_changed: self.changed = True - if not self.module.check_mode: - params = [{'state': 'set'}, {'name': snap_name}, {'options': options_changed}] - - rc, out, err = self.run_command(params=params) - + if not self.check_mode: + with self.runner("_set name options") as ctx: + rc, out, err = ctx.run(name=snap_name, options=options_changed) if rc != 0: if 'has no "configure" hook' in err: msg = "Snap '{snap}' does not have any configurable options".format(snap=snap_name) @@ -377,18 +471,16 @@ class Snap(CmdStateModuleHelper): if overall_options_changed: self.vars.options_changed = overall_options_changed - def _generic_state_action(self, actionable_func, actionable_var, params=None): + def _generic_state_action(self, actionable_func, actionable_var, params): actionable_snaps = [s for s in self.vars.name if actionable_func(s)] if not actionable_snaps: return self.changed = True self.vars[actionable_var] = actionable_snaps - if self.module.check_mode: + if self.check_mode: return - if params is None: - params = ['state'] - commands = [params + [{'actionable_snaps': actionable_snaps}]] - self.vars.cmd, rc, out, err = self._run_multiple_commands(commands) + self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps) + self.vars.run_info = run_info if rc == 0: return msg = "Ooops! Snap operation failed while executing '{cmd}', please examine logs and " \ @@ -396,7 +488,7 @@ class Snap(CmdStateModuleHelper): self.do_raise(msg=msg) def state_absent(self): - self._generic_state_action(self.is_snap_installed, "snaps_removed", ['classic', 'channel', 'state']) + self._generic_state_action(lambda s: self.vars.snap_status_map[s] != Snap.NOT_INSTALLED, "snaps_removed", ['classic', 'channel', 'state']) def state_enabled(self): self._generic_state_action(lambda s: not self.is_snap_enabled(s), "snaps_enabled", ['classic', 'channel', 'state']) |