summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/general/plugins/modules/pagerduty_user.py
blob: eb8a3095626b61762d5878ffeff880586ed9191a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright (c) 2020, Zainab Alsaffar <Zainab.Alsaffar@mail.rit.edu>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type

DOCUMENTATION = r'''
---
module: pagerduty_user
short_description: Manage a user account on PagerDuty
description:
    - This module manages the creation/removal of a user account on PagerDuty.
version_added: '1.3.0'
author: Zainab Alsaffar (@zanssa)
requirements:
    - pdpyras python module = 4.1.1
    - PagerDuty API Access
extends_documentation_fragment:
    - community.general.attributes
attributes:
    check_mode:
        support: full
    diff_mode:
        support: none
options:
    access_token:
        description:
            - An API access token to authenticate with the PagerDuty REST API.
        required: true
        type: str
    pd_user:
        description:
            - Name of the user in PagerDuty.
        required: true
        type: str
    pd_email:
        description:
            - The user's email address.
            - O(pd_email) is the unique identifier used and cannot be updated using this module.
        required: true
        type: str
    pd_role:
        description:
            - The user's role.
        choices: ['global_admin', 'manager', 'responder', 'observer', 'stakeholder', 'limited_stakeholder', 'restricted_access']
        default: 'responder'
        type: str
    state:
        description:
            - State of the user.
            - On V(present), it creates a user if the user doesn't exist.
            - On V(absent), it removes a user if the account exists.
        choices: ['present', 'absent']
        default: 'present'
        type: str
    pd_teams:
        description:
            - The teams to which the user belongs.
            - Required if O(state=present).
        type: list
        elements: str
'''

EXAMPLES = r'''
- name: Create a user account on PagerDuty
  community.general.pagerduty_user:
    access_token: 'Your_Access_token'
    pd_user: user_full_name
    pd_email: user_email
    pd_role: user_pd_role
    pd_teams: user_pd_teams
    state: "present"

- name: Remove a user account from PagerDuty
  community.general.pagerduty_user:
    access_token: 'Your_Access_token'
    pd_user: user_full_name
    pd_email: user_email
    state: "absent"
'''

RETURN = r''' # '''

from os import path
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils import deps

with deps.declare("pdpyras", url="https://github.com/PagerDuty/pdpyras"):
    from pdpyras import APISession, PDClientError


class PagerDutyUser(object):
    """Helper that performs user operations against a PagerDuty API session.

    API errors (``PDClientError``) raised while creating or deleting a user
    are reported through ``module.fail_json`` so the task fails with a
    readable message instead of being silently ignored.
    """

    def __init__(self, module, session):
        """Keep the module (used for failure reporting) and the API session."""
        self._module = module
        self._apisession = session

    @staticmethod
    def _status_code(error):
        """Return the HTTP status code attached to ``error``, or None.

        The error may carry no response at all (``response`` is None), in
        which case there is no status code to inspect.
        """
        return getattr(getattr(error, 'response', None), 'status_code', None)

    # check if the user exists
    def does_user_exist(self, pd_email):
        """Return the id of the first user whose email equals ``pd_email``.

        Returns None when no user with that email is found.
        """
        for user in self._apisession.iter_all('users'):
            if user['email'] == pd_email:
                return user['id']
        return None

    # create a user account on PD
    def add_pd_user(self, pd_name, pd_email, pd_role):
        """Create the user (keyed on email) and return the resulting record.

        Any ``PDClientError`` fails the module; well-known status codes get
        a specific message, everything else a generic one that includes the
        error text.
        """
        try:
            user = self._apisession.persist('users', 'email', {
                "name": pd_name,
                "email": pd_email,
                "type": "user",
                "role": pd_role,
            })
            return user

        except PDClientError as e:
            known_failures = {
                400: "Failed to add %s due to invalid argument",
                401: "Failed to add %s due to invalid API key",
                402: "Failed to add %s due to inability to perform the action within the API token",
                403: "Failed to add %s due to inability to review the requested resource within the API token",
                429: "Failed to add %s due to reaching the limit of making requests",
            }
            template = known_failures.get(self._status_code(e))
            if template is not None:
                self._module.fail_json(msg=template % (pd_name))
            else:
                # Do not swallow unexpected errors: the caller would
                # otherwise report a successful creation.
                self._module.fail_json(msg="Failed to add %s: %s" % (pd_name, e))

    # delete a user account from PD
    def delete_user(self, pd_user_id, pd_name):
        """Delete the user with id ``pd_user_id``.

        ``pd_name`` is only used in failure messages. Any ``PDClientError``
        fails the module.
        """
        try:
            # URL paths always use forward slashes, so build the path with
            # plain string concatenation rather than a filesystem helper.
            user_path = '/users/' + pd_user_id
            self._apisession.rdelete(user_path)

        except PDClientError as e:
            status = self._status_code(e)
            known_failures = {
                404: "Failed to remove %s as user was not found",
                403: "Failed to remove %s due to inability to review the requested resource within the API token",
                429: "Failed to remove %s due to reaching the limit of making requests",
            }
            if status == 401:
                # print out the list of incidents
                pd_incidents = self.get_incidents_assigned_to_user(pd_user_id)
                self._module.fail_json(msg="Failed to remove %s as user has assigned incidents %s" % (pd_name, pd_incidents))
            elif status in known_failures:
                self._module.fail_json(msg=known_failures[status] % (pd_name))
            else:
                # Do not swallow unexpected errors: the caller would
                # otherwise report a successful deletion.
                self._module.fail_json(msg="Failed to remove %s: %s" % (pd_name, e))

    # get incidents assigned to a user
    def get_incidents_assigned_to_user(self, pd_user_id):
        """Return a list summarising every incident assigned to the user.

        Each entry is a dict with the incident's ``title``, ``key`` (taken
        from ``incident_key``) and ``status``. The list is empty when the
        user has no incidents.
        """
        incidents = self._apisession.list_all('incidents', params={'user_ids[]': [pd_user_id]})

        return [
            {
                'title': incident['title'],
                'key': incident['incident_key'],
                'status': incident['status']
            }
            for incident in incidents
        ]

    # add a user to a team/teams
    def add_user_to_teams(self, pd_user_id, pd_teams, pd_role):
        """Add the user to each named team that can be found.

        Team names that do not resolve to a team are skipped, and a
        ``PDClientError`` on an individual team is tolerated (best effort).
        Returns the response of the last membership update attempted, or
        None if that attempt failed or no team was found.
        """
        updated_team = None
        for team in pd_teams:
            team_info = self._apisession.find('teams', team, attribute='name')
            if team_info is not None:
                try:
                    updated_team = self._apisession.rput('/teams/' + team_info['id'] + '/users/' + pd_user_id, json={
                        'role': pd_role
                    })
                except PDClientError:
                    updated_team = None
        return updated_team


