summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/google/plugins/modules/gcpubsub.py
blob: 2d9230c8f7bb6a5521e7420faa0bffbf35709bef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#!/usr/bin/python
# -*- coding: utf-8 -*-

# Copyright: (c) 2016, Google Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

DOCUMENTATION = '''
---
module: gcpubsub
short_description: Create and Delete Topics/Subscriptions, Publish and pull messages on PubSub
description:
    - Create and Delete Topics/Subscriptions, Publish and pull messages on PubSub.
      See U(https://cloud.google.com/pubsub/docs) for an overview.
requirements:
  - google-auth >= 0.5.0
  - google-cloud-pubsub >= 0.22.0
notes:
  - Subscription pull happens before publish.  You cannot publish and pull in the same task.
author:
  - Tom Melendez (@supertom) <tom@supertom.com>
options:
  topic:
    type: str
    description:
       - GCP pubsub topic name.
       - Only the name, not the full path, is required.
    required: yes
  subscription:
    type: dict
    description:
       - Dictionary containing a subscription name associated with a topic (required), along with optional ack_deadline, push_endpoint and pull.
         For pulling from a subscription, message_ack (bool), max_messages (int) and return_immediately (bool) are available as subfields.
         See subfields name, push_endpoint and ack_deadline for more information.
    suboptions:
      name:
        description:
          - Subfield of subscription. Required if subscription is specified. See examples.
      ack_deadline:
        description:
          - Subfield of subscription. Not required. Default deadline for subscriptions to ACK the message before it is resent. See examples.
      pull:
        description:
          - Subfield of subscription. Not required. If specified, messages will be retrieved from topic via the
            provided subscription name. Supported keys are max_messages (int; default None; max number of messages
            to pull), message_ack (bool; default False; acknowledge the message) and return_immediately
            (bool; default False; if True, don't wait for messages to appear). If the messages are acknowledged,
            changed is set to True, otherwise, changed is False.
      push_endpoint:
        description:
          - Subfield of subscription.  Not required.  If specified, message will be sent to an endpoint.
            See U(https://cloud.google.com/pubsub/docs/advanced#push_endpoints) for more information.
  publish:
    type: list
    description:
        - List of dictionaries describing messages and attributes to be published.  Dictionary is in message(str):attributes(dict) format.
          Only message is required.
  state:
    type: str
    description:
        - State of the topic or queue.
        - Applies to the most granular resource.
        - If subscription is specified we remove it.
        - If only topic is specified, that is what is removed.
        - NOTE - A topic can be removed without first removing the subscription.
    choices: [ absent, present ]
    default: present
  project_id:
    type: str
    description:
      - your GCE project ID
  credentials_file:
    type: str
    description:
      - path to the JSON file associated with the service account email
  service_account_email:
    type: str
    description:
      - service account email
'''

EXAMPLES = '''
# Message will be pushed; there is no check to see if the message was pushed before.
- name: Create a topic and publish a message to it
  community.google.gcpubsub:
    topic: ansible-topic-example
    state: present

# Subscriptions associated with topic are not deleted.
- name: Delete Topic
  community.google.gcpubsub:
    topic: ansible-topic-example
    state: absent

# Setting absent will keep the messages from being sent
- name: Publish multiple messages, with attributes (key:value available with the message)
  community.google.gcpubsub:
    topic: '{{ topic_name }}'
    state: present
    publish:
      - message: this is message 1
        attributes:
          mykey1: myvalue
          mykey2: myvalu2
          mykey3: myvalue3
      - message: this is message 2
        attributes:
          server: prod
          sla: "99.9999"
          owner: fred

- name: Create Subscription (pull)
  community.google.gcpubsub:
    topic: ansible-topic-example
    subscription:
      name: mysub
    state: present

# pull is default, ack_deadline is not required
- name: Create Subscription with ack_deadline and push endpoint
  community.google.gcpubsub:
    topic: ansible-topic-example
    subscription:
      name: mysub
      ack_deadline: "60"
      push_endpoint: http://pushendpoint.example.com
    state: present

# Setting push_endpoint to "None" converts subscription to pull.
- name: Subscription change from push to pull
  community.google.gcpubsub:
    topic: ansible-topic-example
    subscription:
      name: mysub
      push_endpoint: "None"

### Topic will not be deleted
- name: Delete subscription
  community.google.gcpubsub:
    topic: ansible-topic-example
    subscription:
      name: mysub
    state: absent

# only pull keyword is required.
- name: Pull messages from subscription
  community.google.gcpubsub:
    topic: ansible-topic-example
    subscription:
      name: ansible-topic-example-sub
      pull:
        message_ack: yes
        max_messages: "100"
'''

