summaryrefslogtreecommitdiffstats
path: root/ansible_collections/community/rabbitmq/plugins/lookup/rabbitmq.py
blob: 9ecd97e5fa0c933e1a1b34560cfcc2dd7c84ebfa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# (c) 2018, John Imison <john+github@imison.net>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = '''
    name: rabbitmq
    author: John Imison (@Im0)
    short_description: Retrieve messages from an AMQP/AMQPS RabbitMQ queue.
    description:
        - This lookup uses a basic get to retrieve all, or a limited number C(count), messages from a RabbitMQ queue.
    options:
      url:
        description:
          - A URI connection string to connect to the AMQP/AMQPS RabbitMQ server.
          - For more information refer to the URI spec U(https://www.rabbitmq.com/uri-spec.html).
        required: True
      queue:
        description:
          - The queue to get messages from.
        required: True
      count:
        description:
          - How many messages to collect from the queue.
          - If not set, defaults to retrieving all the messages from the queue.
    requirements:
        - The python pika package U(https://pypi.org/project/pika/).
    notes:
        - This lookup implements BlockingChannel.basic_get to get messages from a RabbitMQ server.
        - After retrieving a message from the server, receipt of the message is acknowledged and the message on the server is deleted.
        - Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.
        - More information about pika can be found at U(https://pika.readthedocs.io/en/stable/).
        - This plugin is tested against RabbitMQ.  Other AMQP 0.9.1 protocol based servers may work but are not tested or guaranteed.
        - Assigning the return messages to a variable under C(vars) may result in unexpected results as the lookup is evaluated every time the
          variable is referenced.
        - Currently this plugin only handles text based messages from a queue. Unexpected results may occur when retrieving binary data.
'''


# Example playbook tasks showing how to call this lookup.
EXAMPLES = """
- name: Get all messages off a queue
  debug:
    msg: "{{ lookup('community.rabbitmq.rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello') }}"


# If you are intending on using the returned messages as a variable in more than
# one task (eg. debug, template), it is recommended to set_fact.

- name: Get 2 messages off a queue and set a fact for re-use
  set_fact:
    messages: "{{ lookup('community.rabbitmq.rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello', count=2) }}"

- name: Dump out contents of the messages
  debug:
    var: messages

"""

RETURN = """
  _list:
    description:
      - A list of dictionaries with keys and values from the queue.
    type: list
    contains:
      content_type:
        description: The content_type on the message in the queue.
        type: str
      delivery_mode:
        description: The delivery_mode on the message in the queue.
        type: str
      delivery_tag:
        description: The delivery_tag on the message in the queue.
        type: str
      exchange:
        description: The exchange the message came from.
        type: str
      message_count:
        description: The message_count for the message on the queue.
        type: str
      msg:
        description: The content of the message.
        type: str
      redelivered:
        description: The redelivered flag.  True if the message has been delivered before.
        type: bool
      routing_key:
        description: The routing_key on the message in the queue.
        type: str
      headers:
        description: The headers for the message returned from the queue.
        type: dict
      json:
        description: If application/json is specified in content_type, json will be loaded into variables.
        type: dict

"""

import json

from ansible.errors import AnsibleError, AnsibleParserError
from ansible.plugins.lookup import LookupBase
from ansible.module_utils._text import to_native, to_text
from ansible.utils.display import Display

try:
    import pika
    from pika import spec
    HAS_PIKA = True
except ImportError:
    HAS_PIKA = False

display = Display()


class LookupModule(LookupBase):
    """Lookup plugin that fetches messages from a RabbitMQ queue via basic_get."""

    def run(self, terms, variables=None, url=None, queue=None, count=None):
        """Retrieve, acknowledge and return messages from ``queue``.

        :param terms: positional lookup terms; only logged, not otherwise used here.
        :param variables: variables passed in by the caller; only logged.
        :param url: AMQP/AMQPS connection URI (required).
        :param queue: name of the queue to read from (required).
        :param count: maximum number of messages to fetch; ``None`` means keep
            reading until the queue reports no remaining messages.
        :returns: a one-element list wrapping the list of message dictionaries.
        :raises AnsibleError: if pika is missing, a required argument is absent,
            ``count`` is not an integer, the URL/connection/channel cannot be set
            up, or a message flagged as ``application/json`` fails to decode.
        """
        if not HAS_PIKA:
            raise AnsibleError('pika python package is required for rabbitmq lookup.')
        if not url:
            raise AnsibleError('URL is required for rabbitmq lookup.')
        if not queue:
            raise AnsibleError('Queue is required for rabbitmq lookup.')

        # A count supplied as a string (e.g. "2") would never compare equal to the
        # integer message counter below, silently disabling the limit.
        if count is not None:
            try:
                count = int(count)
            except (TypeError, ValueError):
                raise AnsibleError("Count must be an integer for rabbitmq lookup, got: %s" % to_native(count))

        # NOTE(review): url may embed credentials and is written to the -vvv output as-is.
        display.vvv(u"terms:%s : variables:%s url:%s queue:%s count:%s" % (terms, variables, url, queue, count))

        try:
            parameters = pika.URLParameters(url)
        except Exception as e:
            raise AnsibleError("URL malformed: %s" % to_native(e))

        try:
            connection = pika.BlockingConnection(parameters)
        except Exception as e:
            raise AnsibleError("Connection issue: %s" % to_native(e))

        try:
            conn_channel = connection.channel()
        except pika.exceptions.AMQPChannelError as e:
            try:
                connection.close()
            except pika.exceptions.AMQPConnectionError as ie:
                # Both values must be passed as one tuple to the two-placeholder format string.
                raise AnsibleError("Channel and connection closing issues: %s / %s" % (to_native(e), to_native(ie)))
            raise AnsibleError("Channel issue: %s" % to_native(e))

        ret = []

        try:
            while True:
                method_frame, properties, body = conn_channel.basic_get(queue=queue)
                # If we didn't get a method_frame, exit.
                if not method_frame:
                    break

                msg_text = to_text(body)
                display.vvv(u"%s, %s, %s " % (method_frame, properties, msg_text))

                # TODO: In the future consider checking content_type and handle text/binary data differently.
                msg_details = {
                    'msg': msg_text,
                    'message_count': method_frame.message_count,
                    'routing_key': method_frame.routing_key,
                    'delivery_tag': method_frame.delivery_tag,
                    'redelivered': method_frame.redelivered,
                    'exchange': method_frame.exchange,
                    'delivery_mode': properties.delivery_mode,
                    'content_type': properties.content_type,
                    'headers': properties.headers,
                }
                if properties.content_type == 'application/json':
                    try:
                        msg_details['json'] = json.loads(msg_text)
                    except ValueError as e:
                        raise AnsibleError("Unable to decode JSON for message %s: %s" % (method_frame.delivery_tag, to_native(e)))

                ret.append(msg_details)
                # Acknowledge only after the message has been fully processed.
                conn_channel.basic_ack(method_frame.delivery_tag)

                # Stop when the queue reports nothing left or the requested limit is reached.
                if method_frame.message_count == 0 or len(ret) == count:
                    break
        finally:
            # Always release the connection, including when an error escapes the loop.
            if not connection.is_closed:
                try:
                    connection.close()
                except pika.exceptions.AMQPConnectionError:
                    pass

        return [ret]