summaryrefslogtreecommitdiffstats
path: root/ansible_collections/cisco/ise/plugins/action/internal_user_info.py
blob: 5f73153917877205bb46193abf9cfb6230aa4f27 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2021, Cisco Systems
# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type
from ansible.plugins.action import ActionBase

try:
    from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
        AnsibleArgSpecValidator,
    )
except ImportError:
    ANSIBLE_UTILS_IS_INSTALLED = False
else:
    ANSIBLE_UTILS_IS_INSTALLED = True
from ansible.errors import AnsibleActionFail
from ansible_collections.cisco.ise.plugins.plugin_utils.ise import (
    ISESDK,
    ise_argument_spec,
)

# Get common arguements specification
argument_spec = ise_argument_spec()
# Add arguments specific for this module
argument_spec.update(dict(
    name=dict(type="str"),
    id=dict(type="str"),
    page=dict(type="int"),
    size=dict(type="int"),
    sortasc=dict(type="str"),
    sortdsc=dict(type="str"),
    filter=dict(type="list"),
    filterType=dict(type="str"),
))

required_if = []
required_one_of = []
mutually_exclusive = []
required_together = []


class ActionModule(ActionBase):
    def __init__(self, *args, **kwargs):
        if not ANSIBLE_UTILS_IS_INSTALLED:
            raise AnsibleActionFail("ansible.utils is not installed. Execute 'ansible-galaxy collection install ansible.utils'")
        super(ActionModule, self).__init__(*args, **kwargs)
        self._supports_async = False
        self._supports_check_mode = True
        self._result = None

    # Checks the supplied parameters against the argument spec for this module
    def _check_argspec(self):
        aav = AnsibleArgSpecValidator(
            data=self._task.args,
            schema=dict(argument_spec=argument_spec),
            schema_format="argspec",
            schema_conditionals=dict(
                required_if=required_if,
                required_one_of=required_one_of,
                mutually_exclusive=mutually_exclusive,
                required_together=required_together,
            ),
            name=self._task.action,
        )
        valid, errors, self._task.args = aav.validate()
        if not valid:
            raise AnsibleActionFail(errors)

    def get_object(self, params):
        new_object = dict(
            name=params.get("name"),
            id=params.get("id"),
            page=params.get("page"),
            size=params.get("size"),
            sortasc=params.get("sortasc"),
            sortdsc=params.get("sortdsc"),
            filter=params.get("filter"),
            filter_type=params.get("filterType"),
        )
        return new_object

    def run(self, tmp=None, task_vars=None):
        self._task.diff = False
        self._result = super(ActionModule, self).run(tmp, task_vars)
        self._result["changed"] = False
        self._check_argspec()

        self._result.update(dict(ise_response=[]))

        ise = ISESDK(params=self._task.args)

        id = self._task.args.get("id")
        name = self._task.args.get("name")
        if id:
            response = ise.exec(
                family="internal_user",
                function='get_internal_user_by_id',
                params=self.get_object(self._task.args)
            ).response['InternalUser']
            self._result.update(dict(ise_response=response))
            self._result.update(ise.exit_json())
            return self._result
        if name:
            response = ise.exec(
                family="internal_user",
                function='get_internal_user_by_name',
                params=self.get_object(self._task.args)
            ).response['InternalUser']
            self._result.update(dict(ise_response=response))
            self._result.update(ise.exit_json())
            return self._result
        if not name and not id:
            responses = []
            generator = ise.exec(
                family="internal_user",
                function='get_internal_user_generator',
                params=self.get_object(self._task.args),
            )
            try:
                for item in generator:
                    tmp_response = item.response['SearchResult']['resources']
                    if isinstance(tmp_response, list):
                        responses += tmp_response
                    else:
                        responses.append(tmp_response)
                response = responses
            except (TypeError, AttributeError) as e:
                ise.fail_json(
                    msg=(
                        "An error occured when executing operation."
                        " Check the configuration of your API Settings and API Gateway settings on your ISE server."
                        " This collection assumes that the API Gateway, the ERS APIs and OpenAPIs are enabled."
                        " You may want to enable the (ise_debug: True) argument."
                        " The error was: {error}"
                    ).format(error=e)
                )
            except Exception as e:
                ise.fail_json(
                    msg=(
                        "An error occured when executing operation."
                        " The error was: {error}"
                        " You may want to enable the (ise_debug: True) argument."
                    ).format(error=e)
                )

            self._result.update(dict(ise_response=response))
            self._result.update(ise.exit_json())
            return self._result