RETURN = '''
publish:
    description: List of dictionaries describing messages and attributes to be published.  Dictionary is in message(str):attributes(dict) format.
                 Only message is required.
    returned: Only when specified
    type: list
    sample: "publish: ['message': 'my message', attributes: {'key1': 'value1'}]"

pulled_messages:
    description: list of dictionaries containing message info.  Fields are ack_id, attributes, data, message_id.
    returned: Only when subscription.pull is specified
    type: list
    sample: [{ "ack_id": "XkASTCcYREl...","attributes": {"key1": "val1",...}, "data": "this is message 1", "message_id": "49107464153705"},..]

state:
    description: The state of the topic or subscription. Value will be either 'absent' or 'present'.
    returned: Always
    type: str
    sample: "present"

subscription:
    description: Name of subscription.
    returned: When subscription fields are specified
    type: str
    sample: "mysubscription"

topic:
    description: Name of topic.
    returned: Always
    type: str
    sample: "mytopic"
'''

try:
    from ast import literal_eval
    HAS_PYTHON26 = True
except ImportError:
    HAS_PYTHON26 = False

try:
    from google.cloud import pubsub
    HAS_GOOGLE_CLOUD_PUBSUB = True
except ImportError as e:
    HAS_GOOGLE_CLOUD_PUBSUB = False

from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.google.plugins.module_utils.gcp import check_min_pkg_version, get_google_cloud_credentials


# Name of the Python package whose installed version is checked in main()
# and that is named in the "please install" failure message.
CLOUD_CLIENT = 'google-cloud-pubsub'
# Version passed to check_min_pkg_version() together with CLOUD_CLIENT.
CLOUD_CLIENT_MINIMUM_VERSION = '0.22.0'
# Value assigned to the pubsub client's ``user_agent`` attribute in main().
CLOUD_CLIENT_USER_AGENT = 'ansible-pubsub-0.1'


def _message_to_bytes(payload):
    """Return *payload* as a byte string.

    Byte strings are passed through unchanged.  Text is encoded as UTF-8.
    Any other value (for example an int parsed from YAML) is first rendered
    as text.  ``bytes(payload)`` is deliberately avoided: on Python 3 it
    raises ``TypeError`` for ``str`` and returns N zero bytes for an int N.
    """
    if isinstance(payload, bytes):
        return payload
    if not isinstance(payload, type(u'')):
        payload = u'%s' % (payload,)
    return payload.encode('utf-8')


def publish_messages(message_list, topic):
    """Publish every entry of *message_list* through one batch of *topic*.

    :param message_list: iterable of dicts, each with a required ``message``
        key and an optional ``attributes`` dict that is expanded into
        keyword arguments of ``batch.publish``.
    :param topic: object exposing ``batch()``, which must return a context
        manager yielding an object with ``publish(data, **attrs)``.
    :returns: always ``True``; the caller uses it as the ``changed`` flag.
    :raises KeyError: if an entry has no ``message`` key.
    """
    with topic.batch() as batch:
        for message in message_list:
            data = _message_to_bytes(message['message'])
            # A missing or explicitly null ``attributes`` means "no attributes".
            attrs = message.get('attributes') or {}
            batch.publish(data, **attrs)
    return True


# String spellings treated as true, compared case-insensitively.
_TRUE_STRINGS = frozenset(('y', 'yes', 'on', '1', 'true', 't'))


def _as_bool(value):
    """Interpret *value* as a boolean, accepting yes/no style strings.

    Plain truthiness is not enough here: the string ``'no'`` is truthy in
    Python, so string values are matched against ``_TRUE_STRINGS`` instead.
    """
    if isinstance(value, bytes):
        value = value.decode('utf-8', 'replace')
    if isinstance(value, type(u'')):
        return value.strip().lower() in _TRUE_STRINGS
    return bool(value)


def pull_messages(pull_params, sub):
    """Pull messages from *sub* and optionally acknowledge them.

    :param pull_params: dict (or ``None``) that may contain ``max_messages``
        (default ``None``), ``message_ack`` (default false) and
        ``return_immediately`` (default false).
    :param sub: object exposing ``pull(return_immediately=, max_messages=)``,
        which yields ``(ack_id, message)`` pairs, and ``acknowledge(ack_ids)``.
    :rtype: tuple (output, changed)
    :returns: ``output`` is a list of dicts with the keys ``message_id``,
        ``attributes``, ``data`` and ``ack_id``; ``changed`` is ``True`` only
        when at least one message was acknowledged.
    """
    # A bare ``pull:`` key arrives as None; treat it as "all defaults".
    pull_params = pull_params or {}
    max_messages = pull_params.get('max_messages', None)
    # The default must be a real False: the former default 'no' is a
    # non-empty string and therefore acknowledged messages unasked.
    message_ack = _as_bool(pull_params.get('message_ack', False))
    return_immediately = _as_bool(pull_params.get('return_immediately', False))

    pulled = sub.pull(return_immediately=return_immediately, max_messages=max_messages)

    output = [{'message_id': msg.message_id,
               'attributes': msg.attributes,
               'data': msg.data,
               'ack_id': ack_id}
              for ack_id, msg in pulled]

    changed = False
    if message_ack:
        ack_ids = [m['ack_id'] for m in output]
        # Nothing pulled means nothing to acknowledge and no change.
        if ack_ids:
            sub.acknowledge(ack_ids)
            changed = True
    return (output, changed)


