summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/general/plugins/modules/snap.py
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/community/general/plugins/modules/snap.py')
-rw-r--r--ansible_collections/community/general/plugins/modules/snap.py254
1 files changed, 173 insertions, 81 deletions
diff --git a/ansible_collections/community/general/plugins/modules/snap.py b/ansible_collections/community/general/plugins/modules/snap.py
index 4b798d6e2..fd1676480 100644
--- a/ansible_collections/community/general/plugins/modules/snap.py
+++ b/ansible_collections/community/general/plugins/modules/snap.py
@@ -17,7 +17,7 @@ DOCUMENTATION = '''
module: snap
short_description: Manages snaps
description:
- - "Manages snaps packages."
+ - Manages snaps packages.
extends_documentation_fragment:
- community.general.attributes
attributes:
@@ -28,14 +28,20 @@ attributes:
options:
name:
description:
- - Name of the snaps.
+ - Name of the snaps to be installed.
+ - Any named snap accepted by the C(snap) command is valid.
+ - >
+ Notice that snap files might require O(dangerous=true) to ignore the error
+ "cannot find signatures with metadata for snap".
required: true
type: list
elements: str
state:
description:
- Desired state of the package.
- required: false
+ - >
+ When O(state=present) the module will use C(snap install) if the snap is not installed,
+ and C(snap refresh) if it is installed but from a different channel.
default: present
choices: [ absent, present, enabled, disabled ]
type: str
@@ -43,7 +49,7 @@ options:
description:
- Confinement policy. The classic confinement allows a snap to have
the same level of access to the system as "classic" packages,
- like those managed by APT. This option corresponds to the --classic argument.
+ like those managed by APT. This option corresponds to the C(--classic) argument.
This option can only be specified if there is a single snap in the task.
type: bool
required: false
@@ -52,18 +58,29 @@ options:
description:
- Define which release of a snap is installed and tracked for updates.
This option can only be specified if there is a single snap in the task.
+ - If not passed, the C(snap) command will default to V(stable).
+ - If the value passed does not contain the C(track), it will default to C(latest).
+ For example, if V(edge) is passed, the module will assume the channel to be V(latest/edge).
+ - See U(https://snapcraft.io/docs/channels) for more details about snap channels.
type: str
required: false
- default: stable
options:
description:
- Set options with pattern C(key=value) or C(snap:key=value). If a snap name is given, the option will be applied
- to that snap only. If the snap name is omitted, the options will be applied to all snaps listed in I(name). Options will
+ to that snap only. If the snap name is omitted, the options will be applied to all snaps listed in O(name). Options will
only be applied to active snaps.
required: false
type: list
elements: str
version_added: 4.4.0
+ dangerous:
+ description:
+ - Install the given snap file even if there are no pre-acknowledged signatures for it,
+ meaning it was not verified and could be dangerous.
+ type: bool
+ required: false
+ default: false
+ version_added: 7.2.0
author:
- Victor Carceler (@vcarceler) <vcarceler@iespuigcastellar.xeill.net>
@@ -154,51 +171,29 @@ import numbers
from ansible.module_utils.common.text.converters import to_native
-from ansible_collections.community.general.plugins.module_utils.module_helper import (
- CmdStateModuleHelper, ArgFormat
-)
+from ansible_collections.community.general.plugins.module_utils.module_helper import StateModuleHelper
+from ansible_collections.community.general.plugins.module_utils.snap import snap_runner
-__state_map = dict(
- present='install',
- absent='remove',
- enabled='enable',
- disabled='disable',
- info='info', # not public
- list='list', # not public
- set='set', # not public
- get='get', # not public
-)
+class Snap(StateModuleHelper):
+ NOT_INSTALLED = 0
+ CHANNEL_MISMATCH = 1
+ INSTALLED = 2
-
-def _state_map(value):
- return [__state_map[value]]
-
-
-class Snap(CmdStateModuleHelper):
__disable_re = re.compile(r'(?:\S+\s+){5}(?P<notes>\S+)')
__set_param_re = re.compile(r'(?P<snap_prefix>\S+:)?(?P<key>\S+)\s*=\s*(?P<value>.+)')
+ __list_re = re.compile(r'^(?P<name>\S+)\s+\S+\s+\S+\s+(?P<channel>\S+)')
module = dict(
argument_spec={
'name': dict(type='list', elements='str', required=True),
- 'state': dict(type='str', default='present',
- choices=['absent', 'present', 'enabled', 'disabled']),
+ 'state': dict(type='str', default='present', choices=['absent', 'present', 'enabled', 'disabled']),
'classic': dict(type='bool', default=False),
- 'channel': dict(type='str', default='stable'),
+ 'channel': dict(type='str'),
'options': dict(type='list', elements='str'),
+ 'dangerous': dict(type='bool', default=False),
},
supports_check_mode=True,
)
- command = "snap"
- command_args_formats = dict(
- actionable_snaps=dict(fmt=lambda v: v),
- state=dict(fmt=_state_map),
- classic=dict(fmt="--classic", style=ArgFormat.BOOLEAN),
- channel=dict(fmt=lambda v: [] if v == 'stable' else ['--channel', '{0}'.format(v)]),
- options=dict(fmt=list),
- json_format=dict(fmt="-d", style=ArgFormat.BOOLEAN),
- )
- check_rc = False
@staticmethod
def _first_non_zero(a):
@@ -208,19 +203,63 @@ class Snap(CmdStateModuleHelper):
return 0
- def _run_multiple_commands(self, commands):
- outputs = [(c,) + self.run_command(params=c) for c in commands]
- results = ([], [], [], [])
- for output in outputs:
- for i in range(4):
- results[i].append(output[i])
-
- return [
- '; '.join([to_native(x) for x in results[0]]),
- self._first_non_zero(results[1]),
- '\n'.join(results[2]),
- '\n'.join(results[3]),
- ]
+ def __init_module__(self):
+ self.runner = snap_runner(self.module)
+ # if state=present there might be file names passed in 'name', in
+ # which case they must be converted to their actual snap names, which
+ # is done using the names_from_snaps() method calling 'snap info'.
+ self.vars.set("snapinfo_run_info", [], output=(self.verbosity >= 4))
+ self.vars.set("status_run_info", [], output=(self.verbosity >= 4))
+ self.vars.set("status_out", None, output=(self.verbosity >= 4))
+ self.vars.set("run_info", [], output=(self.verbosity >= 4))
+
+ if self.vars.state == "present":
+ self.vars.set("snap_names", self.names_from_snaps(self.vars.name))
+ status_var = "snap_names"
+ else:
+ status_var = "name"
+ self.vars.set("status_var", status_var, output=False)
+ self.vars.set("snap_status", self.snap_status(self.vars[self.vars.status_var], self.vars.channel), output=False, change=True)
+ self.vars.set("snap_status_map", dict(zip(self.vars.name, self.vars.snap_status)), output=False, change=True)
+
+ def __quit_module__(self):
+ self.vars.snap_status = self.snap_status(self.vars[self.vars.status_var], self.vars.channel)
+ if self.vars.channel is None:
+ self.vars.channel = "stable"
+
+ def _run_multiple_commands(self, commands, actionable_names, bundle=True, refresh=False):
+ results_cmd = []
+ results_rc = []
+ results_out = []
+ results_err = []
+ results_run_info = []
+
+ state = "refresh" if refresh else self.vars.state
+
+ with self.runner(commands + ["name"]) as ctx:
+ if bundle:
+ rc, out, err = ctx.run(state=state, name=actionable_names)
+ results_cmd.append(commands + actionable_names)
+ results_rc.append(rc)
+ results_out.append(out.strip())
+ results_err.append(err.strip())
+ results_run_info.append(ctx.run_info)
+ else:
+ for name in actionable_names:
+ rc, out, err = ctx.run(state=state, name=name)
+ results_cmd.append(commands + [name])
+ results_rc.append(rc)
+ results_out.append(out.strip())
+ results_err.append(err.strip())
+ results_run_info.append(ctx.run_info)
+
+ return (
+ '; '.join([to_native(x) for x in results_cmd]),
+ self._first_non_zero(results_rc),
+ '\n'.join(results_out),
+ '\n'.join(results_err),
+ results_run_info,
+ )
def convert_json_subtree_to_map(self, json_subtree, prefix=None):
option_map = {}
@@ -234,7 +273,6 @@ class Snap(CmdStateModuleHelper):
if isinstance(value, (str, float, bool, numbers.Integral)):
option_map[full_key] = str(value)
-
else:
option_map.update(self.convert_json_subtree_to_map(json_subtree=value, prefix=full_key))
@@ -245,8 +283,8 @@ class Snap(CmdStateModuleHelper):
return self.convert_json_subtree_to_map(json_object)
def retrieve_option_map(self, snap_name):
- params = [{'state': 'get'}, {'name': snap_name}, {'json_format': True}]
- rc, out, err = self.run_command(params=params)
+ with self.runner("get name") as ctx:
+ rc, out, err = ctx.run(name=snap_name)
if rc != 0:
return {}
@@ -258,18 +296,73 @@ class Snap(CmdStateModuleHelper):
try:
option_map = self.convert_json_to_map(out)
-
+ return option_map
except Exception as e:
self.do_raise(
msg="Parsing option map returned by 'snap get {0}' triggers exception '{1}', output:\n'{2}'".format(snap_name, str(e), out))
- return option_map
+ def names_from_snaps(self, snaps):
+ def process_one(rc, out, err):
+ res = [line for line in out.split("\n") if line.startswith("name:")]
+ name = res[0].split()[1]
+ return [name]
+
+ def process_many(rc, out, err):
+ # This needs to be "\n---" instead of just "---" because otherwise
+ # if a snap uses "---" in its description then that will incorrectly
+ # be interpreted as a separator between snaps in the output.
+ outputs = out.split("\n---")
+ res = []
+ for sout in outputs:
+ res.extend(process_one(rc, sout, ""))
+ return res
+
+ def process(rc, out, err):
+ if len(snaps) == 1:
+ check_error = err
+ process_ = process_one
+ else:
+ check_error = out
+ process_ = process_many
+
+ if "warning: no snap found" in check_error:
+ self.do_raise("Snaps not found: {0}.".format([x.split()[-1]
+ for x in out.split('\n')
+ if x.startswith("warning: no snap found")]))
+ return process_(rc, out, err)
+
+ names = []
+ if snaps:
+ with self.runner("info name", output_process=process) as ctx:
+ try:
+ names = ctx.run(name=snaps)
+ finally:
+ self.vars.snapinfo_run_info.append(ctx.run_info)
+ return names
+
+ def snap_status(self, snap_name, channel):
+ def _status_check(name, channel, installed):
+ match = [c for n, c in installed if n == name]
+ if not match:
+ return Snap.NOT_INSTALLED
+ if channel and match[0] not in (channel, "latest/{0}".format(channel)):
+ return Snap.CHANNEL_MISMATCH
+ else:
+ return Snap.INSTALLED
+
+ with self.runner("_list") as ctx:
+ rc, out, err = ctx.run(check_rc=True)
+ list_out = out.split('\n')[1:]
+ list_out = [self.__list_re.match(x) for x in list_out]
+ list_out = [(m.group('name'), m.group('channel')) for m in list_out if m]
+ self.vars.status_out = list_out
+ self.vars.status_run_info = ctx.run_info
- def is_snap_installed(self, snap_name):
- return 0 == self.run_command(params=[{'state': 'list'}, {'name': snap_name}])[0]
+ return [_status_check(n, channel, list_out) for n in snap_name]
def is_snap_enabled(self, snap_name):
- rc, out, err = self.run_command(params=[{'state': 'list'}, {'name': snap_name}])
+ with self.runner("_list name") as ctx:
+ rc, out, err = ctx.run(name=snap_name)
if rc != 0:
return None
result = out.splitlines()[1]
@@ -279,22 +372,22 @@ class Snap(CmdStateModuleHelper):
notes = match.group('notes')
return "disabled" not in notes.split(',')
- def process_actionable_snaps(self, actionable_snaps):
+ def _present(self, actionable_snaps, refresh=False):
self.changed = True
self.vars.snaps_installed = actionable_snaps
- if self.module.check_mode:
+ if self.check_mode:
return
- params = ['state', 'classic', 'channel'] # get base cmd parts
+ params = ['state', 'classic', 'channel', 'dangerous'] # get base cmd parts
has_one_pkg_params = bool(self.vars.classic) or self.vars.channel != 'stable'
has_multiple_snaps = len(actionable_snaps) > 1
if has_one_pkg_params and has_multiple_snaps:
- commands = [params + [{'actionable_snaps': [s]}] for s in actionable_snaps]
+ self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps, bundle=False, refresh=refresh)
else:
- commands = [params + [{'actionable_snaps': actionable_snaps}]]
- self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
+ self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps, refresh=refresh)
+ self.vars.run_info = run_info
if rc == 0:
return
@@ -314,10 +407,13 @@ class Snap(CmdStateModuleHelper):
self.vars.meta('classic').set(output=True)
self.vars.meta('channel').set(output=True)
- actionable_snaps = [s for s in self.vars.name if not self.is_snap_installed(s)]
- if actionable_snaps:
- self.process_actionable_snaps(actionable_snaps)
+ actionable_refresh = [snap for snap in self.vars.name if self.vars.snap_status_map[snap] == Snap.CHANNEL_MISMATCH]
+ if actionable_refresh:
+ self._present(actionable_refresh, refresh=True)
+ actionable_install = [snap for snap in self.vars.name if self.vars.snap_status_map[snap] == Snap.NOT_INSTALLED]
+ if actionable_install:
+ self._present(actionable_install)
self.set_options()
@@ -325,7 +421,7 @@ class Snap(CmdStateModuleHelper):
if self.vars.options is None:
return
- actionable_snaps = [s for s in self.vars.name if self.is_snap_installed(s)]
+ actionable_snaps = [s for s in self.vars.name if self.vars.snap_status_map[s] != Snap.NOT_INSTALLED]
overall_options_changed = []
for snap_name in actionable_snaps:
@@ -360,11 +456,9 @@ class Snap(CmdStateModuleHelper):
if options_changed:
self.changed = True
- if not self.module.check_mode:
- params = [{'state': 'set'}, {'name': snap_name}, {'options': options_changed}]
-
- rc, out, err = self.run_command(params=params)
-
+ if not self.check_mode:
+ with self.runner("_set name options") as ctx:
+ rc, out, err = ctx.run(name=snap_name, options=options_changed)
if rc != 0:
if 'has no "configure" hook' in err:
msg = "Snap '{snap}' does not have any configurable options".format(snap=snap_name)
@@ -377,18 +471,16 @@ class Snap(CmdStateModuleHelper):
if overall_options_changed:
self.vars.options_changed = overall_options_changed
- def _generic_state_action(self, actionable_func, actionable_var, params=None):
+ def _generic_state_action(self, actionable_func, actionable_var, params):
actionable_snaps = [s for s in self.vars.name if actionable_func(s)]
if not actionable_snaps:
return
self.changed = True
self.vars[actionable_var] = actionable_snaps
- if self.module.check_mode:
+ if self.check_mode:
return
- if params is None:
- params = ['state']
- commands = [params + [{'actionable_snaps': actionable_snaps}]]
- self.vars.cmd, rc, out, err = self._run_multiple_commands(commands)
+ self.vars.cmd, rc, out, err, run_info = self._run_multiple_commands(params, actionable_snaps)
+ self.vars.run_info = run_info
if rc == 0:
return
msg = "Ooops! Snap operation failed while executing '{cmd}', please examine logs and " \
@@ -396,7 +488,7 @@ class Snap(CmdStateModuleHelper):
self.do_raise(msg=msg)
def state_absent(self):
- self._generic_state_action(self.is_snap_installed, "snaps_removed", ['classic', 'channel', 'state'])
+ self._generic_state_action(lambda s: self.vars.snap_status_map[s] != Snap.NOT_INSTALLED, "snaps_removed", ['classic', 'channel', 'state'])
def state_enabled(self):
self._generic_state_action(lambda s: not self.is_snap_enabled(s), "snaps_enabled", ['classic', 'channel', 'state'])