diff options
Diffstat (limited to 'ansible_collections/netapp/cloudmanager/tests/unit')
22 files changed, 5935 insertions, 0 deletions
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py new file mode 100644 index 000000000..f60ee6782 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py @@ -0,0 +1,33 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +# +# Compat for python2.7 +# + +# One unittest needs to import builtins via __import__() so we need to have +# the string that represents it +try: + import __builtin__ +except ImportError: + BUILTINS = 'builtins' +else: + BUILTINS = '__builtin__' diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py new file mode 100644 index 000000000..0972cd2e8 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py @@ -0,0 +1,122 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +''' +Compat module for Python3.x's unittest.mock module +''' +import sys + +# Python 2.7 + +# Note: Could use the pypi mock library on python3.x as well as python2.x. It +# is the same as the python3 stdlib mock library + +try: + # Allow wildcard import because we really do want to import all of mock's + # symbols into this compat shim + # pylint: disable=wildcard-import,unused-wildcard-import + from unittest.mock import * +except ImportError: + # Python 2 + # pylint: disable=wildcard-import,unused-wildcard-import + try: + from mock import * + except ImportError: + print('You need the mock library installed on python2.x to run tests') + + +# Prior to 3.4.4, mock_open cannot handle binary read_data +if sys.version_info >= (3,) and sys.version_info < (3, 4, 4): + file_spec = None + + def _iterate_read_data(read_data): + # Helper for mock_open: + # Retrieve lines from read_data via a generator so that separate calls to + # readline, read, and readlines are properly interleaved + sep = b'\n' if isinstance(read_data, bytes) else '\n' + data_as_list = [l + sep for l in read_data.split(sep)] + + if data_as_list[-1] == sep: + # If the last line ended in a newline, the list comprehension will have an + # extra entry that's just a newline. Remove this. + data_as_list = data_as_list[:-1] + else: + # If there wasn't an extra newline by itself, then the file being + # emulated doesn't have a newline to end the last line remove the + # newline that our naive format() added + data_as_list[-1] = data_as_list[-1][:-1] + + for line in data_as_list: + yield line + + def mock_open(mock=None, read_data=''): + """ + A helper function to create a mock to replace the use of `open`. It works + for `open` called directly or used as a context manager. + + The `mock` argument is the mock object to configure. If `None` (the + default) then a `MagicMock` will be created for you, with the API limited + to methods or attributes available on standard file handles. + + `read_data` is a string for the `read` methoddline`, and `readlines` of the + file handle to return. This is an empty string by default. + """ + def _readlines_side_effect(*args, **kwargs): + if handle.readlines.return_value is not None: + return handle.readlines.return_value + return list(_data) + + def _read_side_effect(*args, **kwargs): + if handle.read.return_value is not None: + return handle.read.return_value + return type(read_data)().join(_data) + + def _readline_side_effect(): + if handle.readline.return_value is not None: + while True: + yield handle.readline.return_value + for line in _data: + yield line + + global file_spec + if file_spec is None: + import _io + file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))) + + if mock is None: + mock = MagicMock(name='open', spec=open) + + handle = MagicMock(spec=file_spec) + handle.__enter__.return_value = handle + + _data = _iterate_read_data(read_data) + + handle.write.return_value = None + handle.read.return_value = None + handle.readline.return_value = None + handle.readlines.return_value = None + + handle.read.side_effect = _read_side_effect + handle.readline.side_effect = _readline_side_effect() + handle.readlines.side_effect = _readlines_side_effect + + mock.return_value = handle + return mock diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py new file mode 100644 index 000000000..73a20cf8c --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py @@ -0,0 +1,44 @@ +# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com> +# +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see <http://www.gnu.org/licenses/>. + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +''' +Compat module for Python2.7's unittest module +''' + +import sys + +import pytest + +# Allow wildcard import because we really do want to import all of +# unittests's symbols into this compat shim +# pylint: disable=wildcard-import,unused-wildcard-import +if sys.version_info < (2, 7): + try: + # Need unittest2 on python2.6 + from unittest2 import * + except ImportError: + print('You need unittest2 installed on python2.6.x to run tests') + + class TestCase: + """ skip everything """ + pytestmark = pytest.mark.skip('Skipping Unit Tests on 2.6 as unittest2 may not be available') +else: + from unittest import * diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py new file mode 100644 index 000000000..959cbaef5 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py @@ -0,0 +1,506 @@ +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" unit tests for module_utils netapp.py + + Provides wrappers for cloudmanager REST APIs +""" + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +# import copy # for deepcopy +import json +import pytest +import sys +try: + import requests.exceptions + HAS_REQUESTS_EXC = True +except ImportError: + HAS_REQUESTS_EXC = False + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch +from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp import CloudManagerRestAPI +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils + +if (not netapp_utils.HAS_REQUESTS or not HAS_REQUESTS_EXC) and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + """prepare arguments so that they will be picked up during module creation""" + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleFailJson(Exception): + """Exception class to be raised by module.fail_json and caught by the test case""" + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockModule(): + ''' rough mock for an Ansible module class ''' + def __init__(self): + self.params = {} + + def fail_json(self, *args, **kwargs): # pylint: disable=unused-argument + """function to simulate fail_json: package return data into an exception""" + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class mockResponse: + def __init__(self, json_data, status_code, headers=None, raise_action=None): + self.json_data = json_data + self.status_code = status_code + self.content = json_data + self.headers = headers or {} + self.raise_action = raise_action + + def raise_for_status(self): + pass + + def json(self): + if self.raise_action == 'bad_json': + raise ValueError(self.raise_action) + return self.json_data + + +def create_module(args): + argument_spec = netapp_utils.cloudmanager_host_argument_spec() + set_module_args(args) + module = basic.AnsibleModule(argument_spec) + module.fail_json = fail_json + return module + + +def create_restapi_object(args): + module = create_module(args) + return netapp_utils.CloudManagerRestAPI(module) + + +def mock_args(feature_flags=None, client_id=None): + args = { + 'refresh_token': 'ABCDEFGS' + } + if feature_flags is not None: + args['feature_flags'] = feature_flags + if client_id is not None: + args['client_id'] = client_id + return args + + +TOKEN_DICT = { + 'access_token': 'access_token', + 'token_type': 'token_type' +} + + +def test_missing_params(): + module = MockModule() + with pytest.raises(KeyError) as exc: + netapp_utils.CloudManagerRestAPI(module) + assert exc.match('refresh_token') + + +@patch('requests.request') +def test_get_token_refresh(mock_request): + ''' successfully get token using refresh token ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + ] + # get_token is called when the object is created + rest_api = create_restapi_object(mock_args()) + print(rest_api.token_type, rest_api.token) + assert rest_api.token_type == TOKEN_DICT['token_type'] + assert rest_api.token == TOKEN_DICT['access_token'] + + +@patch('requests.request') +def test_negative_get_token_none(mock_request): + ''' missing refresh token and Service Account ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + ] + # get_token is called when the object is created + args = dict(mock_args()) + args.pop('refresh_token') + # get_token is called when the object is created + with pytest.raises(AnsibleFailJson) as exc: + rest_api = create_restapi_object(args) + msg = 'Missing refresh_token or sa_client_id and sa_secret_key' + assert msg in exc.value.args[0]['msg'] + + +@patch('requests.request') +def test_get_token_sa(mock_request): + ''' successfully get token using Service Account ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + ] + # get_token is called when the object is created + args = dict(mock_args()) + args.pop('refresh_token') + args['sa_client_id'] = '123' + args['sa_secret_key'] = 'a1b2c3' + rest_api = create_restapi_object(args) + print(rest_api.token_type, rest_api.token) + assert rest_api.token_type == TOKEN_DICT['token_type'] + assert rest_api.token == TOKEN_DICT['access_token'] + + +@patch('requests.request') +def test_negative_get_token(mock_request): + ''' error on OAUTH request ''' + mock_request.side_effect = [ + mockResponse(json_data={'message': 'error message'}, status_code=206) + ] + # get_token is called when the object is created + with pytest.raises(AnsibleFailJson) as exc: + rest_api = create_restapi_object(mock_args()) + msg = 'Error acquiring token: error message' + assert msg in exc.value.args[0]['msg'] + + +@patch('requests.request') +def test_get_json(mock_request): + ''' get with no data ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'}) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message == {'key': 'value'} + assert error is None + assert ocr == 'OCR_id' + + +@patch('time.sleep') +@patch('requests.request') +def test_get_retries(mock_request, dont_sleep): + ''' get with no data ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + requests.exceptions.ConnectionError('Max retries exceeded with url:'), + requests.exceptions.ConnectionError('Max retries exceeded with url:'), + mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'}) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message == {'key': 'value'} + assert error is None + assert ocr == 'OCR_id' + + +@patch('time.sleep') +@patch('requests.request') +def test_get_retries_exceeded(mock_request, dont_sleep): + ''' get with no data ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + requests.exceptions.ConnectionError('Max retries exceeded with url:'), + requests.exceptions.ConnectionError('Max retries exceeded with url:'), + requests.exceptions.ConnectionError('Max retries exceeded with url:'), + mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'}) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert 'Max retries exceeded with url:' in error + + +@patch('requests.request') +def test_empty_get_sent_bad_json(mock_request): + ''' get with invalid json ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data='anything', status_code=200, raise_action='bad_json') + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message is None + assert error is None + assert ocr is None + + +@patch('requests.request') +def test_empty_get_sent_203(mock_request): + ''' get with no data and 203 status code ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={}, status_code=203) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message == {} + assert error is None + assert ocr is None + + +@patch('requests.request') +def test_negative_get_sent_203(mock_request): + ''' get with 203 status code - not sure we should error out here ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'message': 'error message'}, status_code=203) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message == {'message': 'error message'} + assert error == 'error message' + assert ocr is None + + +@patch('requests.request') +def test_negative_get_sent_300(mock_request): + ''' get with 300 status code - 300 indicates an error ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={}, status_code=300) + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message == {} + assert error == '300' + assert ocr is None + + +@patch('requests.request') +def test_negative_get_raise_http_exc(mock_request): + ''' get with HTTPError exception ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + requests.exceptions.HTTPError('some exception') + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message is None + assert error == 'some exception' + assert ocr is None + + +@patch('requests.request') +def test_negative_get_raise_conn_exc(mock_request): + ''' get with ConnectionError exception ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + requests.exceptions.ConnectionError('some exception') + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message is None + assert error == 'some exception' + assert ocr is None + + +@patch('requests.request') +def test_negative_get_raise_oserror_exc(mock_request): + ''' get with a general exception ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception') + ] + rest_api = create_restapi_object(mock_args()) + message, error, ocr = rest_api.get('api', None) + print(message, error, ocr) + assert message is None + assert error == 'some exception' + assert ocr is None + + +def test_has_feature_success_default(): + ''' existing feature_flag with default ''' + flag = 'show_modified' + module = create_module(mock_args()) + value = netapp_utils.has_feature(module, flag) + assert value + + +def test_has_feature_success_user_true(): + ''' existing feature_flag with value set to True ''' + flag = 'user_deprecation_warning' + args = dict(mock_args({flag: True})) + module = create_module(args) + value = netapp_utils.has_feature(module, flag) + assert value + + +def test_has_feature_success_user_false(): + ''' existing feature_flag with value set to False ''' + flag = 'user_deprecation_warning' + args = dict(mock_args({flag: False})) + print(args) + module = create_module(args) + value = netapp_utils.has_feature(module, flag) + assert not value + + +def test_has_feature_invalid_key(): + ''' existing feature_flag with unknown key ''' + flag = 'deprecation_warning_bad_key' + module = create_module(mock_args()) + with pytest.raises(AnsibleFailJson) as exc: + netapp_utils.has_feature(module, flag) + msg = 'Internal error: unexpected feature flag: %s' % flag + assert exc.value.args[0]['msg'] == msg + + +def test_has_feature_invalid_bool(): + ''' existing feature_flag with non boolean value ''' + flag = 'deprecation_warning_key' + module = create_module(mock_args({flag: 'str'})) + with pytest.raises(AnsibleFailJson) as exc: + netapp_utils.has_feature(module, flag) + msg = "Error: expected bool type for feature flag" + assert msg in exc.value.args[0]['msg'] + + +STATUS_DICT = { + 'status': 1, + 'error': None +} + + +@patch('time.sleep') +@patch('requests.request') +def test_check_task_status(mock_request, mock_sleep): + ''' successful get with 2 retries ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + mockResponse(json_data=STATUS_DICT, status_code=200) + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + status, error_msg, error = rest_api.check_task_status('api') + assert status == STATUS_DICT['status'] + assert error_msg == STATUS_DICT['error'] + assert error is None + + +@patch('time.sleep') +@patch('requests.request') +def test_negative_check_task_status(mock_request, mock_sleep): + ''' get with 4 failed retries ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + requests.exceptions.ConnectionError('some exception'), + requests.exceptions.HTTPError('some exception'), + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + status, error_msg, error = rest_api.check_task_status('api') + assert status == 0 + assert error_msg == '' + assert error == 'some exception' + + +@patch('time.sleep') +@patch('requests.request') +def test_wait_on_completion(mock_request, mock_sleep): + ''' successful get with 2 retries ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + mockResponse(json_data=STATUS_DICT, status_code=200) + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1) + assert error is None + + +@patch('time.sleep') +@patch('requests.request') +def test_negative_wait_on_completion_failure(mock_request, mock_sleep): + ''' successful get with 2 retries, but status is -1 ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + mockResponse(json_data={'status': -1, 'error': 'task_error'}, status_code=200) + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1) + assert error == 'Failed to task action, error: task_error' + + +@patch('time.sleep') +@patch('requests.request') +def test_negative_wait_on_completion_error(mock_request, mock_sleep): + ''' get with 4 failed retries ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + requests.exceptions.ConnectionError('some exception'), + requests.exceptions.HTTPError('some http exception'), + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1) + assert error == 'some http exception' + + +@patch('time.sleep') +@patch('requests.request') +def test_negative_wait_on_completion_timeout(mock_request, mock_sleep): + ''' successful get with 2 retries, but status is 0 ''' + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + OSError('some exception'), + requests.exceptions.ConnectionError('some exception'), + mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200), + mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200), + mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200) + ] + rest_api = create_restapi_object(mock_args()) + rest_api.module.params['client_id'] = '123' + error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1) + assert error == 'Taking too long for action to task or not properly setup' diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py new file mode 100644 index 000000000..33041f64f --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py @@ -0,0 +1,578 @@ +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" unit tests for module_utils netapp.py + + Provides wrappers for cloudmanager REST APIs +""" + +from __future__ import (absolute_import, division, print_function) +from logging import error +__metaclass__ = type + +# import copy # for deepcopy +import json +import sys +import pytest +try: + import requests.exceptions + HAS_REQUESTS_EXC = True +except ImportError: + HAS_REQUESTS_EXC = False + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch +from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp import CloudManagerRestAPI +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module import cmp as nm_cmp, NetAppModule +if (not netapp_utils.HAS_REQUESTS or not HAS_REQUESTS_EXC) and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + """prepare arguments so that they will be picked up during module creation""" + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class mockResponse: + def __init__(self, json_data, status_code, headers=None): + self.json_data = json_data + self.status_code = status_code + self.content = json_data + self.headers = headers or {} + + def json(self): + return self.json_data + + +def create_module(args): + argument_spec = netapp_utils.cloudmanager_host_argument_spec() + set_module_args(args) + module = basic.AnsibleModule(argument_spec) + return module + + +def create_restapi_object(args): + module = create_module(args) + return netapp_utils.CloudManagerRestAPI(module) + + +def mock_args(feature_flags=None, client_id=None): + args = { + 'refresh_token': 'ABCDEFGS' + } + return args + + +TOKEN_DICT = { + 'access_token': 'access_token', + 'token_type': 'token_type' +} + + +def test_cmp(): + assert nm_cmp(None, 'x') == -1 + assert nm_cmp('y', 'x') == 1 + assert nm_cmp('y', 'X') == 1 + assert nm_cmp(['x', 'y'], ['x', 'X']) == 1 + assert nm_cmp(['x', 'x'], ['x', 'X']) == 0 + + +def test_set_parameters(): + helper = NetAppModule() + helper.set_parameters({'a': None, 'b': 'b'}) + assert 'a' not in helper.parameters + assert 'b' in helper.parameters + + +def test_cd_action(): + desired = {} + helper = NetAppModule() + assert helper.get_cd_action(None, desired) == 'create' + desired['state'] = 'present' + assert helper.get_cd_action(None, desired) == 'create' + assert helper.get_cd_action({}, desired) is None + desired['state'] = 'absent' + assert helper.get_cd_action(None, desired) is None + assert helper.get_cd_action({}, desired) == 'delete' + + +def test_compare_and_update_values(): + current = {'a': 'a', 'b': 'b'} + desired = {} + desired_key = [] + helper = NetAppModule() + assert helper.compare_and_update_values(current, desired, desired_key) == ({}, False) + desired_key = ['a'] + assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'a'}, False) + desired = {'a': 'a'} + assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'a'}, False) + desired = {'a': 'c'} + assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'c'}, True) + + +@patch('requests.request') +def test_get_working_environments_info(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'a': 'b'}, status_code=200), + mockResponse(json_data={'c': 'd'}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_working_environments_info(rest_api, '') == ({'a': 'b'}, None) + assert helper.get_working_environments_info(rest_api, '') == ({'c': 'd'}, '500') + + +def test_look_up_working_environment_by_name_in_list(): + we_list = [{'name': 'bob', 'b': 'b'}, {'name': 'chuck', 'c': 'c'}] + helper = NetAppModule() + assert helper.look_up_working_environment_by_name_in_list(we_list, 'bob') == (we_list[0], None) + error = "look_up_working_environment_by_name_in_list: Working environment not found" + assert helper.look_up_working_environment_by_name_in_list(we_list, 'alice') == (None, error) + + +@patch('requests.request') +def test_get_working_environment_details_by_name(mock_request): + we_list = [{'name': 'bob', 'b': 'b'}, {'name': 'chuck', 'c': 'c'}] + json_data = {'onPremWorkingEnvironments': [], + 'gcpVsaWorkingEnvironments': [], + 'azureVsaWorkingEnvironments': [], + 'vsaWorkingEnvironments': [] + } + json_data_onprem = dict(json_data) + json_data_onprem['onPremWorkingEnvironments'] = we_list + json_data_gcp = dict(json_data) + json_data_gcp['gcpVsaWorkingEnvironments'] = we_list + json_data_azure = dict(json_data) + json_data_azure['azureVsaWorkingEnvironments'] = we_list + json_data_aws = dict(json_data) + json_data_aws['vsaWorkingEnvironments'] = we_list + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'a': 'b'}, status_code=500), # exists + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data={'c': 'd'}, status_code=400), # get all + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data=json_data_onprem, status_code=200), # get all + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data=json_data_gcp, status_code=200), # get all + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data=json_data_azure, status_code=200), # get all + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data=json_data_aws, status_code=200), # get all + mockResponse(json_data={'a': 'b'}, status_code=200), # exists + mockResponse(json_data=json_data, status_code=200), # get all + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_working_environment_details_by_name(rest_api, '', 'name') == (None, '500') + assert helper.get_working_environment_details_by_name(rest_api, '', 'name') == (None, '400') + assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None) + assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None) + assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None) + assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None) + error = "get_working_environment_details_by_name: Working environment not found" + assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (None, error) + + +@patch('requests.request') +def test_get_working_environment_details(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'key': [{'a': 'b'}]}, status_code=200), + mockResponse(json_data={'key': [{'c': 'd'}]}, status_code=500) + ] + helper = NetAppModule() + args = dict(mock_args()) + rest_api = create_restapi_object(args) + helper.parameters['working_environment_id'] = 'test_we' + assert helper.get_working_environment_details(rest_api, '') == ({'key': [{'a': 'b'}]}, None) + error = "Error: get_working_environment_details 500" + assert helper.get_working_environment_details(rest_api, '') == (None, error) + + +@patch('requests.request') +def test_get_working_environment_detail_for_snapmirror(mock_request): + json_data = {'onPremWorkingEnvironments': [], + 'gcpVsaWorkingEnvironments': [], + 'azureVsaWorkingEnvironments': [], + 'vsaWorkingEnvironments': [] + } + json_data_source = dict(json_data) + json_data_source['onPremWorkingEnvironments'] = [{'name': 'test_we_s'}] + json_data_destination = dict(json_data) + json_data_destination['onPremWorkingEnvironments'] = [{'name': 'test_we_d'}] + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + # by id, first test + mockResponse(json_data={'key': [{'publicId': 'test_we_s'}]}, status_code=200), # env details source + mockResponse(json_data={'key': [{'publicId': 'test_we_d'}]}, status_code=200), # env details dest + # by id, second test + mockResponse(json_data={'key': [{'c': 'd'}]}, status_code=500), # error source + # by id, third test + mockResponse(json_data={'key': [{'publicId': 'test_we_s'}]}, status_code=200), # env details source + mockResponse(json_data={'key': [{'e': 'f'}]}, status_code=500), # error source + # by name, first test + mockResponse(json_data={'a': 'b'}, status_code=200), # exists source + mockResponse(json_data=json_data_source, status_code=200), # env details source + mockResponse(json_data={'a': 'b'}, status_code=200), # exists dest + mockResponse(json_data=json_data_destination, status_code=200), # env details dest + # by name, second test + mockResponse(json_data={'key': {'c': 'd'}}, status_code=500), # error source + # by name, third test + mockResponse(json_data={'a': 'b'}, status_code=200), # exists source + mockResponse(json_data=json_data_source, status_code=200), # env details source + mockResponse(json_data={'key': {'e': 'f'}}, status_code=500), # error source + ] + helper = NetAppModule() + args = dict(mock_args()) + rest_api = create_restapi_object(args) + # search by id + helper.parameters['source_working_environment_id'] = 'test_we_s' + helper.parameters['destination_working_environment_id'] = 'test_we_d' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == ({'publicId': 'test_we_s'}, {'publicId': 'test_we_d'}, None) + error = "Error getting WE info: 500: {'key': [{'c': 'd'}]}" + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + error = "Error getting WE info: 500: {'key': [{'e': 'f'}]}" + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + # search by name + del helper.parameters['source_working_environment_id'] + del helper.parameters['destination_working_environment_id'] + helper.parameters['source_working_environment_name'] = 'test_we_s' + helper.parameters['destination_working_environment_name'] = 'test_we_d' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == ({'name': 'test_we_s'}, {'name': 'test_we_d'}, None) + error = '500' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + error = '500' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + # no destination id nor name + del helper.parameters['destination_working_environment_name'] + error = 'Cannot find working environment by destination_working_environment_id or destination_working_environment_name' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + # no source id nor name + del helper.parameters['source_working_environment_name'] + error = 'Cannot find working environment by source_working_environment_id or source_working_environment_name' + assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error) + + +def test_create_account(): + helper = NetAppModule() + error = "Error: creating an account is not supported." + assert helper.create_account("rest_api") == (None, error) + + +@patch('requests.request') +def test_get_or_create_account(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), + mockResponse(json_data=[], status_code=200), + mockResponse(json_data={'c': 'd'}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_or_create_account(rest_api) == ('account_id', None) + error = 'Error: account cannot be located - check credentials or provide account_id.' + assert helper.get_or_create_account(rest_api) == (None, error) + error = '500' + assert helper.get_or_create_account(rest_api) == (None, error) + + +@patch('requests.request') +def test_get_account_info(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), + mockResponse(json_data=[], status_code=200), + mockResponse(json_data={'c': 'd'}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_account_info(rest_api, '') == ([{'accountPublicId': 'account_id'}], None) + assert helper.get_account_info(rest_api, '') == ([], None) + assert helper.get_account_info(rest_api, '') == (None, '500') + + +@patch('requests.request') +def test_get_account_id(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), + mockResponse(json_data=[], status_code=200), + mockResponse(json_data={'c': 'd'}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_account_id(rest_api) == ('account_id', None) + error = 'Error: no account found - check credentials or provide account_id.' + assert helper.get_account_id(rest_api) == (None, error) + error = '500' + assert helper.get_account_id(rest_api) == (None, error) + + +@patch('requests.request') +def test_get_accounts_info(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), + mockResponse(json_data={'c': 'd'}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_accounts_info(rest_api, '') == ([{'accountPublicId': 'account_id'}], None) + error = '500' + assert helper.get_accounts_info(rest_api, '') == (None, error) + + +def test_set_api_root_path(): + helper = NetAppModule() + helper.parameters['working_environment_id'] = 'abc' + working_environment_details = {'cloudProviderName': 'Amazon', 'isHA': False} + helper.set_api_root_path(working_environment_details, helper) + assert helper.api_root_path == '/occm/api/vsa' + working_environment_details = {'cloudProviderName': 'Other', 'isHA': False} + helper.set_api_root_path(working_environment_details, helper) + assert helper.api_root_path == '/occm/api/other/vsa' + working_environment_details = {'cloudProviderName': 'Other', 'isHA': True} + helper.set_api_root_path(working_environment_details, helper) + assert helper.api_root_path == '/occm/api/other/ha' + + +@patch('requests.request') +def test_get_occm_agents_by_account(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'a': 'b'}], status_code=200), + mockResponse(json_data=[{'c': 'd'}], status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_occm_agents_by_account(rest_api, '') == ([{'a': 'b'}], None) + error = '500' + assert helper.get_occm_agents_by_account(rest_api, '') == ([{'c': 'd'}], error) + + +@patch('requests.request') +def test_get_occm_agents_by_name(mock_request): + json_data = {'agents': + [{'name': '', 'provider': ''}, + {'name': 'a1', 'provider': 'p1'}, + {'name': 'a1', 'provider': 'p1'}, + {'name': 'a1', 'provider': 'p2'}, + {'name': 'a2', 'provider': 'p1'}, + ]} + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=json_data, status_code=200), + mockResponse(json_data=json_data, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + expected = [agent for agent in json_data['agents'] if agent['name'] == 'a1' and agent['provider'] == 'p1'] + assert helper.get_occm_agents_by_name(rest_api, 'account', 'a1', 'p1') == (expected, None) + error = '500' + assert helper.get_occm_agents_by_name(rest_api, 'account', 'a1', 'p1') == (expected, error) + + +@patch('requests.request') +def test_get_agents_info(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id + mockResponse(json_data=[{'a': 'b'}], status_code=200), + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id + mockResponse(json_data=[{'c': 'd'}], status_code=500), + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=400), # get account_id + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + assert helper.get_agents_info(rest_api, '') == ([{'a': 'b'}], None) + error = '500' + assert helper.get_agents_info(rest_api, '') == ([{'c': 'd'}], error) + error = '400' + assert helper.get_agents_info(rest_api, '') == (None, error) + + +@patch('requests.request') +def test_get_active_agents_info(mock_request): + json_data = {'agents': + [{'name': '', 'provider': '', 'agentId': 1, 'status': ''}, + {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'}, + {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': ''}, + {'name': 'a1', 'provider': 'p2', 'agentId': 1, 'status': 'active'}, + {'name': 'a2', 'provider': 'p1', 'agentId': 1, 'status': 'active'}, + ]} + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id + mockResponse(json_data=json_data, status_code=200), + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id + mockResponse(json_data=json_data, status_code=500), + mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=400), # get account_id + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + active = [agent for agent in json_data['agents'] if agent['status'] == 'active'] + expected = [{'name': agent['name'], 'client_id': agent['agentId'], 'provider': agent['provider']} for agent in active] + assert helper.get_active_agents_info(rest_api, '') == (expected, None) + error = '500' + assert helper.get_active_agents_info(rest_api, '') == (expected, error) + error = '400' + assert helper.get_active_agents_info(rest_api, '') == (None, error) + + +@patch('requests.request') +def test_get_occm_agent_by_id(mock_request): + json_data = {'agent': + {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'} + } + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=json_data, status_code=200), + mockResponse(json_data=json_data, status_code=500), + mockResponse(json_data={'a': 'b'}, status_code=500), + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + expected = json_data['agent'] + assert helper.get_occm_agent_by_id(rest_api, '') == (expected, None) + error = '500' + assert helper.get_occm_agent_by_id(rest_api, '') == (expected, error) + assert helper.get_occm_agent_by_id(rest_api, '') == ({'a': 'b'}, error) + + +@patch('requests.request') +def test_check_occm_status(mock_request): + json_data = {'agent': + {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'} + } + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=json_data, status_code=200), + mockResponse(json_data=json_data, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + expected = json_data + assert helper.check_occm_status(rest_api, '') == (expected, None) + error = '500' + assert helper.check_occm_status(rest_api, '') == (expected, error) + + +@patch('requests.request') +def test_register_agent_to_service(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={}, status_code=200), + mockResponse(json_data={}, status_code=200), + mockResponse(json_data={}, status_code=500) + ] + helper = NetAppModule() + rest_api = create_restapi_object(mock_args()) + helper.parameters['account_id'] = 'account_id' + helper.parameters['company'] = 'company' + helper.parameters['region'] = 'region' + helper.parameters['subnet_id'] = 'subnet_id' + expected = {} + assert helper.register_agent_to_service(rest_api, 'provider', 'vpc') == (expected, None) + args, kwargs = mock_request.call_args + body = kwargs['json'] + assert 'placement' in body + assert 'network' in body['placement'] + assert body['placement']['network'] == 'vpc' + body_other = body + assert helper.register_agent_to_service(rest_api, 'AWS', 'vpc') == (expected, None) + args, kwargs = mock_request.call_args + body = kwargs['json'] + assert 'placement' in body + assert 'network' in body['placement'] + assert body['placement']['network'] == 'vpc' + assert body_other != body + body['placement']['provider'] = 'provider' + assert body_other == body + error = '500' + assert helper.register_agent_to_service(rest_api, 'provider', 'vpc') == (expected, error) + + +@patch('requests.request') +def test_delete_occm(mock_request): + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'result': 'any'}, status_code=200), + mockResponse(json_data={'result': 'any'}, status_code=500), + ] + helper = NetAppModule() + helper.parameters['account_id'] = 'account_id' + rest_api = create_restapi_object(mock_args()) + assert helper.delete_occm(rest_api, '') == ({'result': 'any'}, None) + error = '500' + assert helper.delete_occm(rest_api, '') == ({'result': 'any'}, error) + + +@patch('requests.request') +def test_delete_occm_agents(mock_request): + agents = [{'agentId': 'a1'}, + {'agentId': 'a2'}] + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data={'result': 'any'}, status_code=200), # a1 + mockResponse(json_data={'result': 'any'}, status_code=200), # a2 + mockResponse(json_data={'result': 'any'}, status_code=500), # a1 + mockResponse(json_data={'result': 'any'}, status_code=200), # a2 + mockResponse(json_data={'result': 'any'}, status_code=200), # a1 + mockResponse(json_data={'result': 'any'}, status_code=200), # a2 + ] + helper = NetAppModule() + helper.parameters['account_id'] = 'account_id' + rest_api = create_restapi_object(mock_args()) + assert helper.delete_occm_agents(rest_api, agents) == [] + error = '500' + assert helper.delete_occm_agents(rest_api, agents) == [({'result': 'any'}, error)] + agents.append({'a': 'b'}) + error = "unexpected agent contents: {'a': 'b'}" + assert helper.delete_occm_agents(rest_api, agents) == [(None, error)] + + +@patch('requests.request') +def test_get_tenant(mock_request): + tenants = [{'publicId': 'a1'}, + {'publicId': 'a2'}] + mock_request.side_effect = [ + mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH + mockResponse(json_data=tenants, status_code=200), # get success + mockResponse(json_data={'result': 'any'}, status_code=500), # get error + ] + helper = NetAppModule() + # helper.parameters['account_id'] = 'account_id' + rest_api = create_restapi_object(mock_args()) + assert helper.get_tenant(rest_api, '') == ('a1', None) + error = "Error: unexpected response on getting tenant for cvo: 500, {'result': 'any'}" + assert helper.get_tenant(rest_api, '') == (None, error) diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py new file mode 100644 index 000000000..b24778f47 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py @@ -0,0 +1,77 @@ +# This code is part of Ansible, but is an independent component. +# This particular file snippet, and this file snippet only, is BSD licensed. +# Modules you write using this snippet, which is embedded dynamically by Ansible +# still belong to the author of the module, and may assign their own license +# to the complete work. +# +# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com> +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" unit tests for module_utils netapp_module.py + + Provides utility functions for cloudmanager REST APIs +""" + +from __future__ import (absolute_import, division, print_function) +from logging import error +__metaclass__ = type + +import pytest +import sys + +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch +from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module import NetAppModule + +if sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as builtins not defined for 2.6 and 2.7') + + +@patch('builtins.open') +def test_certificates(open): + open.return_value = OPEN(data=b'1234') + helper = NetAppModule() + assert helper.encode_certificates('test') == ('MTIzNA==', None) + open.return_value = OPEN(data=b'') + helper = NetAppModule() + assert helper.encode_certificates('test') == (None, 'Error: file is empty') + open.return_value = OPEN(raise_exception=True) + helper = NetAppModule() + assert helper.encode_certificates('test') == (None, 'intentional error') + + +class OPEN: + '''we could use mock_open but I'm not sure it's available in every python version ''' + def __init__(self, data=b'abcd', raise_exception=False): + self.data = data + self.raise_exception = raise_exception + + def read(self): + return self.data + # the following two methods are associated with "with" in with open ... + + def __enter__(self): + if self.raise_exception: + raise OSError('intentional error') + return self + + def __exit__(self, *args): + pass diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py new file mode 100644 index 000000000..db30ada89 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py @@ -0,0 +1,297 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests ONTAP Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate \ + import NetAppCloudmanagerAggregate as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 2, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_create_cloudmanager_aggregate(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 2, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_create_cloudmanager_aggregate_by_workingenv_name(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'working_environment_name': 'wkone', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 2, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_delete_cloudmanager_aggregate(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 2, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_delete_cloudmanager_aggregate_by_workingenv_name(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 2, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_update_cloudmanager_aggregate(self): + return dict({ + 'state': 'present', + 'name': 'TestCMAggregate', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'number_of_disks': 3, + 'disk_size_size': 100, + 'disk_size_unit': 'GB', + 'refresh_token': 'myrefresh_token', + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required ars are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch( + 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_aggregate_pass(self, get_post_api, get_aggregate_api, get_token): + set_module_args(self.set_args_create_cloudmanager_aggregate()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + get_aggregate_api.return_value = None + get_post_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_aggregate: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch( + 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_aggregate_pass(self, get_delete_api, get_aggregate_api, get_token): + set_module_args(self.set_args_delete_cloudmanager_aggregate()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + my_aggregate = { + 'name': 'Dummyname', + 'state': 'online', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'}, + {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}], + 'homeNode': 'testAWSa-01', + } + get_aggregate_api.return_value = my_aggregate + get_delete_api.return_value = 'Aggregated Deleted', None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_aggregate: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_update_cloudmanager_aggregate_pass(self, get_post_api, get_aggregate_api, get_token): + set_module_args(self.set_args_update_cloudmanager_aggregate()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + my_aggregate = { + 'name': 'Dummyname', + 'state': 'online', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'}, + {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}], + 'homeNode': 'testAWSa-01', + } + get_aggregate_api.return_value = my_aggregate + get_post_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_update_cloudmanager_aggregate: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_aggregate_by_workingenv_name_pass(self, get_post_api, get_we, get_aggregate_api, get_token): + data = self.set_args_create_cloudmanager_aggregate_by_workingenv_name() + get_token.return_value = 'test', 'test' + my_we = { + 'name': 'test', + 'publicId': 'test', + 'cloudProviderName': 'Amazon'} + get_we.return_value = my_we, None + data['working_environment_id'] = my_we['publicId'] + set_module_args(data) + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + get_aggregate_api.return_value = None + get_post_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_aggregate: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_aggregate_by_workingenv_name_pass(self, get_delete_api, get_we, get_aggregate_api, get_token): + data = self.set_args_delete_cloudmanager_aggregate_by_workingenv_name() + my_we = { + 'name': 'test', + 'publicId': 'test', + 'cloudProviderName': 'Amazon'} + get_we.return_value = my_we, None + data['working_environment_id'] = my_we['publicId'] + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + my_aggregate = { + 'name': 'Dummyname', + 'state': 'online', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'}, + {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None, + 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}], + 'homeNode': 'testAWSa-01', + } + get_aggregate_api.return_value = my_aggregate + get_delete_api.return_value = 'Aggregated Deleted', None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_aggregate: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py new file mode 100644 index 000000000..cee1e439c --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py @@ -0,0 +1,165 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx \ + import NetAppCloudManagerAWSFSX as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'workspace_id': 'test', + 'region': 'us-west-1', + 'tenant_id': 'account-test', + 'storage_capacity_size': 1024, + 'throughput_capacity': 512, + 'storage_capacity_size_unit': 'TiB', + 'aws_credentials_name': 'test', + 'primary_subnet_id': 'test', + 'secondary_subnet_id': 'test', + 'fsx_admin_password': 'password', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_create_cloudmanager_aws_fsx(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'workspace_id': 'test', + 'region': 'us-west-1', + 'tenant_id': 'account-test', + 'storage_capacity_size': 1024, + 'storage_capacity_size_unit': 'TiB', + 'throughput_capacity': 512, + 'aws_credentials_name': 'test', + 'primary_subnet_id': 'test', + 'secondary_subnet_id': 'test', + 'fsx_admin_password': 'password', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_delete_cloudmanager_aws_fsx(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'tenant_id': 'account-test', + 'refresh_token': 'myrefresh_token', + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.get_aws_credentials_id') + def test_module_fail_when_required_args_present(self, get_aws_credentials_id, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_aws_credentials_id.return_value = '123', None + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_aws_fsx_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.wait_on_completion_for_fsx') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.check_task_status_for_fsx') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.get_aws_credentials_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_aws_fsx_pass(self, get_post_api, get_aws_credentials_id, check_task_status_for_fsx, + wait_on_completion_for_fsx, get_aws_fsx_details, get_token): + set_module_args(self.set_args_create_cloudmanager_aws_fsx()) + get_token.return_value = 'test', 'test' + get_aws_credentials_id.return_value = '123', None + my_obj = my_module() + + response = {'id': 'abcdefg12345'} + get_post_api.return_value = response, None, None + check_task_status_for_fsx.return_value = {'providerDetails': {'status': {'status': 'ON', 'lifecycle': 'AVAILABLE'}}}, None + wait_on_completion_for_fsx.return_value = None + get_aws_fsx_details.return_value = None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_aws_fsx_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_aws_fsx_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_aws_fsx_pass(self, get_delete_api, get_aws_fsx_details, get_token): + set_module_args(self.set_args_delete_cloudmanager_aws_fsx()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_fsx = { + 'name': 'test', + 'id': 'test'} + get_aws_fsx_details.return_value = my_fsx, None + get_delete_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_aws_fsx_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py new file mode 100644 index 000000000..023f993af --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py @@ -0,0 +1,252 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server \ + import NetAppCloudmanagerCifsServer as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'refreshToken', + 'domain': 'test.com', + 'username': 'admin', + 'password': 'abcde', + 'dns_domain': 'test.com', + 'ip_addresses': '["1.0.0.1"]', + 'netbios': 'cvoname', + 'organizational_unit': 'CN=Computers', + }) + + def set_default_args_with_workingenv_name_pass_check(self): + return dict({ + 'state': 'present', + 'working_environment_name': 'weone', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'refreshToken', + 'domain': 'test.com', + 'username': 'admin', + 'password': 'abcde', + 'dns_domain': 'test.com', + 'ip_addresses': '["1.0.0.1"]', + 'netbios': 'cvoname', + 'organizational_unit': 'CN=Computers', + }) + + def set_using_workgroup_args_pass_check(self): + return dict({ + 'state': 'present', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'refreshToken', + 'is_workgroup': True, + 'server_name': 'abc', + 'workgroup_name': 'wk', + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_cifs_server_successfully(self, send_request, create, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = None + create.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_cifs_server_idempotency(self, send_request, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = { + 'domain': 'test.com', + 'dns_domain': 'test.com', + 'ip_addresses': ['1.0.0.1'], + 'netbios': 'cvoname', + 'organizational_unit': 'CN=Computers', + } + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert not exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_cifs_server_using_workgroup_successfully(self, send_request, create, get, get_token): + set_module_args(self.set_using_workgroup_args_pass_check()) + get.return_value = None + create.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.delete_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_delete_cifs_server_successfully(self, send_request, delete, get, get_token): + args = self.set_default_args_pass_check() + args['state'] = 'absent' + set_module_args(args) + get.return_value = { + 'domain': 'test.com', + 'dns_domain': 'test.com', + 'ip_addresses': ['1.0.0.1'], + 'netbios': 'cvoname', + 'organizational_unit': 'CN=Computers', + } + delete.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_cifs_server_successfully(self, send_request, create, get, get_we, get_token): + args = self.set_default_args_with_workingenv_name_pass_check() + my_we = { + 'name': 'test', + 'publicId': 'test', + 'cloudProviderName': 'Amazon'} + get_we.return_value = my_we, None + args['working_environment_id'] = my_we['publicId'] + set_module_args(args) + get.return_value = None + create.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.delete_cifs_server') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_delete_cifs_server_with_workingenv_name_successfully(self, send_request, delete, get, get_we, get_token): + args = self.set_default_args_with_workingenv_name_pass_check() + args['state'] = 'absent' + my_we = { + 'name': 'test', + 'publicId': 'test', + 'cloudProviderName': 'Amazon'} + get_we.return_value = my_we, None + args['working_environment_id'] = my_we['publicId'] + set_module_args(args) + get.return_value = { + 'domain': 'test.com', + 'dns_domain': 'test.com', + 'ip_addresses': ['1.0.0.1'], + 'netbios': 'cvoname', + 'organizational_unit': 'CN=Computers', + } + delete.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py new file mode 100644 index 000000000..dab9cde66 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py @@ -0,0 +1,730 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) +from logging import exception + +__metaclass__ = type + +import json +import sys +import pytest + +HAS_BOTOCORE = True +try: + from botocore.exceptions import ClientError +except ImportError: + HAS_BOTOCORE = False + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch + +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws \ + import NetAppCloudManagerConnectorAWS as my_module, IMPORT_EXCEPTION, main as my_main + +if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION) + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'region': 'us-west-1', + 'key_name': 'dev_automation', + 'subnet_id': 'subnet-test', + 'ami': 'ami-test', + 'security_group_ids': ['sg-test'], + 'refresh_token': 'myrefresh_token', + 'iam_instance_profile_name': 'OCCM_AUTOMATION', + 'account_id': 'account-test', + 'company': 'NetApp' + }) + + def set_args_create_cloudmanager_connector_aws(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'region': 'us-west-1', + 'key_name': 'dev_automation', + 'subnet_id': 'subnet-test', + 'ami': 'ami-test', + 'security_group_ids': ['sg-test'], + 'refresh_token': 'myrefresh_token', + 'iam_instance_profile_name': 'OCCM_AUTOMATION', + 'account_id': 'account-test', + 'company': 'NetApp' + }) + + def set_args_delete_cloudmanager_connector_aws(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'test', + 'instance_id': 'test', + 'region': 'us-west-1', + 'account_id': 'account-test', + 'refresh_token': 'myrefresh_token', + 'company': 'NetApp' + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.create_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.register_agent_to_service') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_ami') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_connector_aws_pass(self, get_post_api, get_ami, register_agent_to_service, get_vpc, create_instance, get_instance, get_token): + set_module_args(self.set_args_create_cloudmanager_connector_aws()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + get_post_api.return_value = None, None, None + get_ami.return_value = 'ami-test' + register_agent_to_service.return_value = 'test', 'test' + get_vpc.return_value = 'test' + create_instance.return_value = 'test', 'test' + get_instance.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_connector_aws: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm') + def test_delete_cloudmanager_connector_aws_pass(self, delete_occm, get_occm_agent_by_id, delete_api, get_instance, delete_instance, get_token): + set_module_args(self.set_args_delete_cloudmanager_connector_aws()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_instance = { + 'InstanceId': 'instance_id_1' + } + get_instance.return_value = my_instance + get_occm_agent_by_id.return_value = {'agentId': 'test', 'state': 'active'}, None + delete_api.return_value = None, None, None + delete_instance.return_value = None + delete_occm.return_value = None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm') + def test_delete_cloudmanager_connector_aws_pass_no_ids(self, delete_occm, get_occm_agents, delete_api, get_instance, delete_instance, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_connector_aws = { + 'name': 'Dummyname', + 'client_id': 'test', + 'refresh_token': 'myrefresh_token', + } + my_instance = { + 'InstanceId': 'instance_id_1' + } + # get_connector_aws.return_value = my_connector_aws + get_instance.return_value = my_instance + delete_api.return_value = None, None, None + delete_instance.return_value = None + get_occm_agents.return_value = [{'agentId': 'test', 'status': 'active'}], None + delete_occm.return_value = None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print() + print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm') + def test_delete_cloudmanager_connector_aws_negative_no_instance(self, delete_occm, get_occm_agents, delete_api, get_instance, delete_instance, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_connector_aws = { + 'name': 'Dummyname', + 'client_id': 'test', + 'refresh_token': 'myrefresh_token', + } + my_instance = None + # get_connector_aws.return_value = my_connector_aws + get_instance.return_value = my_instance + delete_api.return_value = None, None, None + delete_instance.return_value = None + get_occm_agents.return_value = [{'agentId': 'test', 'status': 'active'}], None + delete_occm.return_value = None, "some error on delete occm" + + with pytest.raises(AnsibleFailJson) as exc: + my_obj.apply() + print() + print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value)) + msg = "Error: deleting OCCM agent(s): [(None, 'some error on delete occm')]" + assert msg in exc.value.args[0]['msg'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_empty(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2() + my_obj = my_module() + instance = my_obj.get_instance() + print('instance', instance) + assert instance is None + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_one(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}]) + my_obj = my_module() + instance = my_obj.get_instance() + print('instance', instance) + assert instance + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_many_terminated(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + my_obj = my_module() + instance = my_obj.get_instance() + print('instance', instance) + assert instance is None + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_many_but_only_one_active(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + my_obj = my_module() + instance = my_obj.get_instance() + print('instance', instance) + assert instance + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_many_but_only_one_active(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'active', 'name': 'xxxx'}]) + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.get_instance() + msg = "Error: found multiple instances for name" + assert msg in exc.value.args[0]['msg'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_get_instance_exception(self, get_boto3_client, get_token): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + args.pop('instance_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2(raise_exc=True) + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.get_instance() + msg = "An error occurred (test_only) when calling the describe_instances operation: forced error in unit testing" + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_create_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None + get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, None + my_obj = my_module() + instance = my_obj.create_instance() + print('instance', instance) + assert instance + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.encode_certificates') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance_no_ami_with_tags(self, get_boto3_client, register, get_token, get_occm_agent_by_id, get_account, encode_cert, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP, no account id ''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args.pop('account_id') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + args['proxy_certificates'] = ['cert1', 'cert2'] + set_module_args(args) + get_account.return_value = 'account_id', None + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + encode_cert.return_value = 'base64', None + register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None + get_occm_agent_by_id.side_effect = [ + ({'agentId': 'test', 'status': 'pending'}, None), + ({'agentId': 'test', 'status': 'pending'}, None), + ({'agentId': 'test', 'status': 'active'}, None)] + my_obj = my_module() + instance = my_obj.create_instance() + print('instance', instance) + assert instance + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance_timeout(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None + get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'pending'}, None + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.create_instance() + msg = "Error: taking too long for OCCM agent to be active or not properly setup" + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance_error_in_get_agent(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None + get_occm_agent_by_id.return_value = 'forcing an error', 'intentional error' + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.create_instance() + msg = "Error: not able to get occm status: intentional error, forcing an error" + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_create_instance_error_in_get_account(self, get_boto3_client, get_token, get_account, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP, no account id ''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args.pop('account_id') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + set_module_args(args) + get_account.return_value = 'forcing an error', 'intentional error' + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.create_instance() + msg = "Error: failed to get account: intentional error." + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance_error_in_register(self, get_boto3_client, register, get_token, get_account, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP, no account id ''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args.pop('account_id') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + set_module_args(args) + get_account.return_value = 'account_id', None + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + register.return_value = 'forcing an error', 'intentional error', 'dummy' + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.create_instance() + msg = "Error: unexpected response on connector setup: intentional error, forcing an error" + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.encode_certificates') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + @patch('boto3.client') + def test_create_instance_error_in_open(self, get_boto3_client, register, get_token, get_account, encode_cert, dont_sleep): + ''' additional paths: get_ami, add tags, no public IP, no account id ''' + args = self.set_args_create_cloudmanager_connector_aws() + args.pop('ami') + args.pop('account_id') + args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}] + args['associate_public_ip_address'] = False + args['proxy_certificates'] = ['cert1', 'cert2'] + set_module_args(args) + get_account.return_value = 'account_id', None + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'terminated'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None + encode_cert.return_value = None, 'intentional error' + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.create_instance() + msg = "Error: could not open/read file 'cert1' of proxy_certificates: intentional error" + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.side_effect = [ + ({'agentId': 'test', 'status': 'active'}, None), + ({'agentId': 'test', 'status': 'active'}, None), + ({'agentId': 'test', 'status': 'terminated'}, None)] + my_obj = my_module() + error = my_obj.delete_instance() + assert not error + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm_agents') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_no_client(self, get_boto3_client, get_token, get_occm_agent_by_id, get_occm_agents_by_name, delete_occm_agents, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('client_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.side_effect = [ + ({'agentId': 'test', 'status': 'active'}, None), + ({'agentId': 'test', 'status': 'active'}, None), + ({'agentId': 'test', 'status': 'terminated'}, None)] + get_occm_agents_by_name.return_value = [], None + delete_occm_agents.return_value = None + with pytest.raises(AnsibleExitJson) as exc: + my_main() + assert not get_occm_agent_by_id.called + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance_timeout(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, None + my_obj = my_module() + error = my_obj.delete_instance() + assert 'Error: taking too long for instance to finish terminating.' == error + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance_error_on_agent(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, 'intentional error' + my_obj = my_module() + error = my_obj.delete_instance() + assert 'Error: not able to get occm agent status after deleting instance: intentional error,' in error + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance_client_id_not_found_403(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.return_value = 'Action not allowed for user', '403' + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.apply() + msg = "Error: not able to get occm agent status after deleting instance: 403," + assert msg in exc.value.args[0]['msg'] + print(exc.value.args[0]) + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance_client_id_not_found_other(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agent_by_id.return_value = 'Other error', '404' + my_obj = my_module() + with pytest.raises(AnsibleFailJson) as exc: + my_obj.apply() + msg = "Error: getting OCCM agents: 404," + assert msg in exc.value.args[0]['msg'] + + @patch('time.sleep') + # @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_delete_instance_account_id_not_found(self, get_boto3_client, get_token, dont_sleep): + args = self.set_args_delete_cloudmanager_connector_aws() + args.pop('account_id') + args.pop('client_id') + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + # get_occm_agent_by_id.return_value = 'Other error', '404' + my_obj = my_module() + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + # msg = "Error: getting OCCM agents: 404," + assert exc.value.args[0]['account_id'] is None + assert exc.value.args[0]['client_id'] is None + + @patch('time.sleep') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('boto3.client') + def test_modify_instance(self, get_boto3_client, get_token, get_occm_agents_by_name, dont_sleep): + args = self.set_args_create_cloudmanager_connector_aws() + args['instance_type'] = 't3.large' + set_module_args(args) + get_token.return_value = 'test', 'test' + get_boto3_client.return_value = EC2([{'state': 'active'}, + {'state': 'terminated', 'reservation': '2'}, + {'state': 'terminated', 'name': 'xxxx'}]) + get_occm_agents_by_name.return_value = [{'agentId': 'test', 'status': 'active'}], None + my_obj = my_module() + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + msg = "Note: modifying an existing connector is not supported at this time." + assert msg == exc.value.args[0]['modify'] + + +class EC2: + def __init__(self, get_instances=None, create_instance=True, raise_exc=False): + ''' list of instances as dictionaries: + name, state are optional, and used to build an instance + reservation is optional and defaults to 'default' + ''' + self.get_instances = get_instances if get_instances is not None else [] + self.create_instance = create_instance if create_instance is not None else [] + self.raise_exc = raise_exc + + def describe_instances(self, Filters=None, InstanceIds=None): + ''' return a list of reservations, each reservation is a list of instances + ''' + if self.raise_exc and HAS_BOTOCORE: + raise ClientError({'Error': {'Message': 'forced error in unit testing', 'Code': 'test_only'}}, 'describe_instances') + print('ec2', Filters) + print('ec2', InstanceIds) + return self._build_reservations() + + def describe_images(self, Filters=None, Owners=None): + ''' AMI ''' + return {'Images': [{'CreationDate': 'yyyyy', 'ImageId': 'image_id'}, + {'CreationDate': 'xxxxx', 'ImageId': 'image_id'}, + {'CreationDate': 'zzzzz', 'ImageId': 'image_id'}]} + + def describe_subnets(self, SubnetIds=None): + ''' subnets ''' + return {'Subnets': [{'VpcId': 'vpc_id'}]} + + def run_instances(self, **kwargs): + ''' create and start an instance''' + if self.create_instance: + return {'Instances': [{'InstanceId': 'instance_id'}]} + return {'Instances': []} + + def terminate_instances(self, **kwargs): + ''' terminate an instance''' + return + + def _build_reservations(self): + ''' return a list of reservations, each reservation is a list of instances + ''' + reservations = {} + for instance in self.get_instances: + reservation = instance.get('reservation', 'default') + if reservation not in reservations: + reservations[reservation] = [] + # provide default values for name or state if one is present + name, state = None, None + if 'name' in instance: + name = instance['name'] + state = instance.get('state', 'active') + elif 'state' in instance: + name = instance.get('name', 'd_name') + state = instance['state'] + instance_id = instance.get('instance_id', '12345') + instance_type = instance.get('instance_type', 't3.xlarge') + if name: + reservations[reservation].append({'Name': name, 'State': {'Name': state}, 'InstanceId': instance_id, 'InstanceType': instance_type}) + return { + 'Reservations': [ + {'Instances': instances} for instances in reservations.values() + ] + } diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py new file mode 100644 index 000000000..37a93a291 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py @@ -0,0 +1,178 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch + +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure \ + import NetAppCloudManagerConnectorAzure as my_module, IMPORT_EXCEPTION + +if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION) + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection: + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + + +# using pytest natively, without unittest.TestCase +@pytest.fixture +def patch_ansible(): + with patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) as mocks: + yield mocks + + +def set_default_args_pass_check(): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'location': 'westus', + 'resource_group': 'occm_group_westus', + 'subnet_id': 'Subnet1', + 'vnet_id': 'Vnet1', + 'subscription_id': 'subscriptionId-test', + 'refresh_token': 'myrefresh_token', + 'account_id': 'account-test', + 'company': 'NetApp', + 'admin_username': 'test', + 'admin_password': 'test', + 'network_security_group_name': 'test' + }) + + +def set_args_create_cloudmanager_connector_azure(): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'location': 'westus', + 'resource_group': 'occm_group_westus', + 'subnet_id': 'Subnet1', + 'vnet_id': 'Vnet1', + 'subscription_id': 'subscriptionId-test', + 'refresh_token': 'myrefresh_token', + 'account_id': 'account-test', + 'company': 'NetApp', + 'admin_username': 'test', + 'admin_password': 'test', + 'network_security_group_name': 'test' + }) + + +def set_args_delete_cloudmanager_connector_azure(): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'test', + 'location': 'westus', + 'resource_group': 'occm_group_westus', + 'subnet_id': 'Subnet1', + 'vnet_id': 'Vnet1', + 'subscription_id': 'subscriptionId-test', + 'refresh_token': 'myrefresh_token', + 'account_id': 'account-test', + 'company': 'NetApp', + 'admin_username': 'test', + 'admin_password': 'test', + 'network_security_group_name': 'test' + }) + + +def test_module_fail_when_required_args_missing(patch_ansible): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +def test_module_fail_when_required_args_present(get_token, patch_ansible): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.deploy_azure') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.register_agent_to_service') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') +def test_create_cloudmanager_connector_azure_pass(get_post_api, register_agent_to_service, deploy_azure, get_token, patch_ansible): + set_module_args(set_args_create_cloudmanager_connector_azure()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + get_post_api.return_value = None, None, None + register_agent_to_service.return_value = 'test', 'test' + deploy_azure.return_value = None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_connector_azure: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.get_deploy_azure_vm') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.delete_azure_occm') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') +def test_delete_cloudmanager_connector_azure_pass(get_delete_api, delete_azure_occm, get_deploy_azure_vm, get_token, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_azure()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + get_deploy_azure_vm.return_value = True + delete_azure_occm.return_value = None + get_delete_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_connector_azure: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py new file mode 100644 index 000000000..9d74af2d7 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py @@ -0,0 +1,407 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch + +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp \ + import NetAppCloudManagerConnectorGCP as my_module + +IMPORT_ERRORS = [] +HAS_GCP_COLLECTION = False + +try: + from google import auth + from google.auth.transport import requests + from google.oauth2 import service_account + import yaml + HAS_GCP_COLLECTION = True +except ImportError as exc: + IMPORT_ERRORS.append(str(exc)) + +if not HAS_GCP_COLLECTION and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required google packages on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +# using pytest natively, without unittest.TestCase +@pytest.fixture(name='patch_ansible') +def fixture_patch_ansible(): + with patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) as mocks: + yield mocks + + +def set_default_args_pass_check(): + return dict({ + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'my_refresh_token', + 'state': 'present', + 'name': 'CxName', + 'project_id': 'tlv-support', + 'zone': 'us-west-1', + 'account_id': 'account-test', + 'company': 'NetApp', + 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com', + }) + + +def set_args_create_cloudmanager_connector_gcp(): + return dict({ + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'my_refresh_token', + 'state': 'present', + 'name': 'CxName', + 'project_id': 'tlv-support', + 'zone': 'us-west-1', + 'account_id': 'account-test', + 'company': 'NetApp', + 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com', + 'service_account_path': 'test.json', + }) + + +def set_args_delete_cloudmanager_connector_gcp(): + return dict({ + 'client_id': 'test', + 'refresh_token': 'my_refresh_token', + 'state': 'absent', + 'name': 'CxName', + 'project_id': 'tlv-support', + 'zone': 'us-west-1', + 'account_id': 'account-test', + 'company': 'NetApp', + 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com', + 'service_account_path': 'test.json', + }) + + +def test_module_fail_when_required_args_missing(patch_ansible): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +def test_module_fail_when_required_args_present(get_token, get_gcp_token, patch_ansible): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(set_default_args_pass_check()) + get_token.return_value = 'bearer', 'test' + get_gcp_token.return_value = 'token', None + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.deploy_gcp_vm') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_custom_data_for_gcp') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.create_occm_gcp') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_deploy_vm') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') +def test_create_cloudmanager_connector_gcp_pass(get_post_api, get_vm, create_occm_gcp, get_custom_data_for_gcp, + deploy_gcp_vm, get_gcp_token, get_token, patch_ansible): + set_module_args(set_args_create_cloudmanager_connector_gcp()) + get_token.return_value = 'bearer', 'test' + get_gcp_token.return_value = 'test', None + my_obj = my_module() + + get_vm.return_value = None + deploy_gcp_vm.return_value = None, 'test', None + get_custom_data_for_gcp.return_value = 'test', 'test', None + create_occm_gcp.return_value = 'test' + get_post_api.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_connector_gcp: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'], create_occm_gcp.return_value[1] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.delete_occm_gcp') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_deploy_vm') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_occm_agents') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') +def test_delete_cloudmanager_connector_gcp_pass(get_delete_api, get_agents, get_deploy_vm, delete_occm_gcp, get_gcp_token, get_token, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_gcp()) + get_token.return_value = 'bearer', 'test' + get_gcp_token.return_value = 'test', None + my_obj = my_module() + + my_connector_gcp = { + 'name': 'Dummyname-vm-boot-deployment', + 'client_id': 'test', + 'refresh_token': 'my_refresh_token', + 'operation': {'status': 'active'} + } + get_deploy_vm.return_value = my_connector_gcp + get_agents.return_value = [] + get_delete_api.return_value = None, None, None + delete_occm_gcp.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_connector_gcp: %s' % repr(exc.value)) + + assert exc.value.args[0]['changed'] + + +TOKEN_DICT = { + 'access_token': 'access_token', + 'token_type': 'token_type' +} + + +AGENT_DICTS = { + 'active': { + 'agent': {'status': 'active'}, + }, + 'pending': { + 'agent': {'status': 'pending'}, + }, + 'other': { + 'agent': {'status': 'pending', 'agentId': 'agent11', 'name': 'CxName', 'provider': 'GCP'}, + } +} + + +CLIENT_DICT = { + 'clientId': '12345', + 'clientSecret': 'a1b2c3' +} + +SRR = { + # common responses (json_dict, error, ocr_id) + 'empty_good': ({}, None, None), + 'zero_record': ({'records': []}, None, None), + 'get_token': (TOKEN_DICT, None, None), + 'get_gcp_token': (TOKEN_DICT, None, None), + 'get_agent_status_active': (AGENT_DICTS['active'], None, None), + 'get_agent_status_pending': (AGENT_DICTS['pending'], None, None), + 'get_agent_status_other': (AGENT_DICTS['other'], None, None), + 'get_agents': ({'agents': [AGENT_DICTS['other']['agent']]}, None, None), + 'get_agents_empty': ({'agents': []}, None, None), + 'get_agent_not_found': (b"{'message': 'Action not allowed for user'}", '403', None), + 'get_vm': ({'operation': {'status': 'active'}}, None, None), + 'get_vm_not_found': (b"{'message': 'is not found'}", '404', None), + 'register_agent': (CLIENT_DICT, None, None), + 'end_of_sequence': (None, "Unexpected call to send_request", None), + 'generic_error': (None, "Expected error", None), +} + + +@patch('time.sleep') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_delete_occm_gcp_pass(mock_request, get_gcp_token, ignore_sleep, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['empty_good'], # delete + SRR['get_agent_status_active'], # status + SRR['get_agent_status_pending'], # status + SRR['get_agent_status_other'], # status + SRR['end_of_sequence'], + ] + my_obj = my_module() + + error = my_obj.delete_occm_gcp() + print(error) + print(mock_request.mock_calls) + assert error is None + + +@patch('time.sleep') +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_create_occm_gcp_pass(mock_request, get_gcp_token, ignore_sleep, patch_ansible): + set_module_args(set_args_create_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['register_agent'], # register + SRR['empty_good'], # deploy + SRR['get_agent_status_pending'], # status + SRR['get_agent_status_active'], # status + SRR['end_of_sequence'], + ] + my_obj = my_module() + + client_id = my_obj.create_occm_gcp() + print(client_id) + print(mock_request.mock_calls) + assert client_id == '12345' + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_get_deploy_vm_pass(mock_request, get_gcp_token, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['get_vm'], # get + SRR['end_of_sequence'], + ] + my_obj = my_module() + + vm = my_obj.get_deploy_vm() + print(vm) + print(mock_request.mock_calls) + assert vm == SRR['get_vm'][0] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_get_occm_agents_absent_pass(mock_request, get_gcp_token, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['get_agent_status_active'], # get + SRR['end_of_sequence'], + ] + my_obj = my_module() + + agents = my_obj.get_occm_agents() + print(agents) + print(mock_request.mock_calls) + assert agents == [SRR['get_agent_status_active'][0]['agent']] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_get_occm_agents_present_pass(mock_request, get_gcp_token, patch_ansible): + set_module_args(set_args_create_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['get_agents'], # get + SRR['end_of_sequence'], + ] + my_obj = my_module() + + agents = my_obj.get_occm_agents() + print(agents) + print(mock_request.mock_calls) + assert agents == SRR['get_agents'][0]['agents'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_create_idempotent(mock_request, get_gcp_token, patch_ansible): + set_module_args(set_args_create_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['get_vm'], # get + SRR['get_agents'], # get + SRR['end_of_sequence'], + ] + my_obj = my_module() + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print(mock_request.mock_calls) + print(exc) + assert not exc.value.args[0]['changed'] + assert exc.value.args[0]['client_id'] == SRR['get_agents'][0]['agents'][0]['agentId'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +def test_delete_idempotent(mock_request, get_gcp_token, patch_ansible): + set_module_args(set_args_delete_cloudmanager_connector_gcp()) + get_gcp_token.return_value = 'test', None + mock_request.side_effect = [ + SRR['get_token'], # OAUTH + SRR['get_vm_not_found'], # get vn + SRR['get_agent_not_found'], # get agents + SRR['end_of_sequence'], + ] + my_obj = my_module() + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print(mock_request.mock_calls) + print(exc) + assert not exc.value.args[0]['changed'] + assert exc.value.args[0]['client_id'] == "" + + +# @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token') +# @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') +# def test_delete_idempotent(mock_request, get_gcp_token, patch_ansible): +# set_module_args(set_args_delete_cloudmanager_connector_gcp()) +# get_gcp_token.return_value = 'test', None +# mock_request.side_effect = [ +# SRR['get_token'], # OAUTH +# SRR['get_vm_not_found'], # get vn +# SRR['get_agents'], # get +# SRR['end_of_sequence'], +# ] +# my_obj = my_module() + +# with pytest.raises(AnsibleExitJson) as exc: +# my_obj.apply() +# print(mock_request.mock_calls) +# print(exc) +# assert not exc.value.args[0]['changed'] +# assert exc.value.args[0]['client_id'] == SRR['get_agents'][0][0]['agentId'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py new file mode 100644 index 000000000..e3dc685d4 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py @@ -0,0 +1,426 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch + +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws \ + import NetAppCloudManagerCVOAWS as my_module, IMPORT_EXCEPTION + +if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION) + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'client_id': 'test', + 'region': 'us-west-1', + 'use_latest_version': False, + 'ontap_version': 'ONTAP-9.10.0.T1', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'instance_type': 'm5.xlarge', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_create_cloudmanager_cvo_aws(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'client_id': 'test', + 'region': 'us-west-1', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_delete_cloudmanager_cvo_aws(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'test', + 'region': 'us-west-1', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_create_bynode_cloudmanager_cvo_aws(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'client_id': 'test', + 'region': 'us-west-1', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'license_type': 'cot-premium-byol', + 'platform_serial_number': '12345678', + 'is_ha': False + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + set_module_args(self.set_args_create_cloudmanager_cvo_aws()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_aws() + data['is_ha'] = True + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Essential' + data.pop('subnet_id') + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_ha_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_aws() + data['license_type'] = 'capacity-paygo' + data['capacity_package_name'] = 'Essential' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_ha_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_aws() + data['is_ha'] = True + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Essential' + data.pop('subnet_id') + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_ha_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + data = self.set_args_create_bynode_cloudmanager_cvo_aws() + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_aws_ha_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, get_vpc, wait_on_completion, get_token): + data = self.set_args_create_bynode_cloudmanager_cvo_aws() + data['license_type'] = 'ha-cot-premium-byol' + data['platform_serial_number_node1'] = '12345678' + data['platform_serial_number_node2'] = '23456789' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_working_environment_details_by_name.return_value = None, None + get_post_api.return_value = response, None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + get_vpc.return_value = 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_cvo_aws_pass(self, get_delete_api, get_working_environment_details_by_name, + wait_on_completion, get_token): + set_module_args(self.set_args_delete_cloudmanager_cvo_aws()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_cvo = { + 'name': 'test', + 'publicId': 'test'} + get_working_environment_details_by_name.return_value = my_cvo, None + get_delete_api.return_value = None, None, None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_cvo_aws_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + def test_change_cloudmanager_cvo_aws(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password, update_cvo_tags, + update_tier_level, update_instance_license_type, update_writing_speed_state, get_token): + data = self.set_default_args_pass_check() + data['svm_password'] = 'newpassword' + data['update_svm_password'] = True + data['ontap_version'] = 'ONTAP-9.10.1P3.T1' + data['upgrade_ontap_version'] = True + set_module_args(data) + + modify = ['svm_password', 'aws_tag', 'tier_level', 'ontap_version', 'instance_type', 'license_type', 'writing_speed_state'] + + my_cvo = { + 'name': 'TestA', + 'publicId': 'test', + 'cloudProviderName': 'Amazon', + 'svm_password': 'password', + 'isHa': False, + 'svmName': 'svm_TestA', + 'tenantId': 'Tenant-test', + 'workingEnvironmentType': 'VSA', + } + get_cvo.return_value = my_cvo, None + cvo_property = {'name': 'TestA', + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'ontapClusterProperties': { + 'capacityTierInfo': {'tierLevel': 'normal'}, + 'licenseType': {'capacityLimit': {'size': 2.0, 'unit': 'TB'}, + 'name': 'Cloud Volumes ONTAP Capacity Based Charging'}, + 'ontapVersion': '9.10.0', + 'upgradeVersions': [{'autoUpdateAllowed': False, + 'imageVersion': 'ONTAP-9.10.1P3', + 'lastModified': 1634467078000}], + 'writingSpeedState': 'NORMAL'}, + 'awsProperties': {'accountId': u'123456789011', + 'availabilityZones': [u'us-east-1b'], + 'bootDiskSize': None, + 'cloudProviderAccountId': None, + 'coreDiskExists': True, + 'instances': [{'availabilityZone': 'us-east-1b', + 'id': 'i-31', + 'imageId': 'ami-01a6f1234cb1ec375', + 'instanceProfileId': 'SimFabricPoolInstanceProfileId', + 'instanceType': 'm5.2xlarge', + 'isOCCMInstance': False, + 'isVsaInstance': True, + }], + 'regionName': 'us-west-1', + } + } + get_property.return_value = cvo_property, None + cvo_details = {'cloudProviderName': 'Amazon', + 'isHA': False, + 'name': 'TestA', + 'ontapClusterProperties': None, + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'userTags': {'key1': 'value1'}, + 'workingEnvironmentType': 'VSA'} + get_details.return_value = cvo_details, None + get_token.return_value = 'test', 'test' + my_obj = my_module() + + for item in modify: + if item == 'svm_password': + update_svm_password.return_value = True, None + elif item == 'aws_tag': + update_cvo_tags.return_value = True, None + elif item == 'tier_level': + update_tier_level.return_value = True, None + elif item == 'ontap_version': + upgrade_ontap_image.return_value = True, None + elif item == 'writing_speed_state': + update_writing_speed_state.return_value = True, None + elif item == 'instance_type' or item == 'license_type': + update_instance_license_type.return_value = True, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_change_cloudmanager_cvo_aws: %s' % repr(exc.value)) diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py new file mode 100644 index 000000000..f3e072bdb --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py @@ -0,0 +1,439 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_azure \ + import NetAppCloudManagerCVOAZURE as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + # self.token_type, self.token = self.get_token() + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'client_id': 'test', + 'location': 'westus', + 'use_latest_version': False, + 'ontap_version': 'ONTAP-9.10.0.T1.azure', + 'vnet_id': 'vpc-test', + 'resource_group': 'test', + 'subnet_id': 'subnet-test', + 'subscription_id': 'test', + 'cidr': '10.0.0.0/24', + 'svm_password': 'password', + 'license_type': 'azure-cot-standard-paygo', + 'instance_type': 'Standard_DS4_v2', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_create_cloudmanager_cvo_azure(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'client_id': 'test', + 'location': 'westus', + 'vnet_id': 'vpc-test', + 'resource_group': 'test', + 'subscription_id': 'test', + 'cidr': '10.0.0.0/24', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_delete_cloudmanager_cvo_azure(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'test', + 'location': 'westus', + 'vnet_id': 'vpc-test', + 'resource_group': 'test', + 'subscription_id': 'test', + 'cidr': '10.0.0.0/24', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False + }) + + def set_args_create_bynode_cloudmanager_cvo_azure(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'client_id': 'test', + 'location': 'westus', + 'vnet_id': 'vpc-test', + 'resource_group': 'test', + 'subscription_id': 'test', + 'cidr': '10.0.0.0/24', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'license_type': 'azure-cot-premium-byol', + 'serial_number': '12345678', + 'is_ha': False + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + self.rest_api = MockCMConnection() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + set_module_args(self.set_args_create_cloudmanager_cvo_azure()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_azure() + data['license_type'] = 'capacity-paygo' + data['capacity_package_name'] = 'Essential' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_capacity_license_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_ha_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_azure() + data['is_ha'] = True + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Professional' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_ha_capacity_license_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_bynode_cloudmanager_cvo_azure() + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_nodebase_license_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_ha_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_bynode_cloudmanager_cvo_azure() + data['is_ha'] = True + data['license_type'] = 'azure-ha-cot-premium-byol' + data['platform_serial_number_node1'] = '12345678' + data['platform_serial_number_node2'] = '23456789' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_azure_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_azure() + data['is_ha'] = True + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Essential' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_azure_ha_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_cvo_azure_pass(self, get_delete_api, get_working_environment_details_by_name, + wait_on_completion, get_token): + set_module_args(self.set_args_delete_cloudmanager_cvo_azure()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_cvo = { + 'name': 'Dummyname', + 'publicId': 'test'} + get_working_environment_details_by_name.return_value = my_cvo, None + get_delete_api.return_value = None, None, None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_cvo_azure_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + def test_change_cloudmanager_cvo_azure(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password, update_cvo_tags, + update_tier_level, update_instance_license_type, update_writing_speed_state, get_token): + data = self.set_default_args_pass_check() + data['svm_password'] = 'newpassword' + data['update_svm_password'] = True + data['ontap_version'] = 'ONTAP-9.10.1P3.T1.azure' + data['upgrade_ontap_version'] = True + data['instance_type'] = 'Standard_DS13_v2' + set_module_args(data) + + modify = ['svm_password', 'azure_tag', 'tier_level', 'ontap_version', 'instance_type', 'license_type'] + + my_cvo = { + 'name': 'TestA', + 'publicId': 'test', + 'svm_password': 'password', + 'isHA': False, + 'azure_tag': [{'tag_key': 'keya', 'tag_value': 'valuea'}, {'tag_key': 'keyb', 'tag_value': 'valueb'}], + } + get_cvo.return_value = my_cvo, None + + cvo_property = {'name': 'TestA', + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'ontapClusterProperties': { + 'capacityTierInfo': {'tierLevel': 'normal'}, + 'licensePackageName': 'Professional', + 'licenseType': {'capacityLimit': {'size': 2000.0, 'unit': 'TB'}, + 'name': 'Cloud Volumes ONTAP Capacity Based Charging'}, + 'ontapVersion': '9.10.0.T1.azure', + 'upgradeVersions': [{'autoUpdateAllowed': False, + 'imageVersion': 'ONTAP-9.10.1P3', + 'lastModified': 1634467078000}], + 'writingSpeedState': 'NORMAL'}, + 'providerProperties': { + 'cloudProviderAccountId': 'CloudProviderAccount-abcdwxyz', + 'regionName': 'westus', + 'instanceType': 'Standard_DS4_v2', + 'resourceGroup': { + 'name': 'TestA-rg', + 'location': 'westus', + 'tags': { + 'DeployedByOccm': 'true' + } + }, + 'vnetCidr': '10.0.0.0/24', + 'tags': { + 'DeployedByOccm': 'true' + }}, + 'tenantId': 'Tenant-abCdEfg1', + 'workingEnvironmentTyp': 'VSA' + } + get_property.return_value = cvo_property, None + cvo_details = {'cloudProviderName': 'Azure', + 'isHA': False, + 'name': 'TestA', + 'ontapClusterProperties': None, + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'userTags': {'DeployedByOccm': 'true', 'key1': 'value1'}, + 'workingEnvironmentType': 'VSA'} + get_details.return_value = cvo_details, None + get_token.return_value = 'test', 'test' + my_obj = my_module() + + for item in modify: + if item == 'svm_password': + update_svm_password.return_value = True, None + elif item == 'azure_tag': + update_cvo_tags.return_value = True, None + elif item == 'tier_level': + update_tier_level.return_value = True, None + elif item == 'ontap_version': + upgrade_ontap_image.return_value = True, None + elif item == 'writing_speed_state': + update_writing_speed_state.return_value = True, None + elif item == 'instance_type': + update_instance_license_type.return_value = True, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_change_cloudmanager_cvo_azure: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py new file mode 100644 index 000000000..1209d2b9e --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py @@ -0,0 +1,543 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_gcp \ + import NetAppCloudManagerCVOGCP as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + # self.token_type, self.token = self.get_token() + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'TestA', + 'client_id': 'test', + 'zone': 'us-west-1b', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False, + 'gcp_service_account': 'test_account', + 'data_encryption_type': 'GCP', + 'gcp_volume_type': 'pd-ssd', + 'gcp_volume_size': 500, + 'gcp_volume_size_unit': 'GB', + 'project_id': 'default-project', + 'tier_level': 'standard' + }) + + def set_args_create_cloudmanager_cvo_gcp(self): + return dict({ + 'state': 'present', + 'name': 'Dummyname', + 'client_id': 'test', + 'zone': 'us-west1-b', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'use_latest_version': False, + 'capacity_tier': 'cloudStorage', + 'ontap_version': 'ONTAP-9.10.0.T1.gcp', + 'is_ha': False, + 'gcp_service_account': 'test_account', + 'data_encryption_type': 'GCP', + 'gcp_volume_type': 'pd-ssd', + 'gcp_volume_size': 500, + 'gcp_volume_size_unit': 'GB', + 'gcp_labels': [{'label_key': 'key1', 'label_value': 'value1'}, {'label_key': 'keya', 'label_value': 'valuea'}], + 'project_id': 'default-project' + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + self.rest_api = MockCMConnection() + print('Info: %s' % exc.value.args[0]['msg']) + + def set_args_delete_cloudmanager_cvo_gcp(self): + return dict({ + 'state': 'absent', + 'name': 'Dummyname', + 'client_id': 'test', + 'zone': 'us-west-1', + 'vpc_id': 'vpc-test', + 'subnet_id': 'subnet-test', + 'svm_password': 'password', + 'refresh_token': 'myrefresh_token', + 'is_ha': False, + 'gcp_service_account': 'test_account', + 'data_encryption_type': 'GCP', + 'gcp_volume_type': 'pd-ssd', + 'gcp_volume_size': 500, + 'gcp_volume_size_unit': 'GB', + 'project_id': 'project-test' + }) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + set_module_args(self.set_args_create_cloudmanager_cvo_gcp()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['is_ha'] = True + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Essential' + data['subnet0_node_and_data_connectivity'] = 'default' + data['subnet1_cluster_connectivity'] = 'subnet2' + data['subnet2_ha_connectivity'] = 'subnet3' + data['subnet3_data_replication'] = 'subnet1' + data['vpc0_node_and_data_connectivity'] = 'default' + data['vpc1_cluster_connectivity'] = 'vpc2' + data['vpc2_ha_connectivity'] = 'vpc3' + data['vpc3_data_replication'] = 'vpc1' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_ha_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['license_type'] = 'capacity-paygo' + data['capacity_package_name'] = 'Essential' + set_module_args(data) + + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_ha_capacity_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['license_type'] = 'ha-capacity-paygo' + data['capacity_package_name'] = 'Essential' + data['is_ha'] = True + data['subnet0_node_and_data_connectivity'] = 'default' + data['subnet1_cluster_connectivity'] = 'subnet2' + data['subnet2_ha_connectivity'] = 'subnet3' + data['subnet3_data_replication'] = 'subnet1' + data['vpc0_node_and_data_connectivity'] = 'default' + data['vpc1_cluster_connectivity'] = 'vpc2' + data['vpc2_ha_connectivity'] = 'vpc3' + data['vpc3_data_replication'] = 'vpc1' + set_module_args(data) + + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['license_type'] = 'gcp-cot-premium-byol' + data['platform_serial_number'] = '12345678' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post') + def test_create_cloudmanager_cvo_gcp_ha_nodebase_license_pass(self, get_post_api, + get_working_environment_details_by_name, get_nss, + get_tenant, wait_on_completion, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['is_ha'] = True + data['subnet0_node_and_data_connectivity'] = 'default' + data['subnet1_cluster_connectivity'] = 'subnet2' + data['subnet2_ha_connectivity'] = 'subnet3' + data['subnet3_data_replication'] = 'subnet1' + data['vpc0_node_and_data_connectivity'] = 'default' + data['vpc1_cluster_connectivity'] = 'vpc2' + data['vpc2_ha_connectivity'] = 'vpc3' + data['vpc3_data_replication'] = 'vpc1' + data['platform_serial_number_node1'] = '12345678' + data['platform_serial_number_node2'] = '23456789' + data['license_type'] = 'gcp-ha-cot-premium-byol' + set_module_args(data) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'publicId': 'abcdefg12345'} + get_post_api.return_value = response, None, None + get_working_environment_details_by_name.return_value = None, None + get_nss.return_value = 'nss-test', None + get_tenant.return_value = 'test', None + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete') + def test_delete_cloudmanager_cvo_gcp_pass(self, get_delete_api, get_working_environment_details_by_name, + wait_on_completion, get_token): + set_module_args(self.set_args_delete_cloudmanager_cvo_gcp()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + my_cvo = { + 'name': 'Dummyname', + 'publicId': 'test'} + get_working_environment_details_by_name.return_value = my_cvo, None + + get_delete_api.return_value = None, None, 'test' + wait_on_completion.return_value = None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + def test_change_cloudmanager_cvo_gcp(self, get_cvo, get_property, get_details, update_svm_password, update_cvo_tags, + update_tier_level, update_instance_license_type, get_token): + set_module_args(self.set_args_create_cloudmanager_cvo_gcp()) + + modify = ['svm_password', 'gcp_labels', 'tier_level', 'instance_type'] + + my_cvo = { + 'name': 'TestA', + 'publicId': 'test', + 'cloudProviderName': 'GCP', + 'isHA': False, + 'svmName': 'svm_TestA', + 'svm_password': 'password', + 'tenantId': 'Tenant-test', + } + get_cvo.return_value = my_cvo, None + cvo_property = {'name': 'Dummyname', + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'ontapClusterProperties': { + 'capacityTierInfo': {'tierLevel': 'standard'}, + 'licenseType': {'capacityLimit': {'size': 10.0, 'unit': 'TB'}, + 'name': 'Cloud Volumes ONTAP Standard'}, + 'ontapVersion': '9.10.0.T1', + 'writingSpeedState': 'NORMAL'}, + 'providerProperties': { + 'regionName': 'us-west1', + 'zoneName': ['us-west1-b'], + 'instanceType': 'n1-standard-8', + 'labels': {'cloud-ontap-dm': 'anscvogcp-deployment', + 'cloud-ontap-version': '9_10_0_t1', + 'key1': 'value1', + 'platform-serial-number': '90920130000000001020', + 'working-environment-id': 'vsaworkingenvironment-cxxt6zwj'}, + 'subnetCidr': '10.150.0.0/20', + 'projectName': 'default-project'}, + 'svmName': 'svm_Dummyname', + 'tenantId': 'Tenant-test', + 'workingEnvironmentTyp': 'VSA' + } + get_property.return_value = cvo_property, None + cvo_details = {'cloudProviderName': 'GCP', + 'isHA': False, + 'name': 'Dummyname', + 'ontapClusterProperties': None, + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'userTags': {'key1': 'value1'}, + 'workingEnvironmentType': 'VSA'} + get_details.return_value = cvo_details, None + get_token.return_value = 'test', 'test' + my_obj = my_module() + + for item in modify: + if item == 'svm_password': + update_svm_password.return_value = True, None + elif item == 'gcp_labels': + update_cvo_tags.return_value = True, None + elif item == 'tier_level': + update_tier_level.return_value = True, None + elif item == 'instance_type': + update_instance_license_type.return_value = True, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_change_cloudmanager_cvo_gcp: %s' % repr(exc.value)) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + def test_change_cloudmanager_cvo_gcp_ha(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password, + update_cvo_tags, update_tier_level, update_instance_license_type, update_writing_speed_state, get_token): + data = self.set_args_create_cloudmanager_cvo_gcp() + data['is_ha'] = True + data['svm_password'] = 'newpassword' + data['update_svm_password'] = True + data['ontap_version'] = 'ONTAP-9.10.1P3.T1.gcpha' + data['upgrade_ontap_version'] = True + data['subnet0_node_and_data_connectivity'] = 'default' + data['subnet1_cluster_connectivity'] = 'subnet2' + data['subnet2_ha_connectivity'] = 'subnet3' + data['subnet3_data_replication'] = 'subnet1' + data['vpc0_node_and_data_connectivity'] = 'default' + data['vpc1_cluster_connectivity'] = 'vpc2' + data['vpc2_ha_connectivity'] = 'vpc3' + data['vpc3_data_replication'] = 'vpc1' + data['platform_serial_number_node1'] = '12345678' + data['platform_serial_number_node2'] = '23456789' + data['license_type'] = 'gcp-ha-cot-premium-byol' + data['instance_type'] = 'n1-standard-8' + set_module_args(data) + + modify = ['svm_password', 'gcp_labels', 'tier_level', 'ontap_version', 'instance_type', 'license_type'] + + my_cvo = { + 'name': 'TestA', + 'publicId': 'test', + 'cloudProviderName': 'GCP', + 'isHA': True, + 'svmName': 'svm_TestA', + 'svm_password': 'password', + 'tenantId': 'Tenant-test', + } + get_cvo.return_value = my_cvo, None + cvo_property = {'name': 'Dummyname', + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'ontapClusterProperties': { + 'capacityTierInfo': {'tierLevel': 'standard'}, + 'licenseType': {'capacityLimit': {'size': 10.0, 'unit': 'TB'}, + 'name': 'Cloud Volumes ONTAP Standard'}, + 'ontapVersion': '9.10.0.T1', + 'upgradeVersions': [{'autoUpdateAllowed': False, + 'imageVersion': 'ONTAP-9.10.1P3', + 'lastModified': 1634467078000}], + 'writingSpeedState': 'NORMAL'}, + 'providerProperties': { + 'regionName': 'us-west1', + 'zoneName': ['us-west1-b'], + 'instanceType': 'n1-standard-8', + 'labels': {'cloud-ontap-dm': 'anscvogcp-deployment', + 'cloud-ontap-version': '9_10_0_t1', + 'key1': 'value1', + 'platform-serial-number': '90920130000000001020', + 'working-environment-id': 'vsaworkingenvironment-cxxt6zwj'}, + 'subnetCidr': '10.150.0.0/20', + 'projectName': 'default-project'}, + 'svmName': 'svm_Dummyname', + 'tenantId': 'Tenant-test', + 'workingEnvironmentTyp': 'VSA' + } + get_property.return_value = cvo_property, None + cvo_details = {'cloudProviderName': 'GCP', + 'isHA': True, + 'name': 'Dummyname', + 'ontapClusterProperties': None, + 'publicId': 'test', + 'status': {'status': 'ON'}, + 'userTags': {'key1': 'value1', 'partner-platform-serial-number': '90920140000000001019', + 'gcp_resource_id': '14004944518802780827', 'count-down': '3'}, + 'workingEnvironmentType': 'VSA'} + get_details.return_value = cvo_details, None + get_token.return_value = 'test', 'test' + my_obj = my_module() + + for item in modify: + if item == 'svm_password': + update_svm_password.return_value = True, None + elif item == 'gcp_labels': + update_cvo_tags.return_value = True, None + elif item == 'tier_level': + update_tier_level.return_value = True, None + elif item == 'ontap_version': + upgrade_ontap_image.return_value = True, None + elif item == 'writing_speed_state': + update_writing_speed_state.return_value = True, None + elif item == 'instance_type': + update_instance_license_type.return_value = True, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_change_cloudmanager_cvo_gcp: %s' % repr(exc.value)) diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py new file mode 100644 index 000000000..9b417ed1b --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py @@ -0,0 +1,591 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests ONTAP Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_info \ + import NetAppCloudmanagerInfo as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +# using pytest natively, without unittest.TestCase +@pytest.fixture +def patch_ansible(): + with patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) as mocks: + yield mocks + + +def set_default_args_pass_check(patch_ansible): + return dict({ + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + }) + + +def set_args_get_cloudmanager_working_environments_info(): + args = { + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'gather_subsets': ['working_environments_info'] + } + return args + + +def set_args_get_cloudmanager_aggregates_info(): + args = { + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'gather_subsets': ['working_environments_info'] + } + return args + + +def set_args_get_accounts_info(): + args = { + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'myrefresh_token', + 'gather_subsets': ['accounts_info'] + } + return args + + +def test_module_fail_when_required_args_missing(patch_ansible): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environments_info') +def test_get_working_environments_info(working_environments_info, get_token, patch_ansible): + args = dict(set_args_get_cloudmanager_working_environments_info()) + set_module_args(args) + get_token.return_value = 'token_type', 'token' + working_environments_info.return_value = { + "azureVsaWorkingEnvironments": [ + { + "name": "testazure", + "cloudProviderName": "Azure", + "creatorUserEmail": "samlp|NetAppSAML|testuser", + "isHA": False, + "publicId": "VsaWorkingEnvironment-az123456", + "tenantId": "Tenant-2345", + "workingEnvironmentType": "VSA", + } + ], + "gcpVsaWorkingEnvironments": [], + "onPremWorkingEnvironments": [], + "vsaWorkingEnvironments": [ + { + "name": "testAWS", + "cloudProviderName": "Amazon", + "creatorUserEmail": "samlp|NetAppSAML|testuser", + "isHA": False, + "publicId": "VsaWorkingEnvironment-aws12345", + "tenantId": "Tenant-2345", + "workingEnvironmentType": "VSA", + }, + { + "name": "testAWSHA", + "cloudProviderName": "Amazon", + "creatorUserEmail": "samlp|NetAppSAML|testuser", + "isHA": True, + "publicId": "VsaWorkingEnvironment-awsha345", + "tenantId": "Tenant-2345", + "workingEnvironmentType": "VSA", + } + ] + }, None + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_info: %s' % repr(exc.value)) + assert not exc.value.args[0]['changed'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch( + 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_info.NetAppCloudmanagerInfo.get_aggregates_info') +def test_get_aggregates_info(aggregates_info, get_token, patch_ansible): + args = dict(set_args_get_cloudmanager_aggregates_info()) + set_module_args(args) + get_token.return_value = 'token_type', 'token' + aggregates_info.return_value = { + "azureVsaWorkingEnvironments": { + "VsaWorkingEnvironment-az123456": [ + { + "availableCapacity": { + "size": 430.0, + "unit": "GB" + }, + "disks": [ + { + "device": "LUN 3.1", + "name": "testazure-01-1", + "ownerNode": "testazure-01", + "position": "data", + } + ], + "encryptionType": "notEncrypted", + "homeNode": "testazure-01", + "isRoot": False, + "name": "aggr1", + "ownerNode": "testazure-01", + "providerVolumes": [ + { + "device": "1", + "diskType": "Premium_LRS", + "encrypted": False, + "instanceId": "testazureid", + "name": "testazuredatadisk1", + "size": { + "size": 500.0, + "unit": "GB" + }, + "state": "available" + } + ], + "sidlEnabled": False, + "snaplockType": "non_snaplock", + "state": "online", + "totalCapacity": { + "size": 500.0, + "unit": "GB" + }, + "usedCapacity": { + "size": 70.0, + "unit": "GB" + }, + "volumes": [ + { + "isClone": False, + "name": "svm_testazure_root", + "rootVolume": True, + "thinProvisioned": True, + "totalSize": { + "size": 1.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.000339508056640625, + "unit": "GB" + } + }, + { + "isClone": False, + "name": "azv1", + "rootVolume": False, + "thinProvisioned": True, + "totalSize": { + "size": 500.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.0, + "unit": "GB" + } + } + ] + }, + ] + }, + "gcpVsaWorkingEnvironments": {}, + "onPremWorkingEnvironments": {}, + "vsaWorkingEnvironments": { + "VsaWorkingEnvironment-aws12345": [ + { + "availableCapacity": { + "size": 430.0, + "unit": "GB" + }, + "disks": [ + { + "device": "xvdh vol-381", + "name": "testAWSHA-01-i-196h", + "ownerNode": "testAWSHA-01", + "position": "data", + }, + { + "device": "xvdh vol-382", + "name": "testAWSHA-01-i-195h", + "ownerNode": "testAWSHA-01", + "position": "data", + } + ], + "encryptionType": "cloudEncrypted", + "homeNode": "testAWSHA-01", + "isRoot": False, + "name": "aggr1", + "ownerNode": "testAWSHA-01", + "providerVolumes": [ + { + "device": "/dev/xvdh", + "diskType": "gp2", + "encrypted": True, + "id": "vol-381", + "instanceId": "i-196", + "name": "vol-381", + "size": { + "size": 500.0, + "unit": "GB" + }, + "state": "in-use" + }, + { + "device": "/dev/xvdh", + "diskType": "gp2", + "encrypted": True, + "id": "vol-382", + "instanceId": "i-195", + "name": "vol-382", + "size": { + "size": 500.0, + "unit": "GB" + }, + "state": "in-use" + } + ], + "sidlEnabled": True, + "snaplockType": "non_snaplock", + "state": "online", + "totalCapacity": { + "size": 500.0, + "unit": "GB" + }, + "usedCapacity": { + "size": 70.0, + "unit": "GB" + }, + "volumes": [ + { + "isClone": False, + "name": "svm_testAWSHA_root", + "rootVolume": True, + "thinProvisioned": True, + "totalSize": { + "size": 1.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.000339508056640625, + "unit": "GB" + } + }, + { + "isClone": False, + "name": "vha", + "rootVolume": False, + "thinProvisioned": True, + "totalSize": { + "size": 100.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.0, + "unit": "GB" + } + } + ] + } + ], + "VsaWorkingEnvironment-awsha345": [ + { + "availableCapacity": { + "size": 430.0, + "unit": "GB" + }, + "disks": [ + { + "device": "xvdg vol-369", + "name": "testAWS-01-i-190g", + "ownerNode": "testAWS-01", + "position": "data", + } + ], + "encryptionType": "cloudEncrypted", + "homeNode": "testAWS-01", + "isRoot": False, + "name": "aggr1", + "ownerNode": "testAWS-01", + "providerVolumes": [ + { + "device": "/dev/xvdg", + "diskType": "gp2", + "encrypted": True, + "id": "vol-369", + "instanceId": "i-190", + "name": "vol-369", + "size": { + "size": 500.0, + "unit": "GB" + }, + "state": "in-use" + } + ], + "sidlEnabled": True, + "snaplockType": "non_snaplock", + "state": "online", + "totalCapacity": { + "size": 500.0, + "unit": "GB" + }, + "usedCapacity": { + "size": 70.0, + "unit": "GB" + }, + "volumes": [ + { + "isClone": False, + "name": "svm_testAWS_root", + "rootVolume": True, + "thinProvisioned": True, + "totalSize": { + "size": 1.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.000339508056640625, + "unit": "GB" + } + }, + { + "isClone": False, + "name": "v1", + "rootVolume": False, + "thinProvisioned": True, + "totalSize": { + "size": 100.0, + "unit": "GB" + }, + "usedSize": { + "size": 0.0, + "unit": "GB" + } + } + ] + }, + { + "availableCapacity": { + "size": 86.0, + "unit": "GB" + }, + "disks": [ + { + "device": "xvdh vol-371", + "name": "testAWS-01-i-190h", + "ownerNode": "testAWS-01", + "position": "data", + } + ], + "encryptionType": "cloudEncrypted", + "homeNode": "testAWS-01", + "isRoot": False, + "name": "aggr2", + "ownerNode": "testAWS-01", + "providerVolumes": [ + { + "device": "/dev/xvdh", + "diskType": "gp2", + "encrypted": True, + "id": "vol-371", + "instanceId": "i-190", + "name": "vol-371", + "size": { + "size": 100.0, + "unit": "GB" + }, + "state": "in-use" + } + ], + "sidlEnabled": True, + "snaplockType": "non_snaplock", + "state": "online", + "totalCapacity": { + "size": 100.0, + "unit": "GB" + }, + "usedCapacity": { + "size": 0.0, + "unit": "GB" + }, + "volumes": [] + } + ] + } + } + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_info: %s' % repr(exc.value)) + assert not exc.value.args[0]['changed'] + + +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') +@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_accounts_info') +def test_get_accounts_info(accounts_info, get_token, patch_ansible): + args = dict(set_args_get_accounts_info()) + set_module_args(args) + get_token.return_value = 'token_type', 'token' + accounts_info.return_value = { + "awsAccounts": [ + { + "accessKey": "1", + "accountId": "123456789011", + "accountName": "tami", + "accountType": "AWS_KEYS", + "publicId": "CloudProviderAccount-Ekj6L9QX", + "subscriptionId": "hackExp10Days", + "vsaList": [] + }, + { + "accessKey": "", + "accountId": "123456789011", + "accountName": "Instance Profile", + "accountType": "INSTANCE_PROFILE", + "occmRole": "occmRole", + "publicId": "InstanceProfile", + "subscriptionId": "hackExp10Days", + "vsaList": [ + { + "name": "CVO_AWSCluster", + "publicId": "VsaWorkingEnvironment-9m3I6i3I", + "workingEnvironmentType": "AWS" + }, + { + "name": "testAWS1", + "publicId": "VsaWorkingEnvironment-JCzkA9OX", + "workingEnvironmentType": "AWS" + }, + ] + } + ], + "azureAccounts": [ + { + "accountName": "AzureKeys", + "accountType": "AZURE_KEYS", + "applicationId": "1", + "publicId": "CloudProviderAccount-T84ceMYu", + "tenantId": "1", + "vsaList": [ + { + "name": "testAZURE", + "publicId": "VsaWorkingEnvironment-jI0tbceH", + "workingEnvironmentType": "AZURE" + }, + { + "name": "test", + "publicId": "VsaWorkingEnvironment-00EnDcfB", + "workingEnvironmentType": "AZURE" + }, + ] + }, + { + "accountName": "s", + "accountType": "AZURE_KEYS", + "applicationId": "1", + "publicId": "CloudProviderAccount-XxbN95dj", + "tenantId": "1", + "vsaList": [] + } + ], + "gcpStorageAccounts": [], + "nssAccounts": [ + { + "accountName": "TESTCLOUD2", + "accountType": "NSS_KEYS", + "nssUserName": "TESTCLOUD2", + "publicId": "be2f3cac-352a-46b9-a341-a446c35b61c9", + "vsaList": [ + { + "name": "testAWS", + "publicId": "VsaWorkingEnvironment-3txYJOsX", + "workingEnvironmentType": "AWS" + }, + { + "name": "testAZURE", + "publicId": "VsaWorkingEnvironment-jI0tbceH", + "workingEnvironmentType": "AZURE" + }, + ] + }, + { + "accountName": "ntapitdemo", + "accountType": "NSS_KEYS", + "nssUserName": "ntapitdemo", + "publicId": "01e43a7d-cfc9-4682-aa12-15374ce81638", + "vsaList": [ + { + "name": "test", + "publicId": "VsaWorkingEnvironment-00EnDcfB", + "workingEnvironmentType": "AZURE" + } + ] + } + ] + }, None + my_obj = my_module() + my_obj.rest_api.api_root_path = "my_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_info: %s' % repr(exc.value)) + assert not exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py new file mode 100644 index 000000000..a9f41beed --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py @@ -0,0 +1,144 @@ +# (c) 2021, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account \ + import NetAppCloudmanagerNssAccount as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'test_nss_account', + 'username': 'username', + 'password': 'password', + 'client_id': 'client_id', + 'refresh_token': 'refrsh_token' + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.create_nss_account') + def test_create_nss_account_successfully(self, create, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = None + create.return_value = None + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.create_nss_account') + def test_create_nss_account_idempotency(self, create, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = { + 'name': 'test_nss_account', + 'username': 'TESTCLOUD1', + 'password': 'test_test', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'CvMJXRhz5V4dmxZqVg5LDRDlZyE - kbqRKT9YMcAsjmwFs' + } + create.return_value = None + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert not exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.delete_nss_account') + def test_create_nss_account_successfully(self, delete, get, get_token): + args = self.set_default_args_pass_check() + args['state'] = 'absent' + set_module_args(args) + get.return_value = { + 'name': 'test_nss_account', + 'username': 'TESTCLOUD1', + 'password': 'test_test', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'refresh_token': 'CvMJXRhz5V4dmxZqVg5LDRDlZyE - kbqRKT9YMcAsjmwFs' + } + delete.return_value = None + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py new file mode 100644 index 000000000..9d1189489 --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py @@ -0,0 +1,176 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) + +__metaclass__ = type + +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror \ + import NetAppCloudmanagerSnapmirror as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'source_working_environment_name': 'TestA', + 'destination_working_environment_name': 'TestB', + 'source_volume_name': 'source', + 'destination_volume_name': 'dest', + 'source_svm_name': 'source_svm', + 'destination_svm_name': 'dest_svm', + 'policy': 'MirrorAllSnapshots', + 'schedule': 'min', + 'max_transfer_rate': 102400, + 'client_id': 'client_id', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_create_cloudmanager_snapmirror(self): + return dict({ + 'state': 'present', + 'source_working_environment_name': 'TestA', + 'destination_working_environment_name': 'TestB', + 'source_volume_name': 'source', + 'destination_volume_name': 'dest', + 'source_svm_name': 'source_svm', + 'destination_svm_name': 'dest_svm', + 'policy': 'MirrorAllSnapshots', + 'schedule': 'min', + 'max_transfer_rate': 102400, + 'client_id': 'client_id', + 'refresh_token': 'myrefresh_token', + }) + + def set_args_delete_cloudmanager_snapmirror(self): + return dict({ + 'state': 'absent', + 'source_working_environment_name': 'TestA', + 'destination_working_environment_name': 'TestB', + 'source_volume_name': 'source', + 'destination_volume_name': 'dest', + 'client_id': 'client_id', + 'refresh_token': 'myrefresh_token', + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + def test_module_fail_when_required_args_present(self, get_token): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleExitJson) as exc: + set_module_args(self.set_default_args_pass_check()) + get_token.return_value = 'test', 'test' + my_module() + exit_json(changed=True, msg="TestCase Fail when required args are present") + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_detail_for_snapmirror') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_snapmirror') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.build_quote_request') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.quote_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_volumes') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_interclusterlifs') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_cloudmanager_snapmirror_create_pass(self, send_request, get_interclusterlifs, get_volumes, quote_volume, build_quote_request, + get_snapmirror, wait_on_completion, get_working_environment_detail_for_snapmirror, get_token): + set_module_args(self.set_args_create_cloudmanager_snapmirror()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + response = {'id': 'abcdefg12345'} + source_we_info = {'publicId': 'test1', 'workingEnvironmentType': 'AMAZON'} + dest_we_info = {'publicId': 'test2', 'workingEnvironmentType': 'AMAZON', 'svmName': 'source_svm', 'name': 'TestB'} + source_vol = [{'name': 'source', 'svmName': 'source_svm', 'providerVolumeType': 'abc'}] + quote_volume_response = {'numOfDisks': 10, 'aggregateName': 'aggr1'} + interclusterlifs_resp = {'interClusterLifs': [{'address': '10.10.10.10'}], 'peerInterClusterLifs': [{'address': '10.10.10.10'}]} + get_working_environment_detail_for_snapmirror.return_value = source_we_info, dest_we_info, None + send_request.return_value = response, None, None + wait_on_completion.return_value = None + get_snapmirror.return_value = None + get_volumes.return_value = source_vol + build_quote_request.return_value = {'name': 'test'} + quote_volume.return_value = quote_volume_response + get_interclusterlifs.return_value = interclusterlifs_resp + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_create_cloudmanager_snapmirror_create_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_snapmirror') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_delete_cloudmanager_snapmirror_delete_pass(self, send_request, get_snapmirror, get_token): + set_module_args(self.set_args_delete_cloudmanager_snapmirror()) + get_token.return_value = 'test', 'test' + my_obj = my_module() + + my_snapmirror = { + 'source_working_environment_id': '456', + 'destination_svm_name': 'dest_svm', + 'destination_working_environment_id': '123'} + get_snapmirror.return_value = my_snapmirror + send_request.return_value = None, None, None + + with pytest.raises(AnsibleExitJson) as exc: + my_obj.apply() + print('Info: test_delete_cloudmanager_snapmirror_delete_pass: %s' % repr(exc.value)) + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py new file mode 100644 index 000000000..15b4802df --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py @@ -0,0 +1,216 @@ +# (c) 2022, NetApp, Inc +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +''' unit tests Cloudmanager Ansible module: ''' + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type +import json +import sys +import pytest + +from ansible.module_utils import basic +from ansible.module_utils._text import to_bytes +from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest +from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock +import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils +from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume \ + import NetAppCloudmanagerVolume as my_module + +if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5): + pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7') + + +def set_module_args(args): + '''prepare arguments so that they will be picked up during module creation''' + args = json.dumps({'ANSIBLE_MODULE_ARGS': args}) + basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access + + +class AnsibleExitJson(Exception): + '''Exception class to be raised by module.exit_json and caught by the test case''' + + +class AnsibleFailJson(Exception): + '''Exception class to be raised by module.fail_json and caught by the test case''' + + +def exit_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over exit_json; package return data into an exception''' + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): # pylint: disable=unused-argument + '''function to patch over fail_json; package return data into an exception''' + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +class MockCMConnection(): + ''' Mock response of http connections ''' + def __init__(self, kind=None, parm1=None): + self.type = kind + self.parm1 = parm1 + self.xml_in = None + self.xml_out = None + + +class TestMyModule(unittest.TestCase): + ''' a group of related Unit Tests ''' + + def setUp(self): + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def set_default_args_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'testvol', + 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'svm_name': 'svm_justinaws', + 'snapshot_policy_name': 'default', + 'tiering_policy': 'auto', + 'export_policy_type': 'custom', + 'export_policy_ip': ["10.30.0.1/16"], + 'export_policy_nfs_version': ["nfs4"], + 'refresh_token': 'myrefresh_token', + 'size': 10, + }) + + def set_default_args_with_workingenv_name_pass_check(self): + return dict({ + 'state': 'present', + 'name': 'testvol', + 'working_environment_name': 'weone', + 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh', + 'svm_name': 'svm_justinaws', + 'snapshot_policy_name': 'default', + 'export_policy_type': 'custom', + 'export_policy_ip': ["10.30.0.1/16"], + 'export_policy_nfs_version': ["nfs4"], + 'refresh_token': 'myrefresh_token', + 'size': 10, + }) + + def test_module_fail_when_required_args_missing(self): + ''' required arguments are reported as errors ''' + with pytest.raises(AnsibleFailJson) as exc: + set_module_args({}) + my_module() + print('Info: %s' % exc.value.args[0]['msg']) + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.create_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_volume_successfully(self, send_request, create, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = None + create.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_volume_idempotency(self, send_request, get, get_token): + set_module_args(self.set_default_args_pass_check()) + get.return_value = { + 'name': 'testvol', + 'snapshot_policy_name': 'default', + 'export_policy_type': 'custom', + 'export_policy_ip': ["10.30.0.1/16"], + 'export_policy_nfs_version': ["nfs4"], + } + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert not exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.modify_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_update_volume_successfully(self, send_request, get, get_token, modify): + set_module_args(self.set_default_args_pass_check()) + get.return_value = { + 'name': 'testvol', + 'snapshot_policy_name': 'default', + 'tiering_policy': 'snapshot_only', + 'export_policy_type': 'custom', + 'export_policy_ip': ["10.30.0.1/16"], + 'export_policy_nfs_version': ["nfs3", "nfs4"], + } + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)] + get_token.return_value = ("type", "token") + modify.return_value = None + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.modify_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_update_volume_idempotency(self, send_request, get, get_token, modify): + set_module_args(self.set_default_args_pass_check()) + get.return_value = { + 'name': 'testvol', + 'snapshot_policy_name': 'default', + 'export_policy_type': 'custom', + 'export_policy_ip': ["10.30.0.1/16"], + 'export_policy_nfs_version': ["nfs4"], + } + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)] + get_token.return_value = ("type", "token") + modify.return_value = None + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert not exc.value.args[0]['changed'] + + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.create_volume') + @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request') + def test_create_volume_by_workingenv_name_successfully(self, send_request, create, get, get_we, get_token): + args = self.set_default_args_with_workingenv_name_pass_check() + my_we = { + 'name': 'test', + 'publicId': 'test', + 'cloudProviderName': 'Amazon'} + get_we.return_value = my_we, None + args['working_environment_id'] = my_we['publicId'] + set_module_args(args) + get.return_value = None + create.return_value = None + send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)] + get_token.return_value = ("type", "token") + obj = my_module() + obj.rest_api.api_root_path = "test_root_path" + + with pytest.raises(AnsibleExitJson) as exc: + obj.apply() + assert exc.value.args[0]['changed'] diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt b/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt new file mode 100644 index 000000000..484081daa --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt @@ -0,0 +1 @@ +cryptography>=3.2.0 ; python_version >= '3.5'
\ No newline at end of file diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt b/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt new file mode 100644 index 000000000..88c25079f --- /dev/null +++ b/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt @@ -0,0 +1,10 @@ +boto3 ; python_version >= '3.5' +botocore ; python_version >= '3.5' +azure-mgmt-compute ; python_version >= '3.5' +azure-mgmt-network ; python_version >= '3.5' +azure-mgmt-storage ; python_version >= '3.5' +azure-mgmt-resource ; python_version >= '3.5' +azure-cli-core ; python_version >= '3.5' +msrestazure ; python_version >= '3.5' +azure-common ; python_version >= '3.5' +google-auth ; python_version >= '3.5' |