def main():
    """Run the gcpubsub module.

    Builds the AnsibleModule, verifies the client library is importable and
    recent enough, then acts on the requested state:

    * ``absent``: deletes the subscription if one was given, otherwise the
      topic.
    * ``present``: creates the topic and subscription when missing, updates
      the push endpoint of an existing subscription, pulls messages when
      ``subscription.pull`` is given, and publishes ``publish`` messages.

    Exits through ``module.exit_json`` with ``changed``, the echoed module
    parameters and, when a pull happened, ``pulled_messages``.
    """

    module = AnsibleModule(
        argument_spec=dict(
            topic=dict(type='str', required=True),
            state=dict(type='str', default='present', choices=['absent', 'present']),
            publish=dict(type='list'),
            subscription=dict(type='dict'),
            service_account_email=dict(type='str'),
            credentials_file=dict(type='str'),
            project_id=dict(type='str'),
        ),
    )

    if not HAS_PYTHON26:
        module.fail_json(
            msg="GCE module requires python's 'ast' module, python v2.6+")

    if not HAS_GOOGLE_CLOUD_PUBSUB:
        module.fail_json(msg="Please install google-cloud-pubsub library.")

    if not check_min_pkg_version(CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION):
        module.fail_json(msg="Please install %s client version %s" % (CLOUD_CLIENT, CLOUD_CLIENT_MINIMUM_VERSION))

    mod_params = {}
    mod_params['publish'] = module.params.get('publish')
    mod_params['state'] = module.params.get('state')
    mod_params['topic'] = module.params.get('topic')
    mod_params['subscription'] = module.params.get('subscription')

    creds, params = get_google_cloud_credentials(module)
    pubsub_client = pubsub.Client(project=params['project_id'], credentials=creds, use_gax=False)
    pubsub_client.user_agent = CLOUD_CLIENT_USER_AGENT

    changed = False
    json_output = {}

    t = None
    if mod_params['topic']:
        t = pubsub_client.topic(mod_params['topic'])

    s = None
    push_endpoint_given = False
    desired_push_endpoint = None
    if mod_params['subscription']:
        sub_params = mod_params['subscription']
        # Fail cleanly instead of raising KeyError on a missing name.
        if not sub_params.get('name'):
            module.fail_json(msg="subscription requires a 'name' field.")

        raw_push_endpoint = sub_params.get('push_endpoint', None)
        push_endpoint_given = raw_push_endpoint is not None
        # The literal string "None" is the user-facing way to ask for a pull
        # subscription; translate it once so that comparisons against the
        # server-side value (a real None) are meaningful.
        desired_push_endpoint = None if raw_push_endpoint == 'None' else raw_push_endpoint

        # Note: default ack deadline cannot be changed without deleting/recreating subscription
        s = t.subscription(sub_params['name'],
                           ack_deadline=sub_params.get('ack_deadline', None),
                           push_endpoint=desired_push_endpoint)

    if mod_params['state'] == 'absent':
        # Remove the most granular resource.  If subscription is specified
        # we remove it.  If only topic is specified, that is what is removed.
        # Note that a topic can be removed without first removing the subscription.
        # TODO(supertom): Enhancement: Provide an option to only delete a topic
        # if there are no subscriptions associated with it (which the API does not support).
        if s is not None:
            if s.exists():
                s.delete()
                changed = True
        else:
            if t.exists():
                t.delete()
                changed = True
    elif mod_params['state'] == 'present':
        if not t.exists():
            t.create()
            changed = True
        if s:
            if not s.exists():
                s.create()
                s.reload()
                changed = True
            else:
                # Subscription operations
                # TODO(supertom): if more 'update' operations arise, turn this into a function.
                s.reload()
                if push_endpoint_given and desired_push_endpoint != s.push_endpoint:
                    s.modify_push_configuration(push_endpoint=desired_push_endpoint)
                    s.reload()
                    # Only report a change if the update is visible after the
                    # reload; never reset a change recorded earlier.
                    if desired_push_endpoint == s.push_endpoint:
                        changed = True

                if 'pull' in mod_params['subscription']:
                    if s.push_endpoint is not None:
                        module.fail_json(msg="Cannot pull messages, push_endpoint is configured.")
                    pulled, acked = pull_messages(
                        mod_params['subscription']['pull'], s)
                    json_output['pulled_messages'] = pulled
                    # Accumulate rather than overwrite, so an earlier change
                    # survives a pull that acknowledged nothing.
                    changed = changed or acked

        # publish messages to the topic
        if mod_params['publish']:
            if publish_messages(mod_params['publish'], t):
                changed = True

    json_output['changed'] = changed
    json_output.update(mod_params)
    module.exit_json(**json_output)


if __name__ == '__main__':
    main()