diff options
Diffstat (limited to 'ansible_collections/community/rabbitmq/plugins')
20 files changed, 3959 insertions, 0 deletions
diff --git a/ansible_collections/community/rabbitmq/plugins/doc_fragments/__init__.py b/ansible_collections/community/rabbitmq/plugins/doc_fragments/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/doc_fragments/__init__.py diff --git a/ansible_collections/community/rabbitmq/plugins/doc_fragments/rabbitmq.py b/ansible_collections/community/rabbitmq/plugins/doc_fragments/rabbitmq.py new file mode 100644 index 00000000..acae82e8 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/doc_fragments/rabbitmq.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2016, Jorge Rodriguez <jorge.rodriguez@tiriel.eu> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + + +class ModuleDocFragment(object): + # Parameters for RabbitMQ modules + DOCUMENTATION = r''' +options: + login_user: + description: + - RabbitMQ user for connection. + type: str + default: guest + login_password: + description: + - RabbitMQ password for connection. + type: str + default: guest + login_host: + description: + - RabbitMQ host for connection. + type: str + default: localhost + login_port: + description: + - RabbitMQ management API port. + type: str + default: '15672' + login_protocol: + description: + - RabbitMQ management API protocol. + type: str + choices: [ http , https ] + default: http + ca_cert: + description: + - CA certificate to verify SSL connection to management API. + type: path + aliases: [ cacert ] + client_cert: + description: + - Client certificate to send on SSL connections to management API. + type: path + aliases: [ cert ] + client_key: + description: + - Private key matching the client certificate. + type: path + aliases: [ key ] + vhost: + description: + - RabbitMQ virtual host. + type: str + default: "/" +''' diff --git a/ansible_collections/community/rabbitmq/plugins/lookup/rabbitmq.py b/ansible_collections/community/rabbitmq/plugins/lookup/rabbitmq.py new file mode 100644 index 00000000..9ecd97e5 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/lookup/rabbitmq.py @@ -0,0 +1,189 @@ +# (c) 2018, John Imison <john+github@imison.net> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' + name: rabbitmq + author: John Imison (@Im0) + short_description: Retrieve messages from an AMQP/AMQPS RabbitMQ queue. + description: + - This lookup uses a basic get to retrieve all, or a limited number C(count), messages from a RabbitMQ queue. + options: + url: + description: + - An URI connection string to connect to the AMQP/AMQPS RabbitMQ server. + - For more information refer to the URI spec U(https://www.rabbitmq.com/uri-spec.html). + required: True + queue: + description: + - The queue to get messages from. + required: True + count: + description: + - How many messages to collect from the queue. + - If not set, defaults to retrieving all the messages from the queue. + requirements: + - The python pika package U(https://pypi.org/project/pika/). + notes: + - This lookup implements BlockingChannel.basic_get to get messages from a RabbitMQ server. + - After retrieving a message from the server, receipt of the message is acknowledged and the message on the server is deleted. + - Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. + - More information about pika can be found at U(https://pika.readthedocs.io/en/stable/). + - This plugin is tested against RabbitMQ. Other AMQP 0.9.1 protocol based servers may work but not tested/guaranteed. + - Assigning the return messages to a variable under C(vars) may result in unexpected results as the lookup is evaluated every time the + variable is referenced. + - Currently this plugin only handles text based messages from a queue. Unexpected results may occur when retrieving binary data. +''' + + +EXAMPLES = """ +- name: Get all messages off a queue + debug: + msg: "{{ lookup('community.rabbitmq.rabbitmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello') }}" + + +# If you are intending on using the returned messages as a variable in more than +# one task (eg. debug, template), it is recommended to set_fact. + +- name: Get 2 messages off a queue and set a fact for re-use + set_fact: + messages: "{{ lookup('community.rabbitmq.rabbiotmq', url='amqp://guest:guest@192.168.0.10:5672/%2F', queue='hello', count=2) }}" + +- name: Dump out contents of the messages + debug: + var: messages + +""" + +RETURN = """ + _list: + description: + - A list of dictionaries with keys and value from the queue. + type: list + contains: + content_type: + description: The content_type on the message in the queue. + type: str + delivery_mode: + description: The delivery_mode on the message in the queue. + type: str + delivery_tag: + description: The delivery_tag on the message in the queue. + type: str + exchange: + description: The exchange the message came from. + type: str + message_count: + description: The message_count for the message on the queue. + type: str + msg: + description: The content of the message. + type: str + redelivered: + description: The redelivered flag. True if the message has been delivered before. + type: bool + routing_key: + description: The routing_key on the message in the queue. + type: str + headers: + description: The headers for the message returned from the queue. + type: dict + json: + description: If application/json is specified in content_type, json will be loaded into variables. + type: dict + +""" + +import json + +from ansible.errors import AnsibleError, AnsibleParserError +from ansible.plugins.lookup import LookupBase +from ansible.module_utils._text import to_native, to_text +from ansible.utils.display import Display + +try: + import pika + from pika import spec + HAS_PIKA = True +except ImportError: + HAS_PIKA = False + +display = Display() + + +class LookupModule(LookupBase): + + def run(self, terms, variables=None, url=None, queue=None, count=None): + if not HAS_PIKA: + raise AnsibleError('pika python package is required for rabbitmq lookup.') + if not url: + raise AnsibleError('URL is required for rabbitmq lookup.') + if not queue: + raise AnsibleError('Queue is required for rabbitmq lookup.') + + display.vvv(u"terms:%s : variables:%s url:%s queue:%s count:%s" % (terms, variables, url, queue, count)) + + try: + parameters = pika.URLParameters(url) + except Exception as e: + raise AnsibleError("URL malformed: %s" % to_native(e)) + + try: + connection = pika.BlockingConnection(parameters) + except Exception as e: + raise AnsibleError("Connection issue: %s" % to_native(e)) + + try: + conn_channel = connection.channel() + except pika.exceptions.AMQPChannelError as e: + try: + connection.close() + except pika.exceptions.AMQPConnectionError as ie: + raise AnsibleError("Channel and connection closing issues: %s / %s" % to_native(e), to_native(ie)) + raise AnsibleError("Channel issue: %s" % to_native(e)) + + ret = [] + idx = 0 + + while True: + method_frame, properties, body = conn_channel.basic_get(queue=queue) + if method_frame: + display.vvv(u"%s, %s, %s " % (method_frame, properties, to_text(body))) + + # TODO: In the future consider checking content_type and handle text/binary data differently. + msg_details = dict({ + 'msg': to_text(body), + 'message_count': method_frame.message_count, + 'routing_key': method_frame.routing_key, + 'delivery_tag': method_frame.delivery_tag, + 'redelivered': method_frame.redelivered, + 'exchange': method_frame.exchange, + 'delivery_mode': properties.delivery_mode, + 'content_type': properties.content_type, + 'headers': properties.headers + }) + if properties.content_type == 'application/json': + try: + msg_details['json'] = json.loads(msg_details['msg']) + except ValueError as e: + raise AnsibleError("Unable to decode JSON for message %s: %s" % (method_frame.delivery_tag, to_native(e))) + + ret.append(msg_details) + conn_channel.basic_ack(method_frame.delivery_tag) + idx += 1 + if method_frame.message_count == 0 or idx == count: + break + # If we didn't get a method_frame, exit. + else: + break + + if connection.is_closed: + return [ret] + else: + try: + connection.close() + except pika.exceptions.AMQPConnectionError: + pass + return [ret] diff --git a/ansible_collections/community/rabbitmq/plugins/module_utils/_version.py b/ansible_collections/community/rabbitmq/plugins/module_utils/_version.py new file mode 100644 index 00000000..bf994226 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/module_utils/_version.py @@ -0,0 +1,343 @@ +# Vendored copy of distutils/version.py from CPython 3.9.5 +# +# Implements multiple version numbering conventions for the +# Python Module Distribution Utilities. +# +# PSF License (see PSF-license.txt or https://opensource.org/licenses/Python-2.0) +# + +"""Provides classes to represent module version numbers (one class for +each style of version numbering). There are currently two such classes +implemented: StrictVersion and LooseVersion. + +Every version number class implements the following interface: + * the 'parse' method takes a string and parses it to some internal + representation; if the string is an invalid version number, + 'parse' raises a ValueError exception + * the class constructor takes an optional string argument which, + if supplied, is passed to 'parse' + * __str__ reconstructs the string that was passed to 'parse' (or + an equivalent string -- ie. one that will generate an equivalent + version number instance) + * __repr__ generates Python code to recreate the version number instance + * _cmp compares the current instance with either another instance + of the same class or a string (which will be parsed to an instance + of the same class, thus must follow the same rules) +""" + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import re + +try: + RE_FLAGS = re.VERBOSE | re.ASCII # type: ignore[attr-defined] +except AttributeError: + RE_FLAGS = re.VERBOSE + + +class Version: + """Abstract base class for version numbering classes. Just provides + constructor (__init__) and reproducer (__repr__), because those + seem to be the same for all version numbering classes; and route + rich comparisons to _cmp. + """ + + def __init__(self, vstring=None): + if vstring: + self.parse(vstring) + + def __repr__(self): + return "%s ('%s')" % (self.__class__.__name__, str(self)) + + def __eq__(self, other): + c = self._cmp(other) + if c is NotImplemented: + return c + return c == 0 + + def __lt__(self, other): + c = self._cmp(other) + if c is NotImplemented: + return c + return c < 0 + + def __le__(self, other): + c = self._cmp(other) + if c is NotImplemented: + return c + return c <= 0 + + def __gt__(self, other): + c = self._cmp(other) + if c is NotImplemented: + return c + return c > 0 + + def __ge__(self, other): + c = self._cmp(other) + if c is NotImplemented: + return c + return c >= 0 + + +# Interface for version-number classes -- must be implemented +# by the following classes (the concrete ones -- Version should +# be treated as an abstract class). +# __init__ (string) - create and take same action as 'parse' +# (string parameter is optional) +# parse (string) - convert a string representation to whatever +# internal representation is appropriate for +# this style of version numbering +# __str__ (self) - convert back to a string; should be very similar +# (if not identical to) the string supplied to parse +# __repr__ (self) - generate Python code to recreate +# the instance +# _cmp (self, other) - compare two version numbers ('other' may +# be an unparsed version string, or another +# instance of your version class) + + +class StrictVersion(Version): + """Version numbering for anal retentives and software idealists. + Implements the standard interface for version number classes as + described above. A version number consists of two or three + dot-separated numeric components, with an optional "pre-release" tag + on the end. The pre-release tag consists of the letter 'a' or 'b' + followed by a number. If the numeric components of two version + numbers are equal, then one with a pre-release tag will always + be deemed earlier (lesser) than one without. + + The following are valid version numbers (shown in the order that + would be obtained by sorting according to the supplied cmp function): + + 0.4 0.4.0 (these two are equivalent) + 0.4.1 + 0.5a1 + 0.5b3 + 0.5 + 0.9.6 + 1.0 + 1.0.4a3 + 1.0.4b1 + 1.0.4 + + The following are examples of invalid version numbers: + + 1 + 2.7.2.2 + 1.3.a4 + 1.3pl1 + 1.3c4 + + The rationale for this version numbering system will be explained + in the distutils documentation. + """ + + version_re = re.compile(r'^(\d+) \. (\d+) (\. (\d+))? ([ab](\d+))?$', + RE_FLAGS) + + def parse(self, vstring): + match = self.version_re.match(vstring) + if not match: + raise ValueError("invalid version number '%s'" % vstring) + + (major, minor, patch, prerelease, prerelease_num) = \ + match.group(1, 2, 4, 5, 6) + + if patch: + self.version = tuple(map(int, [major, minor, patch])) + else: + self.version = tuple(map(int, [major, minor])) + (0,) + + if prerelease: + self.prerelease = (prerelease[0], int(prerelease_num)) + else: + self.prerelease = None + + def __str__(self): + if self.version[2] == 0: + vstring = '.'.join(map(str, self.version[0:2])) + else: + vstring = '.'.join(map(str, self.version)) + + if self.prerelease: + vstring = vstring + self.prerelease[0] + str(self.prerelease[1]) + + return vstring + + def _cmp(self, other): + if isinstance(other, str): + other = StrictVersion(other) + elif not isinstance(other, StrictVersion): + return NotImplemented + + if self.version != other.version: + # numeric versions don't match + # prerelease stuff doesn't matter + if self.version < other.version: + return -1 + else: + return 1 + + # have to compare prerelease + # case 1: neither has prerelease; they're equal + # case 2: self has prerelease, other doesn't; other is greater + # case 3: self doesn't have prerelease, other does: self is greater + # case 4: both have prerelease: must compare them! + + if (not self.prerelease and not other.prerelease): + return 0 + elif (self.prerelease and not other.prerelease): + return -1 + elif (not self.prerelease and other.prerelease): + return 1 + elif (self.prerelease and other.prerelease): + if self.prerelease == other.prerelease: + return 0 + elif self.prerelease < other.prerelease: + return -1 + else: + return 1 + else: + raise AssertionError("never get here") + +# end class StrictVersion + +# The rules according to Greg Stein: +# 1) a version number has 1 or more numbers separated by a period or by +# sequences of letters. If only periods, then these are compared +# left-to-right to determine an ordering. +# 2) sequences of letters are part of the tuple for comparison and are +# compared lexicographically +# 3) recognize the numeric components may have leading zeroes +# +# The LooseVersion class below implements these rules: a version number +# string is split up into a tuple of integer and string components, and +# comparison is a simple tuple comparison. This means that version +# numbers behave in a predictable and obvious way, but a way that might +# not necessarily be how people *want* version numbers to behave. There +# wouldn't be a problem if people could stick to purely numeric version +# numbers: just split on period and compare the numbers as tuples. +# However, people insist on putting letters into their version numbers; +# the most common purpose seems to be: +# - indicating a "pre-release" version +# ('alpha', 'beta', 'a', 'b', 'pre', 'p') +# - indicating a post-release patch ('p', 'pl', 'patch') +# but of course this can't cover all version number schemes, and there's +# no way to know what a programmer means without asking them. +# +# The problem is what to do with letters (and other non-numeric +# characters) in a version number. The current implementation does the +# obvious and predictable thing: keep them as strings and compare +# lexically within a tuple comparison. This has the desired effect if +# an appended letter sequence implies something "post-release": +# eg. "0.99" < "0.99pl14" < "1.0", and "5.001" < "5.001m" < "5.002". +# +# However, if letters in a version number imply a pre-release version, +# the "obvious" thing isn't correct. Eg. you would expect that +# "1.5.1" < "1.5.2a2" < "1.5.2", but under the tuple/lexical comparison +# implemented here, this just isn't so. +# +# Two possible solutions come to mind. The first is to tie the +# comparison algorithm to a particular set of semantic rules, as has +# been done in the StrictVersion class above. This works great as long +# as everyone can go along with bondage and discipline. Hopefully a +# (large) subset of Python module programmers will agree that the +# particular flavour of bondage and discipline provided by StrictVersion +# provides enough benefit to be worth using, and will submit their +# version numbering scheme to its domination. The free-thinking +# anarchists in the lot will never give in, though, and something needs +# to be done to accommodate them. +# +# Perhaps a "moderately strict" version class could be implemented that +# lets almost anything slide (syntactically), and makes some heuristic +# assumptions about non-digits in version number strings. This could +# sink into special-case-hell, though; if I was as talented and +# idiosyncratic as Larry Wall, I'd go ahead and implement a class that +# somehow knows that "1.2.1" < "1.2.2a2" < "1.2.2" < "1.2.2pl3", and is +# just as happy dealing with things like "2g6" and "1.13++". I don't +# think I'm smart enough to do it right though. +# +# In any case, I've coded the test suite for this module (see +# ../test/test_version.py) specifically to fail on things like comparing +# "1.2a2" and "1.2". That's not because the *code* is doing anything +# wrong, it's because the simple, obvious design doesn't match my +# complicated, hairy expectations for real-world version numbers. It +# would be a snap to fix the test suite to say, "Yep, LooseVersion does +# the Right Thing" (ie. the code matches the conception). But I'd rather +# have a conception that matches common notions about version numbers. + + +class LooseVersion(Version): + """Version numbering for anarchists and software realists. + Implements the standard interface for version number classes as + described above. A version number consists of a series of numbers, + separated by either periods or strings of letters. When comparing + version numbers, the numeric components will be compared + numerically, and the alphabetic components lexically. The following + are all valid version numbers, in no particular order: + + 1.5.1 + 1.5.2b2 + 161 + 3.10a + 8.02 + 3.4j + 1996.07.12 + 3.2.pl0 + 3.1.1.6 + 2g6 + 11g + 0.960923 + 2.2beta29 + 1.13++ + 5.5.kw + 2.0b1pl0 + + In fact, there is no such thing as an invalid version number under + this scheme; the rules for comparison are simple and predictable, + but may not always give the results you want (for some definition + of "want"). + """ + + component_re = re.compile(r'(\d+ | [a-z]+ | \.)', re.VERBOSE) + + def __init__(self, vstring=None): + if vstring: + self.parse(vstring) + + def parse(self, vstring): + # I've given up on thinking I can reconstruct the version string + # from the parsed tuple -- so I just store the string here for + # use by __str__ + self.vstring = vstring + components = [x for x in self.component_re.split(vstring) if x and x != '.'] + for i, obj in enumerate(components): + try: + components[i] = int(obj) + except ValueError: + pass + + self.version = components + + def __str__(self): + return self.vstring + + def __repr__(self): + return "LooseVersion ('%s')" % str(self) + + def _cmp(self, other): + if isinstance(other, str): + other = LooseVersion(other) + elif not isinstance(other, LooseVersion): + return NotImplemented + + if self.version == other.version: + return 0 + if self.version < other.version: + return -1 + if self.version > other.version: + return 1 + +# end class LooseVersion diff --git a/ansible_collections/community/rabbitmq/plugins/module_utils/rabbitmq.py b/ansible_collections/community/rabbitmq/plugins/module_utils/rabbitmq.py new file mode 100644 index 00000000..f981c853 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/module_utils/rabbitmq.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2016, Jorge Rodriguez <jorge.rodriguez@tiriel.eu> +# Copyright: (c) 2018, John Imison <john+github@imison.net> +# +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible.module_utils._text import to_native +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils.six.moves.urllib import parse as urllib_parse +from mimetypes import MimeTypes + +import os +import json +import traceback + +PIKA_IMP_ERR = None +try: + import pika + import pika.exceptions + from pika import spec + HAS_PIKA = True +except ImportError: + PIKA_IMP_ERR = traceback.format_exc() + HAS_PIKA = False + + +def rabbitmq_argument_spec(): + return dict( + login_user=dict(type='str', default='guest'), + login_password=dict(type='str', default='guest', no_log=True), + login_host=dict(type='str', default='localhost'), + login_port=dict(type='str', default='15672'), + login_protocol=dict(type='str', default='http', choices=['http', 'https']), + ca_cert=dict(type='path', aliases=['cacert']), + client_cert=dict(type='path', aliases=['cert']), + client_key=dict(type='path', aliases=['key']), + vhost=dict(type='str', default='/'), + ) + + +# notification/rabbitmq_basic_publish.py +class RabbitClient(): + def __init__(self, module): + self.module = module + self.params = module.params + self.check_required_library() + self.check_host_params() + self.url = self.params['url'] + self.proto = self.params['proto'] + self.username = self.params['username'] + self.password = self.params['password'] + self.host = self.params['host'] + self.port = self.params['port'] + self.vhost = self.params['vhost'] + self.queue = self.params['queue'] + self.exchange = self.params['exchange'] + self.routing_key = self.params['routing_key'] + self.headers = self.params['headers'] + self.cafile = self.params['cafile'] + self.certfile = self.params['certfile'] + self.keyfile = self.params['keyfile'] + + if self.host is not None: + self.build_url() + + if self.cafile is not None: + self.append_ssl_certs() + + self.connect_to_rabbitmq() + + def check_required_library(self): + if not HAS_PIKA: + self.module.fail_json(msg=missing_required_lib("pika"), exception=PIKA_IMP_ERR) + + def check_host_params(self): + # Fail if url is specified and other conflicting parameters have been specified + if self.params['url'] is not None and any(self.params[k] is not None for k in ['proto', 'host', 'port', 'password', 'username', 'vhost']): + self.module.fail_json(msg="url and proto, host, port, vhost, username or password cannot be specified at the same time.") + + # Fail if url not specified and there is a missing parameter to build the url + if self.params['url'] is None and any(self.params[k] is None for k in ['proto', 'host', 'port', 'password', 'username', 'vhost']): + self.module.fail_json(msg="Connection parameters must be passed via url, or, proto, host, port, vhost, username or password.") + + def append_ssl_certs(self): + ssl_options = {} + if self.cafile: + ssl_options['cafile'] = self.cafile + if self.certfile: + ssl_options['certfile'] = self.certfile + if self.keyfile: + ssl_options['keyfile'] = self.keyfile + + self.url = self.url + '?ssl_options=' + urllib_parse.quote(json.dumps(ssl_options)) + + @staticmethod + def rabbitmq_argument_spec(): + return dict( + url=dict(type='str'), + proto=dict(type='str', choices=['amqp', 'amqps']), + host=dict(type='str'), + port=dict(type='int'), + username=dict(type='str'), + password=dict(type='str', no_log=True), + vhost=dict(type='str'), + queue=dict(type='str') + ) + + ''' Consider some file size limits here ''' + def _read_file(self, path): + try: + with open(path, "rb") as file_handle: + return file_handle.read() + except IOError as e: + self.module.fail_json(msg="Unable to open file %s: %s" % (path, to_native(e))) + + @staticmethod + def _check_file_mime_type(path): + mime = MimeTypes() + return mime.guess_type(path) + + def build_url(self): + self.url = '{0}://{1}:{2}@{3}:{4}/{5}'.format(self.proto, + self.username, + self.password, + self.host, + self.port, + self.vhost) + + def connect_to_rabbitmq(self): + """ + Function to connect to rabbitmq using username and password + """ + try: + parameters = pika.URLParameters(self.url) + except Exception as e: + self.module.fail_json(msg="URL malformed: %s" % to_native(e)) + + try: + self.connection = pika.BlockingConnection(parameters) + except Exception as e: + self.module.fail_json(msg="Connection issue: %s" % to_native(e)) + + try: + self.conn_channel = self.connection.channel() + except pika.exceptions.AMQPChannelError as e: + self.close_connection() + self.module.fail_json(msg="Channel issue: %s" % to_native(e)) + + def close_connection(self): + try: + self.connection.close() + except pika.exceptions.AMQPConnectionError: + pass + + def basic_publish(self): + self.content_type = self.params.get("content_type") + + if self.params.get("body") is not None: + args = dict( + body=self.params.get("body"), + properties=pika.BasicProperties(content_type=self.content_type, delivery_mode=1, headers=self.headers)) + + # If src (file) is defined and content_type is left as default, do a mime lookup on the file + if self.params.get("src") is not None and self.content_type == 'text/plain': + self.content_type = RabbitClient._check_file_mime_type(self.params.get("src"))[0] + self.headers.update( + filename=os.path.basename(self.params.get("src")) + ) + + args = dict( + body=self._read_file(self.params.get("src")), + properties=pika.BasicProperties(content_type=self.content_type, + delivery_mode=1, + headers=self.headers + )) + elif self.params.get("src") is not None: + args = dict( + body=self._read_file(self.params.get("src")), + properties=pika.BasicProperties(content_type=self.content_type, + delivery_mode=1, + headers=self.headers + )) + + try: + # If queue and exchange is not defined post to random queue, RabbitMQ will return the queue name of the automatically generated queue. + if self.queue is None and self.exchange is None: + result = self.conn_channel.queue_declare(queue='', + durable=self.params.get("durable"), + exclusive=self.params.get("exclusive"), + auto_delete=self.params.get("auto_delete")) + self.conn_channel.confirm_delivery() + self.queue = result.method.queue + elif self.queue is not None and self.exchange is None: + self.conn_channel.queue_declare(queue=self.queue, + durable=self.params.get("durable"), + exclusive=self.params.get("exclusive"), + auto_delete=self.params.get("auto_delete")) + self.conn_channel.confirm_delivery() + except Exception as e: + self.module.fail_json(msg="Queue declare issue: %s" % to_native(e)) + + # https://github.com/ansible/ansible/blob/devel/lib/ansible/module_utils/cloudstack.py#L150 + # If routing key is not defined, but, the queue is... we will use the queue name as routing_key. + if self.routing_key is not None: + args['routing_key'] = self.routing_key + elif self.routing_key is None and self.queue is not None: + args['routing_key'] = self.queue + elif self.routing_key is None and self.exchange is not None: + args['routing_key'] = self.exchange + else: + args['routing_key'] = '' + + # If exchange is not specified use the default/nameless exchange + if self.exchange is None: + args['exchange'] = '' + else: + args['exchange'] = self.exchange + if self.routing_key is None: + args['routing_key'] = self.exchange + + # self.module.fail_json(msg="%s %s %s" % (to_native(self.queue), to_native(self.exchange), to_native(self.routing_key))) + try: + self.conn_channel.basic_publish(**args) + return True + except pika.exceptions.UnroutableError: + return False diff --git a/ansible_collections/community/rabbitmq/plugins/module_utils/version.py b/ansible_collections/community/rabbitmq/plugins/module_utils/version.py new file mode 100644 index 00000000..c771682b --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/module_utils/version.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Felix Fontein <felix@fontein.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +"""Provide version object to compare version numbers.""" + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +# Once we drop support for Ansible 2.9, ansible-base 2.10, and ansible-core 2.11, we can +# remove the _version.py file, and replace the following import by +# +# from ansible.module_utils.compat.version import LooseVersion + +from ._version import LooseVersion, StrictVersion diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_binding.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_binding.py new file mode 100644 index 00000000..6826c3af --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_binding.py @@ -0,0 +1,298 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2015, Manuel Sousa <manuel.sousa@gmail.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_binding +author: Manuel Sousa (@manuel-sousa) + +short_description: Manage rabbitMQ bindings +description: + - This module uses rabbitMQ REST APIs to create / delete bindings. +requirements: [ "requests >= 1.0.0" ] +options: + state: + description: + - Whether the bindings should be present or absent. + type: str + choices: [ "present", "absent" ] + default: present + name: + description: + - source exchange to create binding on. + type: str + required: true + aliases: [ "src", "source" ] + destination: + description: + - destination exchange or queue for the binding. + type: str + required: true + aliases: [ "dst", "dest" ] + destination_type: + description: + - Either queue or exchange. + type: str + required: true + choices: [ "queue", "exchange" ] + aliases: [ "type", "dest_type" ] + routing_key: + description: + - routing key for the binding. + type: str + default: "#" + arguments: + description: + - extra arguments for exchange. If defined this argument is a key/value dictionary + type: dict + required: false + default: {} +extends_documentation_fragment: + - community.rabbitmq.rabbitmq + +''' + +EXAMPLES = r''' +- name: Bind myQueue to directExchange with routing key info + community.rabbitmq.rabbitmq_binding: + name: directExchange + destination: myQueue + type: queue + routing_key: info + +- name: Bind directExchange to topicExchange with routing key *.info + community.rabbitmq.rabbitmq_binding: + name: topicExchange + destination: topicExchange + type: exchange + routing_key: '*.info' +''' + +import json +import traceback + +REQUESTS_IMP_ERR = None +try: + import requests + HAS_REQUESTS = True +except ImportError: + REQUESTS_IMP_ERR = traceback.format_exc() + HAS_REQUESTS = False + +from ansible.module_utils.six.moves.urllib import parse as urllib_parse +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible_collections.community.rabbitmq.plugins.module_utils.rabbitmq import rabbitmq_argument_spec + + +class RabbitMqBinding(object): + def __init__(self, module): + """ + :param module: + """ + self.module = module + self.name = self.module.params['name'] + self.login_user = self.module.params['login_user'] + self.login_password = self.module.params['login_password'] + self.login_host = self.module.params['login_host'] + self.login_port = self.module.params['login_port'] + self.login_protocol = self.module.params['login_protocol'] + self.vhost = self.module.params['vhost'] + self.destination = self.module.params['destination'] + self.destination_type = 'q' if self.module.params['destination_type'] == 'queue' else 'e' + self.routing_key = self.module.params['routing_key'] + self.arguments = self.module.params['arguments'] + self.verify = self.module.params['ca_cert'] + self.cert = self.module.params['client_cert'] + self.key = self.module.params['client_key'] + self.props = urllib_parse.quote(self.routing_key) if self.routing_key != '' else '~' + self.base_url = '{0}://{1}:{2}/api/bindings'.format(self.login_protocol, + self.login_host, + self.login_port) + self.url = '{0}/{1}/e/{2}/{3}/{4}/{5}'.format(self.base_url, + urllib_parse.quote(self.vhost, safe=''), + urllib_parse.quote(self.name, safe=''), + self.destination_type, + urllib_parse.quote(self.destination, safe=''), + self.props) + self.result = { + 'changed': False, + 'name': self.module.params['name'], + } + self.authentication = ( + self.login_user, + self.login_password + ) + self.request = requests + self.http_check_states = { + 200: True, + 404: False, + } + self.http_actionable_states = { + 201: True, + 204: True, + } + self.api_result = self.request.get(self.url, auth=self.authentication, verify=self.verify, cert=(self.cert, self.key)) + + def run(self): + """ + :return: + """ + self.check_presence() + self.check_mode() + self.action_mode() + + def check_presence(self): + """ + :return: + """ + if self.check_should_throw_fail(): + self.fail() + + def change_required(self): + """ + :return: + """ + if self.module.params['state'] == 'present': + if not self.is_present(): + return True + elif self.module.params['state'] == 'absent': + if self.is_present(): + return True + return False + + def is_present(self): + """ + :return: + """ + return self.http_check_states.get(self.api_result.status_code, False) + + def check_mode(self): + """ + :return: + """ + if self.module.check_mode: + result = self.result + result['changed'] = self.change_required() + result['details'] = self.api_result.json() if self.is_present() else self.api_result.text + result['arguments'] = self.module.params['arguments'] + self.module.exit_json(**result) + + def check_reply_is_correct(self): + """ + :return: + """ + if self.api_result.status_code in self.http_check_states: + return True + return False + + def check_should_throw_fail(self): + """ + :return: + """ + if not self.is_present(): + if not self.check_reply_is_correct(): + return True + return False + + def action_mode(self): + """ + :return: + """ + result = self.result + if self.change_required(): + if self.module.params['state'] == 'present': + self.create() + if self.module.params['state'] == 'absent': + self.remove() + if self.action_should_throw_fail(): + self.fail() + result['changed'] = True + result['destination'] = self.module.params['destination'] + self.module.exit_json(**result) + else: + result['changed'] = False + self.module.exit_json(**result) + + def action_reply_is_correct(self): + """ + :return: + """ + if self.api_result.status_code in self.http_actionable_states: + return True + return False + + def action_should_throw_fail(self): + """ + :return: + """ + if not self.action_reply_is_correct(): + return True + return False + + def create(self): + """ + :return: + """ + self.url = '{0}/{1}/e/{2}/{3}/{4}'.format(self.base_url, + urllib_parse.quote(self.vhost, safe=''), + urllib_parse.quote(self.name, safe=''), + self.destination_type, + urllib_parse.quote(self.destination, safe='')) + self.api_result = self.request.post(self.url, + auth=self.authentication, + verify=self.verify, + cert=(self.cert, self.key), + headers={"content-type": "application/json"}, + data=json.dumps({ + 'routing_key': self.routing_key, + 'arguments': self.arguments + })) + + def remove(self): + """ + :return: + """ + self.api_result = self.request.delete(self.url, auth=self.authentication, verify=self.verify, cert=(self.cert, self.key)) + + def fail(self): + """ + :return: + """ + self.module.fail_json( + msg="Unexpected reply from API", + status=self.api_result.status_code, + details=self.api_result.text + ) + + +def main(): + + argument_spec = rabbitmq_argument_spec() + argument_spec.update( + dict( + state=dict(default='present', choices=['present', 'absent'], type='str'), + name=dict(required=True, aliases=["src", "source"], type='str'), + destination=dict(required=True, aliases=["dst", "dest"], type='str'), + destination_type=dict(required=True, aliases=["type", "dest_type"], choices=["queue", "exchange"], + type='str'), + routing_key=dict(default='#', type='str', no_log=False), + arguments=dict(default=dict(), type='dict') + ) + ) + module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True) + + if not HAS_REQUESTS: + module.fail_json(msg=missing_required_lib("requests"), exception=REQUESTS_IMP_ERR) + + RabbitMqBinding(module).run() + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_exchange.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_exchange.py new file mode 100644 index 00000000..41f3f742 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_exchange.py @@ -0,0 +1,234 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2015, Manuel Sousa <manuel.sousa@gmail.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_exchange +author: Manuel Sousa (@manuel-sousa) + +short_description: Manage rabbitMQ exchange +description: + - This module uses rabbitMQ Rest API to create/delete exchanges +requirements: [ "requests >= 1.0.0" ] +options: + name: + description: + - Name of the exchange to create. + type: str + required: true + state: + description: + - Whether the exchange should be present or absent. + type: str + choices: [ "present", "absent" ] + required: false + default: present + durable: + description: + - Whether exchange is durable or not. + type: bool + required: false + default: true + exchange_type: + description: + - Type for the exchange. + - If using I(x-delayed-message), I(x-random), I(x-consistent-hash) or I(x-recent-history) the respective plugin on + - the RabbitMQ server must be enabled. + type: str + required: false + choices: [ "fanout", "direct", "headers", "topic", "x-delayed-message", "x-random", "x-consistent-hash", "x-recent-history" ] + aliases: [ "type" ] + default: direct + auto_delete: + description: + - If the exchange should delete itself after all queues/exchanges unbound from it. + type: bool + required: false + default: false + internal: + description: + - Exchange is available only for other exchanges. + type: bool + required: false + default: false + arguments: + description: + - Extra arguments for exchange. If defined this argument is a key/value dictionary. + type: dict + required: false + default: {} +extends_documentation_fragment: + - community.rabbitmq.rabbitmq + +''' + +EXAMPLES = r''' +- name: Create direct exchange + community.rabbitmq.rabbitmq_exchange: + name: directExchange + +- name: Create topic exchange on vhost + community.rabbitmq.rabbitmq_exchange: + name: topicExchange + type: topic + vhost: myVhost +''' + +import json +import traceback + +REQUESTS_IMP_ERR = None +try: + import requests + HAS_REQUESTS = True +except ImportError: + REQUESTS_IMP_ERR = traceback.format_exc() + HAS_REQUESTS = False + +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.six.moves.urllib import parse as urllib_parse +from ansible_collections.community.rabbitmq.plugins.module_utils.rabbitmq import rabbitmq_argument_spec + + +def main(): + + argument_spec = rabbitmq_argument_spec() + argument_spec.update( + dict( + state=dict(default='present', choices=['present', 'absent'], type='str'), + name=dict(required=True, type='str'), + durable=dict(default=True, type='bool'), + auto_delete=dict(default=False, type='bool'), + internal=dict(default=False, type='bool'), + exchange_type=dict(default='direct', aliases=['type'], + choices=['fanout', 'direct', 'headers', 'topic', 'x-delayed-message', + 'x-random', 'x-consistent-hash', 'x-recent-history'], + type='str'), + arguments=dict(default=dict(), type='dict') + ) + ) + module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True) + + url = "%s://%s:%s/api/exchanges/%s/%s" % ( + module.params['login_protocol'], + module.params['login_host'], + module.params['login_port'], + urllib_parse.quote(module.params['vhost'], ''), + urllib_parse.quote(module.params['name'], '') + ) + + if not HAS_REQUESTS: + module.fail_json(msg=missing_required_lib("requests"), exception=REQUESTS_IMP_ERR) + + # exchange plugin type to plugin name mapping + exchange_plugins = {'x-consistent-hash': 'rabbitmq_consistent_hash_exchange', + 'x-random': 'rabbitmq_random_exchange', + 'x-delayed-message': 'rabbitmq_delayed_message_exchange', + 'x-recent-history': 'rabbitmq_recent_history_exchange'} + result = dict(changed=False, name=module.params['name']) + + # Check if exchange already exists + r = requests.get(url, auth=(module.params['login_user'], module.params['login_password']), + verify=module.params['ca_cert'], cert=(module.params['client_cert'], module.params['client_key'])) + + if r.status_code == 200: + exchange_exists = True + response = r.json() + elif r.status_code == 404: + exchange_exists = False + response = r.text + else: + module.fail_json( + msg="Invalid response from RESTAPI when trying to check if exchange exists", + details=r.text + ) + + if module.params['state'] == 'present': + change_required = not exchange_exists + else: + change_required = exchange_exists + + # Check if attributes change on existing exchange + if not change_required and r.status_code == 200 and module.params['state'] == 'present': + if not ( + response['durable'] == module.params['durable'] and + response['auto_delete'] == module.params['auto_delete'] and + response['internal'] == module.params['internal'] and + response['type'] == module.params['exchange_type'] + ): + module.fail_json( + msg="RabbitMQ RESTAPI doesn't support attribute changes for existing exchanges" + ) + + # Exit if check_mode + if module.check_mode: + result['changed'] = change_required + result['details'] = response + result['arguments'] = module.params['arguments'] + module.exit_json(**result) + + # Do changes + if change_required: + if module.params['state'] == 'present': + r = requests.put( + url, + auth=(module.params['login_user'], module.params['login_password']), + headers={"content-type": "application/json"}, + data=json.dumps({ + "durable": module.params['durable'], + "auto_delete": module.params['auto_delete'], + "internal": module.params['internal'], + "type": module.params['exchange_type'], + "arguments": module.params['arguments'] + }), + verify=module.params['ca_cert'], + cert=(module.params['client_cert'], module.params['client_key']) + ) + elif module.params['state'] == 'absent': + r = requests.delete(url, auth=(module.params['login_user'], module.params['login_password']), + verify=module.params['ca_cert'], cert=(module.params['client_cert'], module.params['client_key'])) + + # RabbitMQ 3.6.7 changed this response code from 204 to 201 + if r.status_code == 204 or r.status_code == 201: + result['changed'] = True + module.exit_json(**result) + else: + rjson = r.json() + if (rjson['reason'].startswith('unknown exchange type')): + try: + module.fail_json( + msg=("Error creating exchange. You may need to enable the '%s' plugin for exchange type %s" % + (exchange_plugins[module.params['exchange_type']], module.params['exchange_type'])), + status=r.status_code, + details=r.text + ) + except KeyError: + module.fail_json( + msg=("Error creating exchange. You may need to enable a plugin for exchange type %s" % + module.params['exchange_type']), + status=r.status_code, + details=r.text + ) + else: + module.fail_json( + msg="Error creating exchange", + status=r.status_code, + details=r.text + ) + + else: + module.exit_json( + changed=False, + name=module.params['name'] + ) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_feature_flag.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_feature_flag.py new file mode 100644 index 00000000..23db08b6 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_feature_flag.py @@ -0,0 +1,98 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Damian Dabrowski <damian@dabrowski.cloud> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = r''' +--- +module: rabbitmq_feature_flag +short_description: Enables feature flag +description: + - Allows to enable specified feature flag. +author: "Damian Dabrowski (@damiandabrowski5)" +version_added: '1.1.0' +options: + name: + description: + - Feature flag name. + type: str + required: true + node: + description: + - Erlang node name of the target rabbit node. + type: str + default: rabbit +''' + +EXAMPLES = r''' +- name: Enable the 'maintenance_mode_status' feature flag on 'rabbit@node-1' + community.rabbitmq.rabbitmq_feature_flag: + name: maintenance_mode_status + node: rabbit@node-1 +''' + +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqFeatureFlag(object): + + def __init__(self, module, name, node): + self.module = module + self.name = name + self.node = node + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + self.state = self.get_flag_state() + + def _exec(self, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmqctl, '-q', '-n', self.node] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.splitlines() + return list() + + def get_flag_state(self): + global_parameters = self._exec(['list_feature_flags'], True) + + for param_item in global_parameters: + name, state = param_item.split('\t') + if name == self.name: + if state == 'enabled': + return 'enabled' + return 'disabled' + return 'unavailable' + + def enable(self): + self._exec(['enable_feature_flag', self.name]) + + +def main(): + arg_spec = dict( + name=dict(type='str', required=True), + node=dict(type='str', default='rabbit') + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + name = module.params['name'] + node = module.params['node'] + + result = dict(changed=False) + rabbitmq_feature_flag = RabbitMqFeatureFlag(module, name, node) + + if rabbitmq_feature_flag.state == 'disabled': + rabbitmq_feature_flag.enable() + result['changed'] = True + if rabbitmq_feature_flag.state == 'unavailable': + module.fail_json(msg="%s feature flag is not available" % (name)) + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_global_parameter.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_global_parameter.py new file mode 100644 index 00000000..8cecce5f --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_global_parameter.py @@ -0,0 +1,158 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# Copyright: (c) 2017, Juergen Kirschbaum <jk@jk-itc.de> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_global_parameter +short_description: Manage RabbitMQ global parameters +description: + - Manage dynamic, cluster-wide global parameters for RabbitMQ +author: "Juergen Kirschbaum (@jgkirschbaum)" +options: + name: + description: + - Name of the global parameter being set + type: str + required: true + default: null + value: + description: + - Value of the global parameter, as a JSON term + type: str + required: false + default: null + node: + description: + - erlang node name of the rabbit we wish to configure + type: str + required: false + default: rabbit + state: + description: + - Specify if global parameter is to be added or removed + type: str + required: false + default: present + choices: [ 'present', 'absent'] +''' + +EXAMPLES = r''' +- name: Set the global parameter 'cluster_name' to a value of 'mq-cluster' (in quotes) + community.rabbitmq.rabbitmq_global_parameter: + name: cluster_name + value: "{{ 'mq-cluster' | to_json }}" + state: present +''' + +RETURN = r''' +name: + description: name of the global parameter being set + returned: success + type: str + sample: "cluster_name" +value: + description: value of the global parameter, as a JSON term + returned: changed + type: str + sample: "the-cluster-name" +''' + +import json +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqGlobalParameter(object): + def __init__(self, module, name, value, node): + self.module = module + self.name = name + self.value = value + self.node = node + + self._value = None + + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + def _exec(self, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmqctl, '-q', '-n', self.node] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.splitlines() + return list() + + def get(self): + global_parameters = [param for param in self._exec(['list_global_parameters'], True) if param.strip()] + + for idx, param_item in enumerate(global_parameters): + name, value = param_item.split('\t') + # RabbitMQ 3.8.x and above return table header, ignore it + if idx == 0 and name == 'name' and value == 'value': + continue + + if name == self.name: + self._value = json.loads(value) + return True + return False + + def set(self): + self._exec(['set_global_parameter', + self.name, + json.dumps(self.value)]) + + def delete(self): + self._exec(['clear_global_parameter', self.name]) + + def has_modifications(self): + return self.value != self._value + + +def main(): + arg_spec = dict( + name=dict(type='str', required=True), + value=dict(type='str', default=None), + state=dict(default='present', choices=['present', 'absent']), + node=dict(type='str', default='rabbit') + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + name = module.params['name'] + value = module.params['value'] + if isinstance(value, str): + value = json.loads(value) + state = module.params['state'] + node = module.params['node'] + + result = dict(changed=False) + rabbitmq_global_parameter = RabbitMqGlobalParameter(module, name, value, node) + + if rabbitmq_global_parameter.get(): + if state == 'absent': + rabbitmq_global_parameter.delete() + result['changed'] = True + else: + if rabbitmq_global_parameter.has_modifications(): + rabbitmq_global_parameter.set() + result['changed'] = True + elif state == 'present': + rabbitmq_global_parameter.set() + result['changed'] = True + + result['name'] = name + result['value'] = value + result['state'] = state + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_parameter.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_parameter.py new file mode 100644 index 00000000..5ecbd4ec --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_parameter.py @@ -0,0 +1,155 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_parameter +short_description: Manage RabbitMQ parameters +description: + - Manage dynamic, cluster-wide parameters for RabbitMQ +author: Chris Hoffman (@chrishoffman) +options: + component: + description: + - Name of the component of which the parameter is being set + type: str + required: true + name: + description: + - Name of the parameter being set + type: str + required: true + value: + description: + - Value of the parameter, as a JSON term + type: str + vhost: + description: + - vhost to apply access privileges. + type: str + default: / + node: + description: + - erlang node name of the rabbit we wish to configure + type: str + default: rabbit + state: + description: + - Specify if parameter is to be added or removed + type: str + default: present + choices: [ 'present', 'absent'] +''' + +EXAMPLES = r""" +- name: Set the federation parameter 'local_username' to a value of 'guest' (in quotes) + community.rabbitmq.rabbitmq_parameter: + component: federation + name: local-username + value: '"guest"' + state: present +""" +import json +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqParameter(object): + def __init__(self, module, component, name, value, vhost, node): + self.module = module + self.component = component + self.name = name + self.value = value + self.vhost = vhost + self.node = node + + self._value = None + + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + def _exec(self, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmqctl, '-q', '-n', self.node] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.strip().splitlines() + return list() + + def get(self): + parameters = [param for param in self._exec(['list_parameters', '-p', self.vhost], True) if param.strip()] + + for param_item in parameters: + component, name, value = param_item.split('\t') + + if component == self.component and name == self.name: + self._value = json.loads(value) + return True + return False + + def set(self): + self._exec(['set_parameter', + '-p', + self.vhost, + self.component, + self.name, + json.dumps(self.value)]) + + def delete(self): + self._exec(['clear_parameter', '-p', self.vhost, self.component, self.name]) + + def has_modifications(self): + return self.value != self._value + + +def main(): + arg_spec = dict( + component=dict(required=True), + name=dict(required=True), + value=dict(default=None), + vhost=dict(default='/'), + state=dict(default='present', choices=['present', 'absent']), + node=dict(default='rabbit') + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + component = module.params['component'] + name = module.params['name'] + value = module.params['value'] + if isinstance(value, str): + value = json.loads(value) + vhost = module.params['vhost'] + state = module.params['state'] + node = module.params['node'] + + result = dict(changed=False) + rabbitmq_parameter = RabbitMqParameter(module, component, name, value, vhost, node) + + if rabbitmq_parameter.get(): + if state == 'absent': + rabbitmq_parameter.delete() + result['changed'] = True + else: + if rabbitmq_parameter.has_modifications(): + rabbitmq_parameter.set() + result['changed'] = True + elif state == 'present': + rabbitmq_parameter.set() + result['changed'] = True + + result['component'] = component + result['name'] = name + result['vhost'] = vhost + result['state'] = state + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_plugin.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_plugin.py new file mode 100644 index 00000000..86a5b6ae --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_plugin.py @@ -0,0 +1,187 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_plugin +short_description: Manage RabbitMQ plugins +description: + - This module can be used to enable or disable RabbitMQ plugins. +author: + - Chris Hoffman (@chrishoffman) +options: + names: + description: + - Comma-separated list of plugin names. Also, accepts plugin name. + type: str + required: true + aliases: [name] + new_only: + description: + - Only enable missing plugins. + - Does not disable plugins that are not in the names list. + type: bool + default: false + state: + description: + - Specify if plugins are to be enabled or disabled. + type: str + default: enabled + choices: [enabled, disabled] + broker_state: + description: + - Specify whether the broker should be online or offline for the plugin change. + type: str + default: online + choices: [online, offline] + prefix: + description: + - Specify a custom install prefix to a Rabbit. + type: str +''' + +EXAMPLES = r''' +- name: Enables the rabbitmq_management plugin + community.rabbitmq.rabbitmq_plugin: + names: rabbitmq_management + state: enabled + +- name: Enable multiple rabbitmq plugins + community.rabbitmq.rabbitmq_plugin: + names: rabbitmq_management,rabbitmq_management_visualiser + state: enabled + +- name: Disable plugin + community.rabbitmq.rabbitmq_plugin: + names: rabbitmq_management + state: disabled + +- name: Enable every plugin in list with existing plugins + community.rabbitmq.rabbitmq_plugin: + names: rabbitmq_management,rabbitmq_management_visualiser,rabbitmq_shovel,rabbitmq_shovel_management + state: enabled + new_only: true + +- name: Enables the rabbitmq_peer_discovery_aws plugin without requiring a broker connection. + community.rabbitmq.rabbitmq_plugin: + names: rabbitmq_peer_discovery_aws_plugin + state: enabled + broker_state: offline +''' + +RETURN = r''' +enabled: + description: list of plugins enabled during task run + returned: always + type: list + sample: ["rabbitmq_management"] +disabled: + description: list of plugins disabled during task run + returned: always + type: list + sample: ["rabbitmq_management"] +''' + +import os +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqPlugins(object): + + def __init__(self, module): + self.module = module + bin_path = '' + if module.params['prefix']: + if os.path.isdir(os.path.join(module.params['prefix'], 'bin')): + bin_path = os.path.join(module.params['prefix'], 'bin') + elif os.path.isdir(os.path.join(module.params['prefix'], 'sbin')): + bin_path = os.path.join(module.params['prefix'], 'sbin') + else: + # No such path exists. + module.fail_json(msg="No binary folder in prefix %s" % module.params['prefix']) + + self._rabbitmq_plugins = os.path.join(bin_path, "rabbitmq-plugins") + else: + self._rabbitmq_plugins = module.get_bin_path('rabbitmq-plugins', True) + + def _exec(self, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmq_plugins] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.splitlines() + return list() + + def get_all(self): + list_output = self._exec(['list', '-E', '-m'], True) + plugins = [] + for plugin in list_output: + if not plugin: + break + plugins.append(plugin) + + return plugins + + def enable(self, name): + self._exec(['enable', "--%s" % self.module.params['broker_state'], name]) + + def disable(self, name): + self._exec(['disable', "--%s" % self.module.params['broker_state'], name]) + + +def main(): + arg_spec = dict( + names=dict(required=True, aliases=['name']), + new_only=dict(default='no', type='bool'), + state=dict(default='enabled', choices=['enabled', 'disabled']), + broker_state=dict(default='online', choices=['online', 'offline']), + prefix=dict(required=False, default=None) + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + result = dict() + names = module.params['names'].split(',') + new_only = module.params['new_only'] + state = module.params['state'] + + rabbitmq_plugins = RabbitMqPlugins(module) + enabled_plugins = rabbitmq_plugins.get_all() + + enabled = [] + disabled = [] + if state == 'enabled': + if not new_only: + for plugin in enabled_plugins: + if " " in plugin: + continue + if plugin not in names: + rabbitmq_plugins.disable(plugin) + disabled.append(plugin) + + for name in names: + if name not in enabled_plugins: + rabbitmq_plugins.enable(name) + enabled.append(name) + else: + for plugin in enabled_plugins: + if plugin in names: + rabbitmq_plugins.disable(plugin) + disabled.append(plugin) + + result['changed'] = len(enabled) > 0 or len(disabled) > 0 + result['enabled'] = enabled + result['disabled'] = disabled + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_policy.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_policy.py new file mode 100644 index 00000000..441a72ce --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_policy.py @@ -0,0 +1,253 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, John Dewey <john@dewey.ws> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_policy +short_description: Manage the state of policies in RabbitMQ +description: + - Manage the state of a policy in RabbitMQ. +author: John Dewey (@retr0h) +options: + name: + description: + - The name of the policy to manage. + type: str + required: true + vhost: + description: + - The name of the vhost to apply to. + type: str + default: / + apply_to: + description: + - What the policy applies to. Requires RabbitMQ 3.2.0 or later. + type: str + default: all + choices: [all, exchanges, queues] + pattern: + description: + - A regex of queues to apply the policy to. Required when + C(state=present). This option is no longer required as of Ansible 2.9. + type: str + required: false + default: null + tags: + description: + - A dict or string describing the policy. Required when + C(state=present). This option is no longer required as of Ansible 2.9. + type: dict + required: false + default: null + priority: + description: + - The priority of the policy. + type: str + default: '0' + node: + description: + - Erlang node name of the rabbit we wish to configure. + type: str + default: rabbit + state: + description: + - The state of the policy. + type: str + default: present + choices: [present, absent] +''' + +EXAMPLES = r''' +- name: ensure the default vhost contains the HA policy via a dict + community.rabbitmq.rabbitmq_policy: + name: HA + pattern: .* + args: + tags: + ha-mode: all + +- name: ensure the default vhost contains the HA policy + community.rabbitmq.rabbitmq_policy: + name: HA + pattern: .* + tags: + ha-mode: all +''' + +import json +import re +from ansible_collections.community.rabbitmq.plugins.module_utils.version import LooseVersion as Version + +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqPolicy(object): + + def __init__(self, module, name): + self._module = module + self._name = name + self._vhost = module.params['vhost'] + self._pattern = module.params['pattern'] + self._apply_to = module.params['apply_to'] + self._tags = module.params['tags'] + self._priority = module.params['priority'] + self._node = module.params['node'] + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + self._version = self._rabbit_version() + + def _exec(self, + args, + force_exec_in_check_mode=False, + split_lines=True, + add_vhost=True): + if (not self._module.check_mode + or (self._module.check_mode and force_exec_in_check_mode)): + cmd = [self._rabbitmqctl, '-q', '-n', self._node] + + if add_vhost: + args.insert(1, '-p') + args.insert(2, self._vhost) + + rc, out, err = self._module.run_command(cmd + args, check_rc=True) + if split_lines: + return out.splitlines() + + return out + return list() + + def _rabbit_version(self): + status = self._exec(['status'], True, False, False) + + # 3.7.x erlang style output + version_match = re.search('{rabbit,".*","(?P<version>.*)"}', status) + if version_match: + return Version(version_match.group('version')) + + # 3.8.x style ouput + version_match = re.search('RabbitMQ version: (?P<version>.*)', status) + if version_match: + return Version(version_match.group('version')) + + return None + + def _list_policies(self): + if self._version and self._version >= Version('3.7.9'): + # Remove first header line from policies list for version > 3.7.9 + return self._exec(['list_policies'], True)[1:] + + return self._exec(['list_policies'], True) + + def has_modifications(self): + if self._pattern is None or self._tags is None: + self._module.fail_json( + msg=('pattern and tags are required for ' + 'state=present')) + + if self._version and self._version >= Version('3.7.0'): + # Change fields order in rabbitmqctl output in version 3.7 + return not any( + self._policy_check(policy, apply_to_fno=3, pattern_fno=2) + for policy in self._list_policies()) + else: + return not any( + self._policy_check(policy) for policy in self._list_policies()) + + def should_be_deleted(self): + return any( + self._policy_check_by_name(policy) + for policy in self._list_policies()) + + def set(self): + args = ['set_policy'] + args.append(self._name) + args.append(self._pattern) + args.append(json.dumps(self._tags)) + args.append('--priority') + args.append(self._priority) + if self._apply_to != 'all': + args.append('--apply-to') + args.append(self._apply_to) + return self._exec(args) + + def clear(self): + return self._exec(['clear_policy', self._name]) + + def _policy_check(self, + policy, + name_fno=1, + apply_to_fno=2, + pattern_fno=3, + tags_fno=4, + priority_fno=5): + if not policy: + return False + + policy_data = policy.split('\t') + + policy_name = policy_data[name_fno] + apply_to = policy_data[apply_to_fno] + pattern = policy_data[pattern_fno].replace('\\\\', '\\') + + try: + tags = json.loads(policy_data[tags_fno]) + except json.decoder.JSONDecodeError: + tags = policy_data[tags_fno] + + priority = policy_data[priority_fno] + + return (policy_name == self._name and apply_to == self._apply_to + and tags == self._tags and priority == self._priority + and pattern == self._pattern) + + def _policy_check_by_name(self, policy): + if not policy: + return False + + policy_name = policy.split('\t')[1] + + return policy_name == self._name + + +def main(): + arg_spec = dict( + name=dict(required=True), + vhost=dict(default='/'), + pattern=dict(required=False, default=None), + apply_to=dict(default='all', choices=['all', 'exchanges', 'queues']), + tags=dict(type='dict', required=False, default=None), + priority=dict(default='0'), + node=dict(default='rabbit'), + state=dict(default='present', choices=['present', 'absent']), + ) + + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + name = module.params['name'] + state = module.params['state'] + rabbitmq_policy = RabbitMqPolicy(module, name) + + result = dict(changed=False, name=name, state=state) + + if state == 'present' and rabbitmq_policy.has_modifications(): + rabbitmq_policy.set() + result['changed'] = True + elif state == 'absent' and rabbitmq_policy.should_be_deleted(): + rabbitmq_policy.clear() + result['changed'] = True + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_publish.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_publish.py new file mode 100644 index 00000000..38b5a64b --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_publish.py @@ -0,0 +1,247 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# (c) 2018, John Imison <john+github@imison.net> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_publish +short_description: Publish a message to a RabbitMQ queue. +description: + - Publish a message on a RabbitMQ queue using a blocking connection. +options: + url: + description: + - An URL connection string to connect to the RabbitMQ server. + - I(url) and I(host)/I(port)/I(user)/I(pass)/I(vhost) are mutually exclusive, use either or but not both. + type: str + proto: + description: + - The protocol to use. + type: str + choices: [amqps, amqp] + host: + description: + - The RabbitMQ server hostname or IP. + type: str + port: + description: + - The RabbitMQ server port. + type: int + username: + description: + - The RabbitMQ username. + type: str + password: + description: + - The RabbitMQ password. + type: str + vhost: + description: + - The virtual host to target. + - If default vhost is required, use C('%2F'). + type: str + queue: + description: + - The queue to publish a message to. If no queue is specified, RabbitMQ will return a random queue name. + - A C(queue) cannot be provided if an C(exchange) is specified. + type: str + exchange: + description: + - The exchange to publish a message to. + - An C(exchange) cannot be provided if a C(queue) is specified. + type: str + routing_key: + description: + - The routing key. + type: str + body: + description: + - The body of the message. + - A C(body) cannot be provided if a C(src) is specified. + type: str + src: + description: + - A file to upload to the queue. Automatic mime type detection is attempted if content_type is not defined (left as default). + - A C(src) cannot be provided if a C(body) is specified. + - The filename is added to the headers of the posted message to RabbitMQ. Key being the C(filename), value is the filename. + type: path + aliases: ['file'] + content_type: + description: + - The content type of the body. + type: str + default: text/plain + durable: + description: + - Set the queue to be durable. + type: bool + default: False + exclusive: + description: + - Set the queue to be exclusive. + type: bool + default: False + auto_delete: + description: + - Set the queue to auto delete. + type: bool + default: False + headers: + description: + - A dictionary of headers to post with the message. + type: dict + default: {} + cafile: + description: + - CA file used during connection to the RabbitMQ server over SSL. + - If this option is specified, also I(certfile) and I(keyfile) must be specified. + type: str + certfile: + description: + - Client certificate to establish SSL connection. + - If this option is specified, also I(cafile) and I(keyfile) must be specified. + type: str + keyfile: + description: + - Client key to establish SSL connection. + - If this option is specified, also I(cafile) and I(certfile) must be specified. + type: str + + + +requirements: [ pika ] +notes: + - This module requires the pika python library U(https://pika.readthedocs.io/). + - Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. + - This module is tested against RabbitMQ. Other AMQP 0.9.1 protocol based servers may work but not tested/guaranteed. + - The certificate authentication was tested with certificates created + via U(https://www.rabbitmq.com/ssl.html#automated-certificate-generation) and RabbitMQ + configuration variables C(ssl_options.verify = verify_peer) & C(ssl_options.fail_if_no_peer_cert = true). +author: "John Imison (@Im0)" +''' + +EXAMPLES = r''' +- name: Publish to an exchange + community.rabbitmq.rabbitmq_publish: + exchange: exchange1 + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + body: "Hello exchange from ansible module rabbitmq_publish" + content_type: "text/plain" + +- name: Publish to an exchange with routing_key + community.rabbitmq.rabbitmq_publish: + exchange: exchange1 + routing_key: queue1 + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + body: "Hello queue via exchange routing_key from ansible module rabbitmq_publish" + content_type: "text/plain" + +- name: Publish a message to a queue with headers + community.rabbitmq.rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + queue: 'test' + body: "Hello world from ansible module rabbitmq_publish" + content_type: "text/plain" + headers: + myHeader: myHeaderValue + +- name: Publish a file to a queue + community.rabbitmq.rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + queue: 'images' + file: 'path/to/logo.gif' + +- name: RabbitMQ auto generated queue + community.rabbitmq.rabbitmq_publish: + url: "amqp://guest:guest@192.168.0.32:5672/%2F" + body: "Hello world random queue from ansible module rabbitmq_publish" + content_type: "text/plain" + +- name: Publish with certs + community.rabbitmq.rabbitmq_publish: + url: "amqps://guest:guest@192.168.0.32:5671/%2F" + body: "Hello test queue from ansible module rabbitmq_publish via SSL certs" + queue: 'test' + content_type: "text/plain" + cafile: 'ca_certificate.pem' + certfile: 'client_certificate.pem' + keyfile: 'client_key.pem' + +''' + +RETURN = r''' +result: + description: + - If posted to an exchange, the result contains the status I(msg), content type I(content_type) the exchange name I(exchange) + - and the routing key I(routing_key). + - If posted to a queue, the result contains the status I(msg), content type I(content_type) and the queue name I(queue). + returned: success + type: dict + sample: | + 'result': { 'content_type': 'text/plain', 'msg': 'Successfully published to queue test', 'queue': 'test' } +''' + +try: + import pika + HAS_PIKA = True +except ImportError: + HAS_PIKA = False + + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native, to_text +from ansible_collections.community.rabbitmq.plugins.module_utils.rabbitmq import RabbitClient + + +def main(): + argument_spec = RabbitClient.rabbitmq_argument_spec() + argument_spec.update( + exchange=dict(type='str'), + routing_key=dict(type='str', required=False, no_log=False), + body=dict(type='str', required=False), + src=dict(aliases=['file'], type='path', required=False), + content_type=dict(default="text/plain", type='str'), + durable=dict(default=False, type='bool'), + exclusive=dict(default=False, type='bool'), + auto_delete=dict(default=False, type='bool'), + headers=dict(default={}, type='dict'), + cafile=dict(type='str', required=False), + certfile=dict(type='str', required=False), + keyfile=dict(type='str', required=False, no_log=False), + ) + module = AnsibleModule( + argument_spec=argument_spec, + mutually_exclusive=[['body', 'src'], ['queue', 'exchange']], + required_together=[['cafile', 'certfile', 'keyfile']], + supports_check_mode=False + ) + + rabbitmq = RabbitClient(module) + + if rabbitmq.basic_publish(): + rabbitmq.close_connection() + if (rabbitmq.queue is not None): + module.exit_json(changed=True, result={"msg": "Successfully published to queue %s" % rabbitmq.queue, + "queue": rabbitmq.queue, "content_type": rabbitmq.content_type} + ) + elif (rabbitmq.exchange is not None): + module.exit_json(changed=True, result={"msg": "Successfully published to exchange %s" % rabbitmq.exchange, + "routing_key": rabbitmq.routing_key, "exchange": rabbitmq.exchange, "content_type": rabbitmq.content_type} + ) + + else: + rabbitmq.close_connection() + if (rabbitmq.queue is not None): + module.fail_json(changed=False, msg="Unsuccessful publishing to queue %s" % rabbitmq.queue) + elif (rabbitmq.exchange is not None): + module.fail_json(changed=False, msg="Unsuccessful publishing to exchange %s" % rabbitmq.exchange) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_queue.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_queue.py new file mode 100644 index 00000000..1290f471 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_queue.py @@ -0,0 +1,264 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2015, Manuel Sousa <manuel.sousa@gmail.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_queue +author: Manuel Sousa (@manuel-sousa) + +short_description: Manage rabbitMQ queues +description: + - This module uses rabbitMQ Rest API to create/delete queues. + - Due to limitations in the API, it cannot modify existing queues. +requirements: [ "requests >= 1.0.0" ] +options: + name: + description: + - Name of the queue. + type: str + required: true + state: + description: + - Whether the queue should be present or absent. + type: str + choices: [ "present", "absent" ] + default: present + durable: + description: + - whether queue is durable or not. + type: bool + default: true + auto_delete: + description: + - if the queue should delete itself after all queues/queues unbound from it. + type: bool + default: false + message_ttl: + description: + - How long a message can live in queue before it is discarded (milliseconds). + type: int + auto_expires: + description: + - How long a queue can be unused before it is automatically deleted (milliseconds). + type: int + max_length: + description: + - How many messages can the queue contain before it starts rejecting. + type: int + dead_letter_exchange: + description: + - Optional name of an exchange to which messages will be republished if they + - are rejected or expire. + type: str + dead_letter_routing_key: + description: + - Optional replacement routing key to use when a message is dead-lettered. + - Original routing key will be used if unset. + type: str + max_priority: + description: + - Maximum number of priority levels for the queue to support. + - If not set, the queue will not support message priorities. + - Larger numbers indicate higher priority. + type: int + arguments: + description: + - extra arguments for queue. If defined this argument is a key/value dictionary + - Arguments here take precedence over parameters. If both are defined, the + argument will be used. + type: dict + default: {} +extends_documentation_fragment: +- community.rabbitmq.rabbitmq + +''' + +EXAMPLES = r''' +- name: Create a queue + community.rabbitmq.rabbitmq_queue: + name: myQueue + +- name: Create a queue on remote host + community.rabbitmq.rabbitmq_queue: + name: myRemoteQueue + login_user: user + login_password: secret + login_host: remote.example.org + +# You may specify different types of queues using the arguments parameter. +- name: Create RabbitMQ stream + become: yes + community.rabbitmq.rabbitmq_queue: + name: test-x + arguments: + x-queue-type: stream + x-max-age: 24h +''' + +import json +import traceback + +REQUESTS_IMP_ERR = None +try: + import requests + HAS_REQUESTS = True +except ImportError: + REQUESTS_IMP_ERR = traceback.format_exc() + HAS_REQUESTS = False + +from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.six.moves.urllib import parse as urllib_parse +from ansible_collections.community.rabbitmq.plugins.module_utils.rabbitmq import rabbitmq_argument_spec + + +def check_if_arg_changed(module, current_args, desired_args, arg_name): + if arg_name not in current_args: + if arg_name in desired_args: + module.fail_json( + msg=("RabbitMQ RESTAPI doesn't support attribute changes for existing queues." + "Attempting to set %s which is not currently set." % arg_name), + ) + # else don't care + else: # arg_name in current_args + if arg_name in desired_args: + if current_args[arg_name] != desired_args[arg_name]: + module.fail_json( + msg=("RabbitMQ RESTAPI doesn't support attribute changes for existing queues.\n" + "Attempting to change %s from '%s' to '%s'" % (arg_name, current_args[arg_name], desired_args[arg_name])) + ) + else: + module.fail_json( + msg=("RabbitMQ RESTAPI doesn't support attribute changes for existing queues." + "Attempting to unset %s which is currently set to '%s'." % (arg_name, current_args[arg_name])), + ) + + +def main(): + + argument_spec = rabbitmq_argument_spec() + argument_spec.update( + dict( + state=dict(default='present', choices=['present', 'absent'], type='str'), + name=dict(required=True, type='str'), + durable=dict(default=True, type='bool'), + auto_delete=dict(default=False, type='bool'), + message_ttl=dict(default=None, type='int'), + auto_expires=dict(default=None, type='int'), + max_length=dict(default=None, type='int'), + dead_letter_exchange=dict(default=None, type='str'), + dead_letter_routing_key=dict(default=None, type='str', no_log=False), + arguments=dict(default=dict(), type='dict'), + max_priority=dict(default=None, type='int') + ) + ) + module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True) + + url = "%s://%s:%s/api/queues/%s/%s" % ( + module.params['login_protocol'], + module.params['login_host'], + module.params['login_port'], + urllib_parse.quote(module.params['vhost'], ''), + urllib_parse.quote(module.params['name'], '') + ) + + if not HAS_REQUESTS: + module.fail_json(msg=missing_required_lib("requests"), exception=REQUESTS_IMP_ERR) + + result = dict(changed=False, name=module.params['name']) + + # Check if queue already exists + r = requests.get(url, auth=(module.params['login_user'], module.params['login_password']), + verify=module.params['ca_cert'], cert=(module.params['client_cert'], module.params['client_key'])) + + if r.status_code == 200: + queue_exists = True + response = r.json() + elif r.status_code == 404: + queue_exists = False + response = r.text + else: + module.fail_json( + msg="Invalid response from RESTAPI when trying to check if queue exists", + details=r.text + ) + + arg_map = { + 'message_ttl': 'x-message-ttl', + 'auto_expires': 'x-expires', + 'max_length': 'x-max-length', + 'dead_letter_exchange': 'x-dead-letter-exchange', + 'dead_letter_routing_key': 'x-dead-letter-routing-key', + 'max_priority': 'x-max-priority' + } + + # Sync arguments with parameters (the final request uses module.params['arguments']) + for k, v in arg_map.items(): + if module.params[k] is not None: + module.params['arguments'][v] = module.params[k] + + if module.params['state'] == 'present': + add_or_delete_required = not queue_exists + else: + add_or_delete_required = queue_exists + + # Check if attributes change on existing queue + if not add_or_delete_required and r.status_code == 200 and module.params['state'] == 'present': + check_if_arg_changed(module, response, module.params, 'durable') + check_if_arg_changed(module, response, module.params, 'auto_delete') + + for arg in arg_map.values(): + check_if_arg_changed(module, response['arguments'], module.params['arguments'], arg) + + # Exit if check_mode + if module.check_mode: + result['changed'] = add_or_delete_required + result['details'] = response + result['arguments'] = module.params['arguments'] + module.exit_json(**result) + + # Do changes + if add_or_delete_required: + if module.params['state'] == 'present': + r = requests.put( + url, + auth=(module.params['login_user'], module.params['login_password']), + headers={"content-type": "application/json"}, + data=json.dumps({ + "durable": module.params['durable'], + "auto_delete": module.params['auto_delete'], + "arguments": module.params['arguments'] + }), + verify=module.params['ca_cert'], + cert=(module.params['client_cert'], module.params['client_key']) + ) + elif module.params['state'] == 'absent': + r = requests.delete(url, auth=(module.params['login_user'], module.params['login_password']), + verify=module.params['ca_cert'], cert=(module.params['client_cert'], module.params['client_key'])) + + # RabbitMQ 3.6.7 changed this response code from 204 to 201 + if r.status_code == 204 or r.status_code == 201: + result['changed'] = True + module.exit_json(**result) + else: + module.fail_json( + msg="Error creating queue", + status=r.status_code, + details=r.text + ) + + else: + module.exit_json( + changed=False, + name=module.params['name'] + ) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_upgrade.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_upgrade.py new file mode 100644 index 00000000..e44d7e09 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_upgrade.py @@ -0,0 +1,127 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Damian Dabrowski <damian@dabrowski.cloud> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +DOCUMENTATION = r''' +--- +module: rabbitmq_upgrade +short_description: Execute rabbitmq-upgrade commands +description: + - Allows to execute rabbitmq-upgrade commands +author: "Damian Dabrowski (@damiandabrowski5)" +version_added: '1.1.0' +options: + action: + description: + - Specify action to be executed. + type: str + required: true + choices: ['await_online_quorum_plus_one','await_online_synchronized_mirror','post_upgrade','drain','revive'] + node: + description: + - Erlang node name of the target rabbit node. + type: str + required: false + default: rabbit +''' + +EXAMPLES = r''' +- name: Drain 'rabbit@node-1' node (in other words, put it into maintenance mode) + community.rabbitmq.rabbitmq_upgrade: + action: drain + node: rabbit@node-1 +''' + +import json +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqUpgrade(object): + + def __init__(self, module, action, node, result): + self.module = module + self.action = action + self.node = node + self.result = result + + def _exec(self, binary, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self.module.get_bin_path(binary, True)] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.splitlines() + return list() + + def is_node_under_maintenance(self): + cmd = self._exec('rabbitmq-diagnostics', + ['--formatter', 'json', 'status', '-n', self.node], True) + node_status = json.loads("".join(cmd)) + maint_enabled = node_status['is_under_maintenance'] + if maint_enabled: + return True + return False + + def is_maint_flag_enabled(self): + feature_flags = self._exec('rabbitmqctl', ['list_feature_flags', '-q'], True) + for param_item in feature_flags: + name, state = param_item.split('\t') + if name == 'maintenance_mode_status' and state == 'enabled': + return True + return False + + def drain(self): + if not self.is_maint_flag_enabled(): + self.module.fail_json(msg='maintenance_mode_status feature_flag is disabled.') + if not self.is_node_under_maintenance(): + self._exec('rabbitmq-upgrade', ['drain', '-n', self.node]) + self.result['changed'] = True + + def revive(self): + if not self.is_maint_flag_enabled(): + self.module.fail_json(msg='maintenance_mode_status feature_flag is disabled.') + if self.is_node_under_maintenance(): + self._exec('rabbitmq-upgrade', ['revive', '-n', self.node]) + self.result['changed'] = True + + def await_online_quorum_plus_one(self): + self._exec('rabbitmq-upgrade', ['await_online_quorum_plus_one']) + self.result['changed'] = True + + def await_online_synchronized_mirror(self): + self._exec('rabbitmq-upgrade', ['await_online_synchronized_mirror']) + self.result['changed'] = True + + def post_upgrade(self): + self._exec('rabbitmq-upgrade', ['post_upgrade']) + self.result['changed'] = True + + +def main(): + arg_spec = dict( + action=dict( + choices=['await_online_quorum_plus_one', 'await_online_synchronized_mirror', 'post_upgrade', 'drain', 'revive'], + required=True), + node=dict(type='str', default='rabbit') + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + action = module.params['action'] + node = module.params['node'] + result = dict(changed=False) + + rabbitmq_upgrade = RabbitMqUpgrade(module, action, node, result) + + getattr(rabbitmq_upgrade, action)() + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user.py new file mode 100644 index 00000000..87d6864f --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user.py @@ -0,0 +1,567 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_user +short_description: Manage RabbitMQ users +description: + - Add or remove users to RabbitMQ and assign permissions +author: Chris Hoffman (@chrishoffman) +options: + user: + description: + - Name of user to add + type: str + required: true + aliases: [username, name] + password: + description: + - Password of user to add. + - To change the password of an existing user, you must also specify + C(update_password=always). + type: str + tags: + description: + - User tags specified as comma delimited + type: str + permissions: + description: + - a list of dicts, each dict contains vhost, configure_priv, write_priv, and read_priv, + and represents a permission rule for that vhost. + - This option should be preferable when you care about all permissions of the user. + - You should use vhost, configure_priv, write_priv, and read_priv options instead + if you care about permissions for just some vhosts. + type: list + elements: dict + default: [] + vhost: + description: + - vhost to apply access privileges. + - This option will be ignored when permissions option is used. + type: str + default: / + node: + description: + - erlang node name of the rabbit we wish to configure + type: str + default: rabbit + configure_priv: + description: + - Regular expression to restrict configure actions on a resource + for the specified vhost. + - By default all actions are restricted. + - This option will be ignored when permissions option is used. + type: str + default: '^$' + write_priv: + description: + - Regular expression to restrict configure actions on a resource + for the specified vhost. + - By default all actions are restricted. + - This option will be ignored when permissions option is used. + type: str + default: '^$' + read_priv: + description: + - Regular expression to restrict configure actions on a resource + for the specified vhost. + - By default all actions are restricted. + - This option will be ignored when permissions option is used. + type: str + default: '^$' + topic_permissions: + description: + - A list of dicts, each dict contains vhost, exchange, read_priv and write_priv, + and represents a topic permission rule for that vhost. + - By default vhost is C(/) and exchange is C(amq.topic). + - Supported since RabbitMQ 3.7.0. If RabbitMQ is older and topic_permissions are + set, the module will fail. + type: list + elements: dict + default: [] + version_added: '1.2.0' + force: + description: + - Deletes and recreates the user. + type: bool + default: false + state: + description: + - Specify if user is to be added or removed + type: str + default: present + choices: ['present', 'absent'] + update_password: + description: + - C(on_create) will only set the password for newly created users. C(always) will update passwords if they differ. + type: str + required: false + default: on_create + choices: ['on_create', 'always'] +''' + +EXAMPLES = r''' +- name: |- + Add user to server and assign full access control on / vhost. + The user might have permission rules for other vhost but you don't care. + community.rabbitmq.rabbitmq_user: + user: joe + password: changeme + vhost: / + configure_priv: .* + read_priv: .* + write_priv: .* + state: present + +- name: |- + Add user to server and assign full access control on / vhost. + The user doesn't have permission rules for other vhosts + community.rabbitmq.rabbitmq_user: + user: joe + password: changeme + permissions: + - vhost: / + configure_priv: .* + read_priv: .* + write_priv: .* + state: present + +- name: |- + Add user to server and assign some topic permissions on / vhost. + The user doesn't have topic permission rules for other vhosts + community.rabbitmq.rabbitmq_user: + user: joe + password: changeme + topic_permissions: + - vhost: / + exchange: amq.topic + read_priv: .* + write_priv: 'prod\\.logging\\..*' + state: present +''' + +import ansible_collections.community.rabbitmq.plugins.module_utils.version as Version +import json +import re + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.collections import count + + +def normalized_permissions(vhost_permission_list): + """Older versions of RabbitMQ output permissions with slightly different names. + + In older versions of RabbitMQ, the names of the permissions had the `_priv` suffix, which was removed in versions + >= 3.7.6. For simplicity we only check the `configure` permission. If it's in the old format then all the other + ones will be wrong too. + """ + for vhost_permission in vhost_permission_list: + if 'configure_priv' in vhost_permission: + yield { + 'configure': vhost_permission['configure_priv'], + 'read': vhost_permission['read_priv'], + 'write': vhost_permission['write_priv'], + 'vhost': vhost_permission['vhost'] + } + else: + yield vhost_permission + + +def as_permission_dict(vhost_permission_list): + return dict([(vhost_permission['vhost'], vhost_permission) for vhost_permission + in normalized_permissions(vhost_permission_list)]) + + +def as_topic_permission_dict(topic_permission_list): + return dict([((perm['vhost'], perm['exchange']), perm) for perm + in topic_permission_list]) + + +def only(vhost, vhost_permissions): + return {vhost: vhost_permissions.get(vhost, {})} + + +def first(iterable): + return next(iter(iterable)) + + +class RabbitMqUser(object): + def __init__(self, module, username, password, tags, permissions, + topic_permissions, node, bulk_permissions=False): + self.module = module + self.username = username + self.password = password or '' + self.node = node + self.tags = list() if not tags else tags.replace(' ', '').split(',') + self.permissions = as_permission_dict(permissions) + self.topic_permissions = as_topic_permission_dict(topic_permissions) + self.bulk_permissions = bulk_permissions + + self.existing_tags = None + self.existing_permissions = dict() + self.existing_topic_permissions = dict() + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + self._version = self._check_version() + + def _check_version(self): + """Get the version of the RabbitMQ server.""" + version = self._rabbitmq_version_post_3_7(fail_on_error=False) + if not version: + version = self._rabbitmq_version_pre_3_7(fail_on_error=False) + if not version: + self.module.fail_json(msg="Could not determine the version of the RabbitMQ server.") + return version + + def _fail(self, msg, stop_execution=False): + if stop_execution: + self.module.fail_json(msg=msg) + # This is a dummy return to prevent linters from throwing errors. + return None + + def _rabbitmq_version_post_3_7(self, fail_on_error=False): + """Use the JSON formatter to get a machine readable output of the version. + + At this point we do not know which RabbitMQ server version we are dealing with and which + version of `rabbitmqctl` we are using, so we will try to use the JSON formatter and see + what happens. In some versions of + """ + def int_list_to_str(ints): + return ''.join([chr(i) for i in ints]) + + rc, output, err = self._exec(['status', '--formatter', 'json'], check_rc=False) + if rc != 0: + return self._fail(msg="Could not parse the version of the RabbitMQ server, " + "because `rabbitmqctl status` returned no output.", + stop_execution=fail_on_error) + try: + status_json = json.loads(output) + if 'rabbitmq_version' in status_json: + return Version.StrictVersion(status_json['rabbitmq_version']) + for application in status_json.get('running_applications', list()): + if application[0] == 'rabbit': + if isinstance(application[1][0], int): + return Version.StrictVersion(int_list_to_str(application[2])) + else: + return Version.StrictVersion(application[1]) + return self._fail(msg="Could not find RabbitMQ version of `rabbitmqctl status` command.", + stop_execution=fail_on_error) + except ValueError as e: + return self._fail(msg="Could not parse output of `rabbitmqctl status` as JSON: {exc}.".format(exc=repr(e)), + stop_execution=fail_on_error) + + def _rabbitmq_version_pre_3_7(self, fail_on_error=False): + """Get the version of the RabbitMQ Server. + + Before version 3.7.6 the `rabbitmqctl` utility did not support the + `--formatter` flag, so the output has to be parsed using regular expressions. + """ + version_reg_ex = r"{rabbit,\"RabbitMQ\",\"([0-9]+\.[0-9]+\.[0-9]+)\"}" + rc, output, err = self._exec(['status'], check_rc=False) + if rc != 0: + if fail_on_error: + self.module.fail_json(msg="Could not parse the version of the RabbitMQ server, because" + " `rabbitmqctl status` returned no output.") + else: + return None + reg_ex_res = re.search(version_reg_ex, output, re.IGNORECASE) + if not reg_ex_res: + return self._fail(msg="Could not parse the version of the RabbitMQ server from the output of " + "`rabbitmqctl status` command: {output}.".format(output=output), + stop_execution=fail_on_error) + try: + return Version.StrictVersion(reg_ex_res.group(1)) + except ValueError as e: + return self._fail(msg="Could not parse the version of the RabbitMQ server: {exc}.".format(exc=repr(e)), + stop_execution=fail_on_error) + + def _exec(self, args, check_rc=True): + """Execute a command using the `rabbitmqctl` utility. + + By default the _exec call will cause the module to fail, if the error code is non-zero. If the `check_rc` + flag is set to False, then the exit_code, stdout and stderr will be returned to the calling function to + perform whatever error handling it needs. + + :param args: the arguments to pass to the `rabbitmqctl` utility + :param check_rc: when set to True, fail if the utility's exit code is non-zero + :return: the output of the command or all the outputs plus the error code in case of error + """ + cmd = [self._rabbitmqctl, '-q'] + if self.node: + cmd.extend(['-n', self.node]) + rc, out, err = self.module.run_command(cmd + args) + if check_rc and rc != 0: + # check_rc is not passed to the `run_command` method directly to allow for more fine grained checking of + # error messages returned by `rabbitmqctl`. + user_error_msg_regex = r"(Only root or .* .* run rabbitmqctl)" + user_error_msg = re.search(user_error_msg_regex, out) + if user_error_msg: + self.module.fail_json(msg="Wrong user used to run the `rabbitmqctl` utility: {err}" + .format(err=user_error_msg.group(1))) + else: + self.module.fail_json(msg="rabbitmqctl exited with non-zero code: {err}".format(err=err), + rc=rc, stdout=out) + return out if check_rc else (rc, out, err) + + def get(self): + """Retrieves the list of registered users from the node. + + If the user is already present, the node will also be queried for the user's permissions and topic + permissions. + If the version of the node is >= 3.7.6 the JSON formatter will be used, otherwise the plaintext will be + parsed. + """ + if self._version >= Version.StrictVersion('3.7.6'): + users = dict([(user_entry['user'], user_entry['tags']) + for user_entry in json.loads(self._exec(['list_users', '--formatter', 'json']))]) + else: + users = self._exec(['list_users']) + + def process_tags(tags): + if not tags: + return list() + return tags.replace('[', '').replace(']', '').replace(' ', '').strip('\t').split(',') + + users_and_tags = [user_entry.split('\t') for user_entry in users.strip().split('\n')] + + users = dict() + for user_parts in users_and_tags: + users[user_parts[0]] = process_tags(user_parts[1]) if len(user_parts) > 1 else [] + + self.existing_tags = users.get(self.username, list()) + self.existing_permissions = self._get_permissions() if self.username in users else dict() + self.existing_topic_permissions = self._get_topic_permissions() if self.username in users else dict() + return self.username in users + + def _get_permissions(self): + """Get permissions of the user from RabbitMQ.""" + if self._version >= Version.StrictVersion('3.7.6'): + permissions = json.loads(self._exec(['list_user_permissions', self.username, '--formatter', 'json'])) + else: + output = self._exec(['list_user_permissions', self.username]).strip().split('\n') + perms_out = [perm.split('\t') for perm in output if perm.strip()] + # Filter out headers from the output of the command in case they are still present + perms_out = [perm for perm in perms_out if perm != ["vhost", "configure", "write", "read"]] + + permissions = list() + for vhost, configure, write, read in perms_out: + permissions.append(dict(vhost=vhost, configure=configure, write=write, read=read)) + + if self.bulk_permissions: + return as_permission_dict(permissions) + else: + return only(first(self.permissions.keys()), as_permission_dict(permissions)) + + def _get_topic_permissions(self): + """Get topic permissions of the user from RabbitMQ.""" + if self._version < Version.StrictVersion('3.7.0'): + return dict() + if self._version >= Version.StrictVersion('3.7.6'): + permissions = json.loads(self._exec(['list_user_topic_permissions', self.username, '--formatter', 'json'])) + else: + output = self._exec(['list_user_topic_permissions', self.username]).strip().split('\n') + perms_out = [perm.split('\t') for perm in output if perm.strip()] + permissions = list() + for vhost, exchange, write, read in perms_out: + permissions.append(dict(vhost=vhost, exchange=exchange, write=write, read=read)) + return as_topic_permission_dict(permissions) + + def check_password(self): + """Return `True` if the user can authenticate successfully.""" + rc, out, err = self._exec(['authenticate_user', self.username, self.password], check_rc=False) + return rc == 0 + + def add(self): + self._exec(['add_user', self.username, self.password or '']) + if not self.password: + self._exec(['clear_password', self.username]) + + def delete(self): + self._exec(['delete_user', self.username]) + + def change_password(self): + if self.password: + self._exec(['change_password', self.username, self.password]) + else: + self._exec(['clear_password', self.username]) + + def set_tags(self): + self._exec(['set_user_tags', self.username] + self.tags) + + def set_permissions(self): + permissions_to_add = list() + for vhost, permission_dict in self.permissions.items(): + if permission_dict != self.existing_permissions.get(vhost, {}): + permissions_to_add.append(permission_dict) + + permissions_to_clear = list() + for vhost in self.existing_permissions.keys(): + if vhost not in self.permissions: + permissions_to_clear.append(vhost) + + for vhost in permissions_to_clear: + cmd = 'clear_permissions -p {vhost} {username}'.format(username=self.username, vhost=vhost) + self._exec(cmd.split(' ')) + for permissions in permissions_to_add: + cmd = ('set_permissions -p {vhost} {username} {configure} {write} {read}' + .format(username=self.username, **permissions)) + self._exec(cmd.split(' ')) + self.existing_permissions = self._get_permissions() + + def set_topic_permissions(self): + permissions_to_add = list() + for vhost_exchange, permission_dict in self.topic_permissions.items(): + if permission_dict != self.existing_topic_permissions.get(vhost_exchange, {}): + permissions_to_add.append(permission_dict) + + permissions_to_clear = list() + for vhost_exchange in self.existing_topic_permissions.keys(): + if vhost_exchange not in self.topic_permissions: + permissions_to_clear.append(vhost_exchange) + + for vhost_exchange in permissions_to_clear: + vhost, exchange = vhost_exchange + cmd = ('clear_topic_permissions -p {vhost} {username} {exchange}' + .format(username=self.username, vhost=vhost, exchange=exchange)) + self._exec(cmd.split(' ')) + for permissions in permissions_to_add: + cmd = ('set_topic_permissions -p {vhost} {username} {exchange} {write} {read}' + .format(username=self.username, **permissions)) + self._exec(cmd.split(' ')) + self.existing_topic_permissions = self._get_topic_permissions() + + def has_tags_modifications(self): + return set(self.tags) != set(self.existing_tags) + + def has_permissions_modifications(self): + return self.existing_permissions != self.permissions + + def has_topic_permissions_modifications(self): + return self.existing_topic_permissions != self.topic_permissions + + +def main(): + arg_spec = dict( + user=dict(required=True, aliases=['username', 'name']), + password=dict(default=None, no_log=True), + tags=dict(default=None), + permissions=dict(default=list(), type='list', elements='dict'), + vhost=dict(default='/'), + configure_priv=dict(default='^$'), + write_priv=dict(default='^$'), + read_priv=dict(default='^$'), + topic_permissions=dict(default=list(), type='list', elements='dict'), + force=dict(default='no', type='bool'), + state=dict(default='present', choices=['present', 'absent']), + node=dict(default='rabbit'), + update_password=dict(default='on_create', choices=['on_create', 'always'], no_log=False) + ) + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=False + ) + + username = module.params['user'] + password = module.params['password'] + tags = module.params['tags'] + permissions = module.params['permissions'] + vhost = module.params['vhost'] + configure_priv = module.params['configure_priv'] + write_priv = module.params['write_priv'] + read_priv = module.params['read_priv'] + topic_permissions = module.params['topic_permissions'] + force = module.params['force'] + state = module.params['state'] + node = module.params['node'] + update_password = module.params['update_password'] + + if permissions: + vhosts = [permission.get('vhost', '/') for permission in permissions] + if any(vhost_count > 1 for vhost_count in count(vhosts).values()): + module.fail_json(msg="Error parsing vhost permissions: You can't " + "have two permission dicts for the same vhost") + bulk_permissions = True + else: + perm = { + 'vhost': vhost, + 'configure_priv': configure_priv, + 'write_priv': write_priv, + 'read_priv': read_priv + } + permissions.append(perm) + bulk_permissions = False + + if topic_permissions: + vhost_exchanges = [ + (permission.get('vhost', '/'), permission.get('exchange')) + for permission in topic_permissions + ] + if any(ve_count > 1 for ve_count in count(vhost_exchanges).values()): + module.fail_json(msg="Error parsing vhost topic_permissions: You can't " + "have two topic permission dicts for the same vhost " + "and the same exchange") + + for permission in permissions: + if not permission['vhost']: + module.fail_json(msg="Error parsing vhost permissions: You can't" + "have an empty vhost when setting permissions") + + for permission in topic_permissions: + permission.setdefault('vhost', '/') + permission.setdefault('exchange', 'amq.topic') + # Normalize the arguments + for perm_name in ("read", "write"): + suffixed_perm_name = "{perm_name}_priv".format(perm_name=perm_name) + if suffixed_perm_name in permission: + permission[perm_name] = permission.pop(suffixed_perm_name) + + rabbitmq_user = RabbitMqUser(module, username, password, tags, permissions, + topic_permissions, node, + bulk_permissions=bulk_permissions) + + result = dict(changed=False, user=username, state=state) + if rabbitmq_user.get(): + if state == 'absent': + rabbitmq_user.delete() + result['changed'] = True + else: + if force: + rabbitmq_user.delete() + rabbitmq_user.add() + rabbitmq_user.get() + result['changed'] = True + elif update_password == 'always': + if not rabbitmq_user.check_password(): + rabbitmq_user.change_password() + result['changed'] = True + + if rabbitmq_user.has_tags_modifications(): + rabbitmq_user.set_tags() + result['changed'] = True + + if rabbitmq_user.has_permissions_modifications(): + rabbitmq_user.set_permissions() + result['changed'] = True + + if rabbitmq_user.has_topic_permissions_modifications(): + rabbitmq_user.set_topic_permissions() + result['changed'] = True + elif state == 'present': + rabbitmq_user.add() + rabbitmq_user.set_tags() + rabbitmq_user.set_permissions() + rabbitmq_user.set_topic_permissions() + result['changed'] = True + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user_limits.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user_limits.py new file mode 100644 index 00000000..9a92ad3f --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_user_limits.py @@ -0,0 +1,217 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +DOCUMENTATION = r''' +--- +module: rabbitmq_user_limits +short_description: Manage RabbitMQ user limits +description: + - Manage the state of user limits in RabbitMQ. Supported since RabbitMQ version 3.8.10. +author: Aitor Pazos (@aitorpazos) +version_added: '1.1.0' +options: + user: + description: + - Name of user to manage limits for. + type: str + required: true + aliases: [username, name] + max_connections: + description: + - Max number of concurrent client connections. + - Negative value means "no limit". + - Ignored when the I(state) is C(absent). + type: int + default: -1 + max_channels: + description: + - Max number of channels. + - Negative value means "no limit". + - Ignored when the I(state) is C(absent). + type: int + default: -1 + node: + description: + - Name of the RabbitMQ Erlang node to manage. + type: str + state: + description: + - Specify whether the limits are to be set or cleared. + - If set to C(absent), the limits of both I(max_connections) and I(max_channels) will be cleared. + type: str + default: present + choices: [present, absent] +notes: + - Supports C(check_mode). +''' + +EXAMPLES = r''' +- name: Limit both of the max number of connections and channels on the user 'guest'. + community.rabbitmq.rabbitmq_user_limits: + user: guest + max_connections: 64 + max_channels: 256 + state: present + +# This task implicitly clears the max number of channels limit using default value: -1. +- name: Limit the max number of connections on the user 'guest'. + community.rabbitmq.rabbitmq_user_limits: + user: guest + max_connections: 64 + state: present + +- name: Clear the limits on the user 'guest'. + community.rabbitmq.rabbitmq_user_limits: + user: guest + state: absent +''' + +RETURN = r''' # ''' + +import json +import re +from ansible_collections.community.rabbitmq.plugins.module_utils.version import LooseVersion as Version +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqUserLimits(object): + def __init__(self, module): + self._module = module + self._max_connections = module.params['max_connections'] + self._max_channels = module.params['max_channels'] + self._node = module.params['node'] + self._state = module.params['state'] + self._user = module.params['user'] + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + self._version = self._rabbit_version() + + def _exec(self, + args, + force_exec_in_check_mode=False): + + if not self._module.check_mode or (self._module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmqctl, '-q'] + if self._node is not None: + cmd.extend(['-n', self._node]) + rc, out, err = self._module.run_command(cmd + args, check_rc=True) + + return out + return "" + + def _rabbit_version(self): + status = self._exec(['status'], True) + + # 3.7.x erlang style output + version_match = re.search('{rabbit,".*","(?P<version>.*)"}', status) + if version_match: + return Version(version_match.group('version')) + + # 3.8.x style output + version_match = re.search('RabbitMQ version: (?P<version>.*)', status) + if version_match: + return Version(version_match.group('version')) + + return None + + def _assert_version(self): + if self._version and self._version < Version('3.8.10'): + self._module.fail_json(changed=False, + msg="User limits are only available for RabbitMQ >= 3.8.10. Detected version: %s" % self._version) + + def list(self): + self._assert_version() + + exec_result = self._exec(['list_user_limits', '--user', self._user], False) + max_connections = None + max_channels = None + if exec_result: + user_limits = json.loads(exec_result) + if 'max-connections' in user_limits: + max_connections = user_limits['max-connections'] + if 'max-channels' in user_limits: + max_channels = user_limits['max-channels'] + return dict( + max_connections=max_connections, + max_channels=max_channels + ) + + def set(self): + self._assert_version() + + if self._module.check_mode: + return + + if self._max_connections != -1: + json_str = '{{"max-connections": {0}}}'.format(self._max_connections) + self._exec(['set_user_limits', self._user, json_str]) + else: + self._exec(['clear_user_limits', self._user, "max-connections"]) + + if self._max_channels != -1: + json_str = '{{"max-channels": {0}}}'.format(self._max_channels) + self._exec(['set_user_limits', self._user, json_str]) + else: + self._exec(['clear_user_limits', self._user, "max-channels"]) + + def clear(self): + self._assert_version() + + if self._module.check_mode: + return + + return self._exec(['clear_user_limits', self._user, 'all']) + + +def main(): + arg_spec = dict( + user=dict(required=True, type='str', aliases=['username', 'name']), + max_connections=dict(default=-1, type='int'), + max_channels=dict(default=-1, type='int'), + state=dict(default='present', choices=['present', 'absent'], type='str'), + node=dict(default=None, type='str') + ) + + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + max_connections = module.params['max_connections'] + max_channels = module.params['max_channels'] + state = module.params['state'] + + module_result = dict(changed=False) + rabbitmq_user_limits = RabbitMqUserLimits(module) + current_status = rabbitmq_user_limits.list() + + if state == 'present': + wanted_status = dict( + max_connections=max_connections, + max_channels=max_channels + ) + else: # state == 'absent' + wanted_status = dict( + max_connections=None, + max_channels=None + ) + + if current_status != wanted_status: + module_result['changed'] = True + if state == 'present': + rabbitmq_user_limits.set() + else: # state == 'absent' + rabbitmq_user_limits.clear() + + module.exit_json(**module_result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost.py new file mode 100644 index 00000000..cfcecc6a --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost.py @@ -0,0 +1,142 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2013, Chatham Financial <oss@chathamfinancial.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_vhost +short_description: Manage the state of a virtual host in RabbitMQ +description: + - Manage the state of a virtual host in RabbitMQ +author: Chris Hoffman (@chrishoffman) +options: + name: + description: + - The name of the vhost to manage + type: str + required: true + aliases: [vhost] + node: + description: + - erlang node name of the rabbit we wish to configure + type: str + default: rabbit + tracing: + description: + - Enable/disable tracing for a vhost + type: bool + default: false + aliases: [trace] + state: + description: + - The state of vhost + type: str + default: present + choices: [present, absent] +''' + +EXAMPLES = r''' +- name: Ensure that the vhost /test exists. + community.rabbitmq.rabbitmq_vhost: + name: /test + state: present +''' + +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqVhost(object): + def __init__(self, module, name, tracing, node): + self.module = module + self.name = name + self.tracing = tracing + self.node = node + + self._tracing = False + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + def _exec(self, args, force_exec_in_check_mode=False): + if not self.module.check_mode or (self.module.check_mode and force_exec_in_check_mode): + cmd = [self._rabbitmqctl, '-q', '-n', self.node] + rc, out, err = self.module.run_command(cmd + args, check_rc=True) + return out.splitlines() + return list() + + def get(self): + vhosts = self._exec(['list_vhosts', 'name', 'tracing'], True) + + for vhost in vhosts: + if '\t' not in vhost: + continue + + name, tracing = vhost.split('\t') + if name == self.name: + self._tracing = self.module.boolean(tracing) + return True + return False + + def add(self): + return self._exec(['add_vhost', self.name]) + + def delete(self): + return self._exec(['delete_vhost', self.name]) + + def set_tracing(self): + if self.tracing != self._tracing: + if self.tracing: + self._enable_tracing() + else: + self._disable_tracing() + return True + return False + + def _enable_tracing(self): + return self._exec(['trace_on', '-p', self.name]) + + def _disable_tracing(self): + return self._exec(['trace_off', '-p', self.name]) + + +def main(): + arg_spec = dict( + name=dict(required=True, aliases=['vhost']), + tracing=dict(default='off', aliases=['trace'], type='bool'), + state=dict(default='present', choices=['present', 'absent']), + node=dict(default='rabbit'), + ) + + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + name = module.params['name'] + tracing = module.params['tracing'] + state = module.params['state'] + node = module.params['node'] + result = dict(changed=False, name=name, state=state) + rabbitmq_vhost = RabbitMqVhost(module, name, tracing, node) + + if rabbitmq_vhost.get(): + if state == 'absent': + rabbitmq_vhost.delete() + result['changed'] = True + else: + if rabbitmq_vhost.set_tracing(): + result['changed'] = True + elif state == 'present': + rabbitmq_vhost.add() + rabbitmq_vhost.set_tracing() + result['changed'] = True + + module.exit_json(**result) + + +if __name__ == '__main__': + main() diff --git a/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost_limits.py b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost_limits.py new file mode 100644 index 00000000..e2f36960 --- /dev/null +++ b/ansible_collections/community/rabbitmq/plugins/modules/rabbitmq_vhost_limits.py @@ -0,0 +1,174 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2018, Hiroyuki Matsuo <h.matsuo.engineer@gmail.com> +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: rabbitmq_vhost_limits +author: Hiroyuki Matsuo (@h-matsuo) + +short_description: Manage the state of virtual host limits in RabbitMQ +description: + - This module sets/clears certain limits on a virtual host. + - The configurable limits are I(max_connections) and I(max-queues). + +options: + max_connections: + description: + - Max number of concurrent client connections. + - Negative value means "no limit". + - Ignored when the I(state) is C(absent). + type: int + default: -1 + max_queues: + description: + - Max number of queues. + - Negative value means "no limit". + - Ignored when the I(state) is C(absent). + type: int + default: -1 + node: + description: + - Name of the RabbitMQ Erlang node to manage. + type: str + state: + description: + - Specify whether the limits are to be set or cleared. + - If set to C(absent), the limits of both I(max_connections) and I(max-queues) will be cleared. + type: str + default: present + choices: [present, absent] + vhost: + description: + - Name of the virtual host to manage. + type: str + default: / +''' + +EXAMPLES = r''' +- name: Limit both of the max number of connections and queues on the vhost '/'. + community.rabbitmq.rabbitmq_vhost_limits: + vhost: / + max_connections: 64 + max_queues: 256 + state: present + +- name: |- + Limit the max number of connections on the vhost '/'. + This task implicitly clears the max number of queues limit using default value: -1. + community.rabbitmq.rabbitmq_vhost_limits: + vhost: / + max_connections: 64 + state: present + +- name: Clear the limits on the vhost '/'. + community.rabbitmq.rabbitmq_vhost_limits: + vhost: / + state: absent +''' + +RETURN = r''' # ''' + + +import json +from ansible.module_utils.basic import AnsibleModule + + +class RabbitMqVhostLimits(object): + def __init__(self, module): + self._module = module + self._max_connections = module.params['max_connections'] + self._max_queues = module.params['max_queues'] + self._node = module.params['node'] + self._state = module.params['state'] + self._vhost = module.params['vhost'] + self._rabbitmqctl = module.get_bin_path('rabbitmqctl', True) + + def _exec(self, args): + cmd = [self._rabbitmqctl, '-q', '-p', self._vhost] + if self._node is not None: + cmd.extend(['-n', self._node]) + rc, out, err = self._module.run_command(cmd + args, check_rc=True) + return dict(rc=rc, out=out.splitlines(), err=err.splitlines()) + + def list(self): + exec_result = self._exec(['list_vhost_limits']) + vhost_limits = exec_result['out'][0] + max_connections = None + max_queues = None + if vhost_limits: + vhost_limits = json.loads(vhost_limits) + if 'max-connections' in vhost_limits: + max_connections = vhost_limits['max-connections'] + if 'max-queues' in vhost_limits: + max_queues = vhost_limits['max-queues'] + return dict( + max_connections=max_connections, + max_queues=max_queues + ) + + def set(self): + if self._module.check_mode: + return + json_str = '{{"max-connections": {0}, "max-queues": {1}}}'.format(self._max_connections, self._max_queues) + self._exec(['set_vhost_limits', json_str]) + + def clear(self): + if self._module.check_mode: + return + self._exec(['clear_vhost_limits']) + + +def main(): + arg_spec = dict( + max_connections=dict(default=-1, type='int'), + max_queues=dict(default=-1, type='int'), + node=dict(default=None, type='str'), + state=dict(default='present', choices=['present', 'absent'], type='str'), + vhost=dict(default='/', type='str') + ) + + module = AnsibleModule( + argument_spec=arg_spec, + supports_check_mode=True + ) + + max_connections = module.params['max_connections'] + max_queues = module.params['max_queues'] + node = module.params['node'] + state = module.params['state'] + vhost = module.params['vhost'] + + module_result = dict(changed=False) + rabbitmq_vhost_limits = RabbitMqVhostLimits(module) + current_status = rabbitmq_vhost_limits.list() + + if state == 'present': + wanted_status = dict( + max_connections=max_connections, + max_queues=max_queues + ) + else: # state == 'absent' + wanted_status = dict( + max_connections=None, + max_queues=None + ) + + if current_status != wanted_status: + module_result['changed'] = True + if state == 'present': + rabbitmq_vhost_limits.set() + else: # state == 'absent' + rabbitmq_vhost_limits.clear() + + module.exit_json(**module_result) + + +if __name__ == '__main__': + main() |