def main():
    """Create or remove a PagerDuty user according to the module parameters.

    The user is identified by ``pd_email``. With ``state=present`` a missing
    user is created and added to ``pd_teams``; an existing user is left
    untouched. With ``state=absent`` an existing user is deleted. In check
    mode no API write is performed but ``changed`` is reported as it would
    be for a real run.
    """
    module = AnsibleModule(
        argument_spec=dict(
            access_token=dict(type='str', required=True, no_log=True),
            pd_user=dict(type='str', required=True),
            pd_email=dict(type='str', required=True),
            state=dict(type='str', default='present', choices=['present', 'absent']),
            pd_role=dict(type='str', default='responder',
                         choices=['global_admin', 'manager', 'responder', 'observer', 'stakeholder', 'limited_stakeholder', 'restricted_access']),
            pd_teams=dict(type='list', elements='str', required=False)),
        required_if=[['state', 'present', ['pd_teams']], ],
        supports_check_mode=True,
    )

    deps.validate(module)

    access_token = module.params['access_token']
    pd_user = module.params['pd_user']
    pd_email = module.params['pd_email']
    state = module.params['state']
    pd_role = module.params['pd_role']
    pd_teams = module.params['pd_teams']

    if pd_role:
        # Translate the role names offered by this module into the values
        # sent to the API.
        pd_role_gui_value = {
            'global_admin': 'admin',
            'manager': 'user',
            'responder': 'limited_user',
            'observer': 'observer',
            'stakeholder': 'read_only_user',
            'limited_stakeholder': 'read_only_limited_user',
            'restricted_access': 'restricted_access'
        }
        pd_role = pd_role_gui_value[pd_role]

    # authenticate with PD API
    try:
        session = APISession(access_token)
    except PDClientError as e:
        module.fail_json(msg="Failed to authenticate with PagerDuty: %s" % e)

    user = PagerDutyUser(module, session)

    # Holds the user's id when the account exists, otherwise a falsy value.
    user_exists = user.does_user_exist(pd_email)

    if user_exists:
        if state == "absent":
            # remove user
            if not module.check_mode:
                user.delete_user(user_exists, pd_user)
            module.exit_json(changed=True, result="Successfully deleted user %s" % pd_user)
        else:
            module.exit_json(changed=False, result="User %s already exists." % pd_user)

        # in case that the user does not exist
    else:
        if state == "absent":
            module.exit_json(changed=False, result="User %s was not found." % pd_user)

        else:
            # add user, adds user with the default notification rule and contact info (email)
            if not module.check_mode:
                created_user = user.add_pd_user(pd_user, pd_email, pd_role)
                # Prefer the id returned by the create call; only fall back
                # to scanning the whole user list when it is unavailable.
                pd_user_id = None
                if isinstance(created_user, dict):
                    pd_user_id = created_user.get('id')
                if not pd_user_id:
                    pd_user_id = user.does_user_exist(pd_email)
                if not pd_user_id:
                    # Without an id the team URLs cannot be built; fail with
                    # a clear message rather than crash on a None id.
                    module.fail_json(msg="Failed to determine the id of user %s after creation" % pd_user)
                # add a user to the team/s
                # NOTE(review): the user-level role value is also sent as the
                # team membership role - confirm the teams endpoint accepts it.
                user.add_user_to_teams(pd_user_id, pd_teams, pd_role)
            module.exit_json(changed=True, result="Successfully created & added user %s to team %s" % (pd_user, pd_teams))


# Run the module logic only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()