1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2020, Zainab Alsaffar <Zainab.Alsaffar@mail.rit.edu>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
# Module documentation: description, requirements, supported attributes and
# the accepted options (access_token, pd_user, pd_email, pd_role, state,
# pd_teams). The option names, choices and defaults listed here mirror the
# argument_spec built in main().
DOCUMENTATION = r'''
---
module: pagerduty_user
short_description: Manage a user account on PagerDuty
description:
- This module manages the creation/removal of a user account on PagerDuty.
version_added: '1.3.0'
author: Zainab Alsaffar (@zanssa)
requirements:
- pdpyras python module = 4.1.1
- PagerDuty API Access
extends_documentation_fragment:
- community.general.attributes
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
access_token:
description:
- An API access token to authenticate with the PagerDuty REST API.
required: true
type: str
pd_user:
description:
- Name of the user in PagerDuty.
required: true
type: str
pd_email:
description:
- The user's email address.
- O(pd_email) is the unique identifier used and cannot be updated using this module.
required: true
type: str
pd_role:
description:
- The user's role.
choices: ['global_admin', 'manager', 'responder', 'observer', 'stakeholder', 'limited_stakeholder', 'restricted_access']
default: 'responder'
type: str
state:
description:
- State of the user.
- On V(present), it creates a user if the user doesn't exist.
- On V(absent), it removes a user if the account exists.
choices: ['present', 'absent']
default: 'present'
type: str
pd_teams:
description:
- The teams to which the user belongs.
- Required if O(state=present).
type: list
elements: str
'''
# Usage examples: one task that creates a user (state "present") and one that
# removes a user (state "absent").
EXAMPLES = r'''
- name: Create a user account on PagerDuty
community.general.pagerduty_user:
access_token: 'Your_Access_token'
pd_user: user_full_name
pd_email: user_email
pd_role: user_pd_role
pd_teams: user_pd_teams
state: "present"
- name: Remove a user account from PagerDuty
community.general.pagerduty_user:
access_token: 'Your_Access_token'
pd_user: user_full_name
pd_email: user_email
state: "absent"
'''
# No return values are documented for this module; the string holds only a
# placeholder comment.
RETURN = r''' # '''
from os import path
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.general.plugins.module_utils import deps
with deps.declare("pdpyras", url="https://github.com/PagerDuty/pdpyras"):
from pdpyras import APISession, PDClientError
class PagerDutyUser(object):
    """Perform PagerDuty user operations through an API session.

    ``module`` is the object used for reporting: this class calls its
    ``fail_json(msg=...)`` and ``warn(...)`` methods.  ``session`` is the API
    session: this class calls its ``iter_all``, ``persist``, ``rdelete``,
    ``list_all``, ``find`` and ``rput`` methods.
    """

    # Failure messages for user creation, keyed by HTTP status code.
    _ADD_ERRORS = {
        400: "Failed to add %s due to invalid argument",
        401: "Failed to add %s due to invalid API key",
        402: "Failed to add %s due to inability to perform the action within the API token",
        403: "Failed to add %s due to inability to review the requested resource within the API token",
        429: "Failed to add %s due to reaching the limit of making requests",
    }

    # Failure messages for user removal, keyed by HTTP status code.
    _DELETE_ERRORS = {
        404: "Failed to remove %s as user was not found",
        403: "Failed to remove %s due to inability to review the requested resource within the API token",
        429: "Failed to remove %s due to reaching the limit of making requests",
    }

    def __init__(self, module, session):
        self._module = module
        self._apisession = session

    @staticmethod
    def _status_code(error):
        """Return the HTTP status code carried by ``error``, or ``None``.

        An error raised without an HTTP response (its ``response`` attribute
        missing or ``None``) yields ``None`` instead of an AttributeError.
        """
        return getattr(getattr(error, 'response', None), 'status_code', None)

    # check if the user exists
    def does_user_exist(self, pd_email):
        """Return the id of the user whose email equals ``pd_email``, else ``None``."""
        for user in self._apisession.iter_all('users'):
            if user['email'] == pd_email:
                return user['id']
        return None

    # create a user account on PD
    def add_pd_user(self, pd_name, pd_email, pd_role):
        """Create the user and return what the session's ``persist`` call returns.

        Any ``PDClientError`` is reported through ``fail_json``; status codes
        without a dedicated message get a generic one, so a failed creation
        is never silently treated as success.
        """
        try:
            return self._apisession.persist('users', 'email', {
                "name": pd_name,
                "email": pd_email,
                "type": "user",
                "role": pd_role,
            })
        except PDClientError as e:
            template = self._ADD_ERRORS.get(self._status_code(e))
            if template is not None:
                self._module.fail_json(msg=template % (pd_name))
            else:
                self._module.fail_json(msg="Failed to add %s: %s" % (pd_name, e))

    # delete a user account from PD
    def delete_user(self, pd_user_id, pd_name):
        """Delete the user with id ``pd_user_id``; ``pd_name`` is used in messages.

        Any ``PDClientError`` is reported through ``fail_json``.
        """
        try:
            # Plain concatenation: this is a URL path, not a filesystem path.
            self._apisession.rdelete('/users/' + pd_user_id)
        except PDClientError as e:
            status = self._status_code(e)
            if status in (400, 401):
                # NOTE(review): the original code only treated 401 as "user has
                # assigned incidents". 400 is handled the same way here on the
                # assumption that it is what the API returns in that situation;
                # confirm against the PagerDuty "Delete a user" documentation.
                pd_incidents = self.get_incidents_assigned_to_user(pd_user_id)
                self._module.fail_json(msg="Failed to remove %s as user has assigned incidents %s" % (pd_name, pd_incidents))
            elif status in self._DELETE_ERRORS:
                self._module.fail_json(msg=self._DELETE_ERRORS[status] % (pd_name))
            else:
                self._module.fail_json(msg="Failed to remove %s: %s" % (pd_name, e))

    # get incidents assigned to a user
    def get_incidents_assigned_to_user(self, pd_user_id):
        """Return a list with one ``{'title', 'key', 'status'}`` dict per incident.

        Every incident returned by the session is included (the list is empty
        when there are none).  ``.get`` is used so that an incident lacking one
        of the fields cannot raise while an error message is being built.
        """
        incidents = self._apisession.list_all('incidents', params={'user_ids[]': [pd_user_id]})
        return [
            {
                'title': incident.get('title'),
                'key': incident.get('incident_key'),
                'status': incident.get('status'),
            }
            for incident in incidents
        ]

    # add a user to a team/teams
    def add_user_to_teams(self, pd_user_id, pd_teams, pd_role):
        """Add the user to every team named in ``pd_teams`` with role ``pd_role``.

        Teams are looked up by name.  A team that cannot be found, or an API
        error while adding the user, produces a warning and processing
        continues with the next team.  Returns the result of the last ``rput``
        call, or ``None`` if the last attempt failed or nothing was updated.
        """
        updated_team = None
        for team in pd_teams or ():
            team_info = self._apisession.find('teams', team, attribute='name')
            if team_info is None:
                self._module.warn("Team %s was not found; user was not added to it" % team)
                continue
            try:
                updated_team = self._apisession.rput('/teams/' + team_info['id'] + '/users/' + pd_user_id, json={
                    'role': pd_role
                })
            except PDClientError as e:
                updated_team = None
                self._module.warn("Failed to add user to team %s: %s" % (team, e))
        return updated_team
