1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016, Artem Feofanov <artem.feofanov@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
module: telegram
author:
- "Artem Feofanov (@tyouxa)"
- "Nikolai Lomov (@lomserman)"
short_description: Send notifications via telegram
description:
- Send notifications via telegram bot, to a verified group or user.
- Also, the user may try to use any other telegram bot API method, if you specify I(api_method) argument.
notes:
- You will require a telegram account and create telegram bot to use this module.
extends_documentation_fragment:
- community.general.attributes
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
token:
type: str
description:
- Token identifying your telegram bot.
required: true
api_method:
type: str
description:
- Bot API method.
- For reference, see U(https://core.telegram.org/bots/api).
default: SendMessage
version_added: 2.0.0
api_args:
type: dict
description:
- Any parameters for the method.
- For reference to default method, C(SendMessage), see U(https://core.telegram.org/bots/api#sendmessage).
version_added: 2.0.0
'''
EXAMPLES = """
- name: Send notify to Telegram
community.general.telegram:
token: '9999999:XXXXXXXXXXXXXXXXXXXXXXX'
api_args:
chat_id: 000000
parse_mode: "markdown"
text: "Your precious application has been deployed: https://example.com"
disable_web_page_preview: true
disable_notification: true
- name: Forward message to someone
community.general.telegram:
token: '9999999:XXXXXXXXXXXXXXXXXXXXXXX'
api_method: forwardMessage
api_args:
chat_id: 000000
from_chat_id: 111111
disable_notification: true
message_id: '{{ saved_msg_id }}'
"""
RETURN = """
msg:
description: The message you attempted to send
returned: success
type: str
sample: "Ansible task finished"
telegram_error:
description: Error message gotten from Telegram API
returned: failure
type: str
sample: "Bad Request: message text is empty"
"""
import json
from ansible.module_utils.basic import AnsibleModule
# noinspection PyUnresolvedReferences
from ansible.module_utils.six.moves.urllib.parse import quote
from ansible.module_utils.urls import fetch_url
def main():
module = AnsibleModule(
argument_spec=dict(
token=dict(type='str', required=True, no_log=True),
api_args=dict(type='dict'),
api_method=dict(type="str", default="SendMessage"),
),
supports_check_mode=True
)
token = quote(module.params.get('token'))
api_args = module.params.get('api_args') or {}
api_method = module.params.get('api_method')
# filling backward compatibility args
api_args['chat_id'] = api_args.get('chat_id')
api_args['parse_mode'] = api_args.get('parse_mode')
api_args['text'] = api_args.get('text')
if api_args['parse_mode'] == 'plain':
del api_args['parse_mode']
url = 'https://api.telegram.org/bot{token}/{api_method}'.format(token=token, api_method=api_method)
if module.check_mode:
module.exit_json(changed=False)
response, info = fetch_url(module, url, method="POST", data=json.dumps(api_args),
headers={'Content-Type': 'application/json'})
if info['status'] == 200:
module.exit_json(changed=True)
elif info['status'] == -1:
# SSL errors, connection problems, etc.
module.fail_json(msg="Failed to send message", info=info, response=response)
else:
body = json.loads(info['body'])
module.fail_json(
msg="Failed to send message, return status = {status}\n"
"url = {api_url}\n"
"api_args = {api_args}".format(
status=info['status'], api_url=url, api_args=api_args
),
telegram_error=body['description'],
)
if __name__ == '__main__':
main()
|