def main():
    """Run the pagerduty_user module.

    Reads the module parameters, translates ``pd_role`` into the role value
    sent to the API, and then, depending on ``state`` and on whether a user
    with ``pd_email`` already exists, removes the user, creates the user and
    adds it to ``pd_teams``, or reports that nothing changed.  In check mode
    no API call that modifies data is made.  Outcomes are reported through
    ``module.exit_json`` / ``module.fail_json``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            access_token=dict(type='str', required=True, no_log=True),
            pd_user=dict(type='str', required=True),
            pd_email=dict(type='str', required=True),
            state=dict(type='str', default='present', choices=['present', 'absent']),
            pd_role=dict(type='str', default='responder',
                         choices=['global_admin', 'manager', 'responder', 'observer', 'stakeholder', 'limited_stakeholder', 'restricted_access']),
            pd_teams=dict(type='list', elements='str', required=False)),
        required_if=[['state', 'present', ['pd_teams']], ],
        supports_check_mode=True,
    )

    deps.validate(module)

    access_token = module.params['access_token']
    pd_user = module.params['pd_user']
    pd_email = module.params['pd_email']
    state = module.params['state']
    pd_role = module.params['pd_role']
    pd_teams = module.params['pd_teams']

    if pd_role:
        # Map the option values accepted by this module to the role values
        # that are sent to the API.
        pd_role_gui_value = {
            'global_admin': 'admin',
            'manager': 'user',
            'responder': 'limited_user',
            'observer': 'observer',
            'stakeholder': 'read_only_user',
            'limited_stakeholder': 'read_only_limited_user',
            'restricted_access': 'restricted_access'
        }
        pd_role = pd_role_gui_value[pd_role]

    # authenticate with PD API
    try:
        session = APISession(access_token)
    except PDClientError as e:
        module.fail_json(msg="Failed to authenticate with PagerDuty: %s" % e)

    user = PagerDutyUser(module, session)

    # Holds the user's id when the account exists, otherwise None.
    user_exists = user.does_user_exist(pd_email)

    if user_exists and state == "absent":
        # remove user
        if not module.check_mode:
            user.delete_user(user_exists, pd_user)
        module.exit_json(changed=True, result="Successfully deleted user %s" % pd_user)
    elif user_exists:
        module.exit_json(changed=False, result="User %s already exists." % pd_user)
    elif state == "absent":
        # in case that the user does not exist
        module.exit_json(changed=False, result="User %s was not found." % pd_user)
    else:
        # add user, adds user with the default notification rule and contact info (email)
        if not module.check_mode:
            created = user.add_pd_user(pd_user, pd_email, pd_role)
            # Prefer the id returned by the create call; fall back to a
            # lookup by email only when the call returned nothing usable.
            pd_user_id = (created or {}).get('id') or user.does_user_exist(pd_email)
            if pd_user_id is None:
                # Without an id the team assignment below cannot build its
                # URL, so report the failure instead of crashing.
                module.fail_json(msg="Failed to create user %s: user was not found after creation" % pd_user)
            else:
                # add a user to the team/s
                user.add_user_to_teams(pd_user_id, pd_teams, pd_role)
        module.exit_json(changed=True, result="Successfully created & added user %s to team %s" % (pd_user, pd_teams))
# Run the module only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|