summaryrefslogtreecommitdiffstats
path: root/ansible_collections/netapp/cloudmanager/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'ansible_collections/netapp/cloudmanager/tests/unit')
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py0
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py33
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py122
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py44
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py506
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py578
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py77
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py297
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py165
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py252
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py730
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py178
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py407
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py426
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py439
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py543
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py591
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py144
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py176
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py216
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt1
-rw-r--r--ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt10
22 files changed, 5935 insertions, 0 deletions
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/__init__.py
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py
new file mode 100644
index 000000000..f60ee6782
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/builtins.py
@@ -0,0 +1,33 @@
+# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+#
+# Compat for python2.7
+#
+
+# One unittest needs to import builtins via __import__() so we need to have
+# the string that represents it
+try:
+ import __builtin__
+except ImportError:
+ BUILTINS = 'builtins'
+else:
+ BUILTINS = '__builtin__'
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py
new file mode 100644
index 000000000..0972cd2e8
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/mock.py
@@ -0,0 +1,122 @@
+# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+'''
+Compat module for Python3.x's unittest.mock module
+'''
+import sys
+
+# Python 2.7
+
+# Note: Could use the pypi mock library on python3.x as well as python2.x. It
+# is the same as the python3 stdlib mock library
+
+try:
+ # Allow wildcard import because we really do want to import all of mock's
+ # symbols into this compat shim
+ # pylint: disable=wildcard-import,unused-wildcard-import
+ from unittest.mock import *
+except ImportError:
+ # Python 2
+ # pylint: disable=wildcard-import,unused-wildcard-import
+ try:
+ from mock import *
+ except ImportError:
+ print('You need the mock library installed on python2.x to run tests')
+
+
+# Prior to 3.4.4, mock_open cannot handle binary read_data
+if sys.version_info >= (3,) and sys.version_info < (3, 4, 4):
+ file_spec = None
+
+ def _iterate_read_data(read_data):
+ # Helper for mock_open:
+ # Retrieve lines from read_data via a generator so that separate calls to
+ # readline, read, and readlines are properly interleaved
+ sep = b'\n' if isinstance(read_data, bytes) else '\n'
+ data_as_list = [l + sep for l in read_data.split(sep)]
+
+ if data_as_list[-1] == sep:
+ # If the last line ended in a newline, the list comprehension will have an
+ # extra entry that's just a newline. Remove this.
+ data_as_list = data_as_list[:-1]
+ else:
+ # If there wasn't an extra newline by itself, then the file being
+ # emulated doesn't have a newline to end the last line remove the
+ # newline that our naive format() added
+ data_as_list[-1] = data_as_list[-1][:-1]
+
+ for line in data_as_list:
+ yield line
+
+ def mock_open(mock=None, read_data=''):
+ """
+ A helper function to create a mock to replace the use of `open`. It works
+ for `open` called directly or used as a context manager.
+
+ The `mock` argument is the mock object to configure. If `None` (the
+ default) then a `MagicMock` will be created for you, with the API limited
+ to methods or attributes available on standard file handles.
+
+ `read_data` is a string for the `read` methoddline`, and `readlines` of the
+ file handle to return. This is an empty string by default.
+ """
+ def _readlines_side_effect(*args, **kwargs):
+ if handle.readlines.return_value is not None:
+ return handle.readlines.return_value
+ return list(_data)
+
+ def _read_side_effect(*args, **kwargs):
+ if handle.read.return_value is not None:
+ return handle.read.return_value
+ return type(read_data)().join(_data)
+
+ def _readline_side_effect():
+ if handle.readline.return_value is not None:
+ while True:
+ yield handle.readline.return_value
+ for line in _data:
+ yield line
+
+ global file_spec
+ if file_spec is None:
+ import _io
+ file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
+
+ if mock is None:
+ mock = MagicMock(name='open', spec=open)
+
+ handle = MagicMock(spec=file_spec)
+ handle.__enter__.return_value = handle
+
+ _data = _iterate_read_data(read_data)
+
+ handle.write.return_value = None
+ handle.read.return_value = None
+ handle.readline.return_value = None
+ handle.readlines.return_value = None
+
+ handle.read.side_effect = _read_side_effect
+ handle.readline.side_effect = _readline_side_effect()
+ handle.readlines.side_effect = _readlines_side_effect
+
+ mock.return_value = handle
+ return mock
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py b/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py
new file mode 100644
index 000000000..73a20cf8c
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/compat/unittest.py
@@ -0,0 +1,44 @@
+# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
+#
+# This file is part of Ansible
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+# Make coding more python3-ish
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+'''
+Compat module for Python2.7's unittest module
+'''
+
+import sys
+
+import pytest
+
+# Allow wildcard import because we really do want to import all of
+# unittests's symbols into this compat shim
+# pylint: disable=wildcard-import,unused-wildcard-import
+if sys.version_info < (2, 7):
+ try:
+ # Need unittest2 on python2.6
+ from unittest2 import *
+ except ImportError:
+ print('You need unittest2 installed on python2.6.x to run tests')
+
+ class TestCase:
+ """ skip everything """
+ pytestmark = pytest.mark.skip('Skipping Unit Tests on 2.6 as unittest2 may not be available')
+else:
+ from unittest import *
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py
new file mode 100644
index 000000000..959cbaef5
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp.py
@@ -0,0 +1,506 @@
+# This code is part of Ansible, but is an independent component.
+# This particular file snippet, and this file snippet only, is BSD licensed.
+# Modules you write using this snippet, which is embedded dynamically by Ansible
+# still belong to the author of the module, and may assign their own license
+# to the complete work.
+#
+# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+""" unit tests for module_utils netapp.py
+
+ Provides wrappers for cloudmanager REST APIs
+"""
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+# import copy # for deepcopy
+import json
+import pytest
+import sys
+try:
+ import requests.exceptions
+ HAS_REQUESTS_EXC = True
+except ImportError:
+ HAS_REQUESTS_EXC = False
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp import CloudManagerRestAPI
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+
+if (not netapp_utils.HAS_REQUESTS or not HAS_REQUESTS_EXC) and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ """prepare arguments so that they will be picked up during module creation"""
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleFailJson(Exception):
+ """Exception class to be raised by module.fail_json and caught by the test case"""
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ """function to patch over fail_json; package return data into an exception"""
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockModule():
+ ''' rough mock for an Ansible module class '''
+ def __init__(self):
+ self.params = {}
+
+ def fail_json(self, *args, **kwargs): # pylint: disable=unused-argument
+ """function to simulate fail_json: package return data into an exception"""
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class mockResponse:
+ def __init__(self, json_data, status_code, headers=None, raise_action=None):
+ self.json_data = json_data
+ self.status_code = status_code
+ self.content = json_data
+ self.headers = headers or {}
+ self.raise_action = raise_action
+
+ def raise_for_status(self):
+ pass
+
+ def json(self):
+ if self.raise_action == 'bad_json':
+ raise ValueError(self.raise_action)
+ return self.json_data
+
+
+def create_module(args):
+ argument_spec = netapp_utils.cloudmanager_host_argument_spec()
+ set_module_args(args)
+ module = basic.AnsibleModule(argument_spec)
+ module.fail_json = fail_json
+ return module
+
+
+def create_restapi_object(args):
+ module = create_module(args)
+ return netapp_utils.CloudManagerRestAPI(module)
+
+
+def mock_args(feature_flags=None, client_id=None):
+ args = {
+ 'refresh_token': 'ABCDEFGS'
+ }
+ if feature_flags is not None:
+ args['feature_flags'] = feature_flags
+ if client_id is not None:
+ args['client_id'] = client_id
+ return args
+
+
+TOKEN_DICT = {
+ 'access_token': 'access_token',
+ 'token_type': 'token_type'
+}
+
+
+def test_missing_params():
+ module = MockModule()
+ with pytest.raises(KeyError) as exc:
+ netapp_utils.CloudManagerRestAPI(module)
+ assert exc.match('refresh_token')
+
+
+@patch('requests.request')
+def test_get_token_refresh(mock_request):
+ ''' successfully get token using refresh token '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ ]
+ # get_token is called when the object is created
+ rest_api = create_restapi_object(mock_args())
+ print(rest_api.token_type, rest_api.token)
+ assert rest_api.token_type == TOKEN_DICT['token_type']
+ assert rest_api.token == TOKEN_DICT['access_token']
+
+
+@patch('requests.request')
+def test_negative_get_token_none(mock_request):
+ ''' missing refresh token and Service Account '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ ]
+ # get_token is called when the object is created
+ args = dict(mock_args())
+ args.pop('refresh_token')
+ # get_token is called when the object is created
+ with pytest.raises(AnsibleFailJson) as exc:
+ rest_api = create_restapi_object(args)
+ msg = 'Missing refresh_token or sa_client_id and sa_secret_key'
+ assert msg in exc.value.args[0]['msg']
+
+
+@patch('requests.request')
+def test_get_token_sa(mock_request):
+ ''' successfully get token using Service Account '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ ]
+ # get_token is called when the object is created
+ args = dict(mock_args())
+ args.pop('refresh_token')
+ args['sa_client_id'] = '123'
+ args['sa_secret_key'] = 'a1b2c3'
+ rest_api = create_restapi_object(args)
+ print(rest_api.token_type, rest_api.token)
+ assert rest_api.token_type == TOKEN_DICT['token_type']
+ assert rest_api.token == TOKEN_DICT['access_token']
+
+
+@patch('requests.request')
+def test_negative_get_token(mock_request):
+ ''' error on OAUTH request '''
+ mock_request.side_effect = [
+ mockResponse(json_data={'message': 'error message'}, status_code=206)
+ ]
+ # get_token is called when the object is created
+ with pytest.raises(AnsibleFailJson) as exc:
+ rest_api = create_restapi_object(mock_args())
+ msg = 'Error acquiring token: error message'
+ assert msg in exc.value.args[0]['msg']
+
+
+@patch('requests.request')
+def test_get_json(mock_request):
+ ''' get with no data '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'})
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message == {'key': 'value'}
+ assert error is None
+ assert ocr == 'OCR_id'
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_get_retries(mock_request, dont_sleep):
+ ''' get with no data '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ requests.exceptions.ConnectionError('Max retries exceeded with url:'),
+ requests.exceptions.ConnectionError('Max retries exceeded with url:'),
+ mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'})
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message == {'key': 'value'}
+ assert error is None
+ assert ocr == 'OCR_id'
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_get_retries_exceeded(mock_request, dont_sleep):
+ ''' get with no data '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ requests.exceptions.ConnectionError('Max retries exceeded with url:'),
+ requests.exceptions.ConnectionError('Max retries exceeded with url:'),
+ requests.exceptions.ConnectionError('Max retries exceeded with url:'),
+ mockResponse(json_data={'key': 'value'}, status_code=200, headers={'OnCloud-Request-Id': 'OCR_id'})
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert 'Max retries exceeded with url:' in error
+
+
+@patch('requests.request')
+def test_empty_get_sent_bad_json(mock_request):
+ ''' get with invalid json '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data='anything', status_code=200, raise_action='bad_json')
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message is None
+ assert error is None
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_empty_get_sent_203(mock_request):
+ ''' get with no data and 203 status code '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={}, status_code=203)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message == {}
+ assert error is None
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_negative_get_sent_203(mock_request):
+ ''' get with 203 status code - not sure we should error out here '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'message': 'error message'}, status_code=203)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message == {'message': 'error message'}
+ assert error == 'error message'
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_negative_get_sent_300(mock_request):
+ ''' get with 300 status code - 300 indicates an error '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={}, status_code=300)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message == {}
+ assert error == '300'
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_negative_get_raise_http_exc(mock_request):
+ ''' get with HTTPError exception '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ requests.exceptions.HTTPError('some exception')
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message is None
+ assert error == 'some exception'
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_negative_get_raise_conn_exc(mock_request):
+ ''' get with ConnectionError exception '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ requests.exceptions.ConnectionError('some exception')
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message is None
+ assert error == 'some exception'
+ assert ocr is None
+
+
+@patch('requests.request')
+def test_negative_get_raise_oserror_exc(mock_request):
+ ''' get with a general exception '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception')
+ ]
+ rest_api = create_restapi_object(mock_args())
+ message, error, ocr = rest_api.get('api', None)
+ print(message, error, ocr)
+ assert message is None
+ assert error == 'some exception'
+ assert ocr is None
+
+
+def test_has_feature_success_default():
+ ''' existing feature_flag with default '''
+ flag = 'show_modified'
+ module = create_module(mock_args())
+ value = netapp_utils.has_feature(module, flag)
+ assert value
+
+
+def test_has_feature_success_user_true():
+ ''' existing feature_flag with value set to True '''
+ flag = 'user_deprecation_warning'
+ args = dict(mock_args({flag: True}))
+ module = create_module(args)
+ value = netapp_utils.has_feature(module, flag)
+ assert value
+
+
+def test_has_feature_success_user_false():
+ ''' existing feature_flag with value set to False '''
+ flag = 'user_deprecation_warning'
+ args = dict(mock_args({flag: False}))
+ print(args)
+ module = create_module(args)
+ value = netapp_utils.has_feature(module, flag)
+ assert not value
+
+
+def test_has_feature_invalid_key():
+ ''' existing feature_flag with unknown key '''
+ flag = 'deprecation_warning_bad_key'
+ module = create_module(mock_args())
+ with pytest.raises(AnsibleFailJson) as exc:
+ netapp_utils.has_feature(module, flag)
+ msg = 'Internal error: unexpected feature flag: %s' % flag
+ assert exc.value.args[0]['msg'] == msg
+
+
+def test_has_feature_invalid_bool():
+ ''' existing feature_flag with non boolean value '''
+ flag = 'deprecation_warning_key'
+ module = create_module(mock_args({flag: 'str'}))
+ with pytest.raises(AnsibleFailJson) as exc:
+ netapp_utils.has_feature(module, flag)
+ msg = "Error: expected bool type for feature flag"
+ assert msg in exc.value.args[0]['msg']
+
+
+STATUS_DICT = {
+ 'status': 1,
+ 'error': None
+}
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_check_task_status(mock_request, mock_sleep):
+ ''' successful get with 2 retries '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ mockResponse(json_data=STATUS_DICT, status_code=200)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ status, error_msg, error = rest_api.check_task_status('api')
+ assert status == STATUS_DICT['status']
+ assert error_msg == STATUS_DICT['error']
+ assert error is None
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_negative_check_task_status(mock_request, mock_sleep):
+ ''' get with 4 failed retries '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ requests.exceptions.HTTPError('some exception'),
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ status, error_msg, error = rest_api.check_task_status('api')
+ assert status == 0
+ assert error_msg == ''
+ assert error == 'some exception'
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_wait_on_completion(mock_request, mock_sleep):
+ ''' successful get with 2 retries '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ mockResponse(json_data=STATUS_DICT, status_code=200)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1)
+ assert error is None
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_negative_wait_on_completion_failure(mock_request, mock_sleep):
+ ''' successful get with 2 retries, but status is -1 '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ mockResponse(json_data={'status': -1, 'error': 'task_error'}, status_code=200)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1)
+ assert error == 'Failed to task action, error: task_error'
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_negative_wait_on_completion_error(mock_request, mock_sleep):
+ ''' get with 4 failed retries '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ requests.exceptions.HTTPError('some http exception'),
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1)
+ assert error == 'some http exception'
+
+
+@patch('time.sleep')
+@patch('requests.request')
+def test_negative_wait_on_completion_timeout(mock_request, mock_sleep):
+ ''' successful get with 2 retries, but status is 0 '''
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ OSError('some exception'),
+ requests.exceptions.ConnectionError('some exception'),
+ mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200),
+ mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200),
+ mockResponse(json_data={'status': 0, 'error': 'task_error'}, status_code=200)
+ ]
+ rest_api = create_restapi_object(mock_args())
+ rest_api.module.params['client_id'] = '123'
+ error = rest_api.wait_on_completion('api', 'action', 'task', 2, 1)
+ assert error == 'Taking too long for action to task or not properly setup'
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py
new file mode 100644
index 000000000..33041f64f
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module.py
@@ -0,0 +1,578 @@
+# This code is part of Ansible, but is an independent component.
+# This particular file snippet, and this file snippet only, is BSD licensed.
+# Modules you write using this snippet, which is embedded dynamically by Ansible
+# still belong to the author of the module, and may assign their own license
+# to the complete work.
+#
+# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+""" unit tests for module_utils netapp.py
+
+ Provides wrappers for cloudmanager REST APIs
+"""
+
+from __future__ import (absolute_import, division, print_function)
+from logging import error
+__metaclass__ = type
+
+# import copy # for deepcopy
+import json
+import sys
+import pytest
+try:
+ import requests.exceptions
+ HAS_REQUESTS_EXC = True
+except ImportError:
+ HAS_REQUESTS_EXC = False
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp import CloudManagerRestAPI
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module import cmp as nm_cmp, NetAppModule
+if (not netapp_utils.HAS_REQUESTS or not HAS_REQUESTS_EXC) and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ """prepare arguments so that they will be picked up during module creation"""
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class mockResponse:
+ def __init__(self, json_data, status_code, headers=None):
+ self.json_data = json_data
+ self.status_code = status_code
+ self.content = json_data
+ self.headers = headers or {}
+
+ def json(self):
+ return self.json_data
+
+
+def create_module(args):
+ argument_spec = netapp_utils.cloudmanager_host_argument_spec()
+ set_module_args(args)
+ module = basic.AnsibleModule(argument_spec)
+ return module
+
+
+def create_restapi_object(args):
+ module = create_module(args)
+ return netapp_utils.CloudManagerRestAPI(module)
+
+
+def mock_args(feature_flags=None, client_id=None):
+ args = {
+ 'refresh_token': 'ABCDEFGS'
+ }
+ return args
+
+
+TOKEN_DICT = {
+ 'access_token': 'access_token',
+ 'token_type': 'token_type'
+}
+
+
+def test_cmp():
+ assert nm_cmp(None, 'x') == -1
+ assert nm_cmp('y', 'x') == 1
+ assert nm_cmp('y', 'X') == 1
+ assert nm_cmp(['x', 'y'], ['x', 'X']) == 1
+ assert nm_cmp(['x', 'x'], ['x', 'X']) == 0
+
+
+def test_set_parameters():
+ helper = NetAppModule()
+ helper.set_parameters({'a': None, 'b': 'b'})
+ assert 'a' not in helper.parameters
+ assert 'b' in helper.parameters
+
+
+def test_cd_action():
+ desired = {}
+ helper = NetAppModule()
+ assert helper.get_cd_action(None, desired) == 'create'
+ desired['state'] = 'present'
+ assert helper.get_cd_action(None, desired) == 'create'
+ assert helper.get_cd_action({}, desired) is None
+ desired['state'] = 'absent'
+ assert helper.get_cd_action(None, desired) is None
+ assert helper.get_cd_action({}, desired) == 'delete'
+
+
+def test_compare_and_update_values():
+ current = {'a': 'a', 'b': 'b'}
+ desired = {}
+ desired_key = []
+ helper = NetAppModule()
+ assert helper.compare_and_update_values(current, desired, desired_key) == ({}, False)
+ desired_key = ['a']
+ assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'a'}, False)
+ desired = {'a': 'a'}
+ assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'a'}, False)
+ desired = {'a': 'c'}
+ assert helper.compare_and_update_values(current, desired, desired_key) == ({'a': 'c'}, True)
+
+
+@patch('requests.request')
+def test_get_working_environments_info(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'a': 'b'}, status_code=200),
+ mockResponse(json_data={'c': 'd'}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_working_environments_info(rest_api, '') == ({'a': 'b'}, None)
+ assert helper.get_working_environments_info(rest_api, '') == ({'c': 'd'}, '500')
+
+
+def test_look_up_working_environment_by_name_in_list():
+ we_list = [{'name': 'bob', 'b': 'b'}, {'name': 'chuck', 'c': 'c'}]
+ helper = NetAppModule()
+ assert helper.look_up_working_environment_by_name_in_list(we_list, 'bob') == (we_list[0], None)
+ error = "look_up_working_environment_by_name_in_list: Working environment not found"
+ assert helper.look_up_working_environment_by_name_in_list(we_list, 'alice') == (None, error)
+
+
+@patch('requests.request')
+def test_get_working_environment_details_by_name(mock_request):
+ we_list = [{'name': 'bob', 'b': 'b'}, {'name': 'chuck', 'c': 'c'}]
+ json_data = {'onPremWorkingEnvironments': [],
+ 'gcpVsaWorkingEnvironments': [],
+ 'azureVsaWorkingEnvironments': [],
+ 'vsaWorkingEnvironments': []
+ }
+ json_data_onprem = dict(json_data)
+ json_data_onprem['onPremWorkingEnvironments'] = we_list
+ json_data_gcp = dict(json_data)
+ json_data_gcp['gcpVsaWorkingEnvironments'] = we_list
+ json_data_azure = dict(json_data)
+ json_data_azure['azureVsaWorkingEnvironments'] = we_list
+ json_data_aws = dict(json_data)
+ json_data_aws['vsaWorkingEnvironments'] = we_list
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'a': 'b'}, status_code=500), # exists
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data={'c': 'd'}, status_code=400), # get all
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data=json_data_onprem, status_code=200), # get all
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data=json_data_gcp, status_code=200), # get all
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data=json_data_azure, status_code=200), # get all
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data=json_data_aws, status_code=200), # get all
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists
+ mockResponse(json_data=json_data, status_code=200), # get all
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'name') == (None, '500')
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'name') == (None, '400')
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None)
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None)
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None)
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (we_list[0], None)
+ error = "get_working_environment_details_by_name: Working environment not found"
+ assert helper.get_working_environment_details_by_name(rest_api, '', 'bob') == (None, error)
+
+
+@patch('requests.request')
+def test_get_working_environment_details(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'key': [{'a': 'b'}]}, status_code=200),
+ mockResponse(json_data={'key': [{'c': 'd'}]}, status_code=500)
+ ]
+ helper = NetAppModule()
+ args = dict(mock_args())
+ rest_api = create_restapi_object(args)
+ helper.parameters['working_environment_id'] = 'test_we'
+ assert helper.get_working_environment_details(rest_api, '') == ({'key': [{'a': 'b'}]}, None)
+ error = "Error: get_working_environment_details 500"
+ assert helper.get_working_environment_details(rest_api, '') == (None, error)
+
+
+@patch('requests.request')
+def test_get_working_environment_detail_for_snapmirror(mock_request):
+ json_data = {'onPremWorkingEnvironments': [],
+ 'gcpVsaWorkingEnvironments': [],
+ 'azureVsaWorkingEnvironments': [],
+ 'vsaWorkingEnvironments': []
+ }
+ json_data_source = dict(json_data)
+ json_data_source['onPremWorkingEnvironments'] = [{'name': 'test_we_s'}]
+ json_data_destination = dict(json_data)
+ json_data_destination['onPremWorkingEnvironments'] = [{'name': 'test_we_d'}]
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ # by id, first test
+ mockResponse(json_data={'key': [{'publicId': 'test_we_s'}]}, status_code=200), # env details source
+ mockResponse(json_data={'key': [{'publicId': 'test_we_d'}]}, status_code=200), # env details dest
+ # by id, second test
+ mockResponse(json_data={'key': [{'c': 'd'}]}, status_code=500), # error source
+ # by id, third test
+ mockResponse(json_data={'key': [{'publicId': 'test_we_s'}]}, status_code=200), # env details source
+ mockResponse(json_data={'key': [{'e': 'f'}]}, status_code=500), # error source
+ # by name, first test
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists source
+ mockResponse(json_data=json_data_source, status_code=200), # env details source
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists dest
+ mockResponse(json_data=json_data_destination, status_code=200), # env details dest
+ # by name, second test
+ mockResponse(json_data={'key': {'c': 'd'}}, status_code=500), # error source
+ # by name, third test
+ mockResponse(json_data={'a': 'b'}, status_code=200), # exists source
+ mockResponse(json_data=json_data_source, status_code=200), # env details source
+ mockResponse(json_data={'key': {'e': 'f'}}, status_code=500), # error source
+ ]
+ helper = NetAppModule()
+ args = dict(mock_args())
+ rest_api = create_restapi_object(args)
+ # search by id
+ helper.parameters['source_working_environment_id'] = 'test_we_s'
+ helper.parameters['destination_working_environment_id'] = 'test_we_d'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == ({'publicId': 'test_we_s'}, {'publicId': 'test_we_d'}, None)
+ error = "Error getting WE info: 500: {'key': [{'c': 'd'}]}"
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+ error = "Error getting WE info: 500: {'key': [{'e': 'f'}]}"
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+ # search by name
+ del helper.parameters['source_working_environment_id']
+ del helper.parameters['destination_working_environment_id']
+ helper.parameters['source_working_environment_name'] = 'test_we_s'
+ helper.parameters['destination_working_environment_name'] = 'test_we_d'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == ({'name': 'test_we_s'}, {'name': 'test_we_d'}, None)
+ error = '500'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+ error = '500'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+ # no destination id nor name
+ del helper.parameters['destination_working_environment_name']
+ error = 'Cannot find working environment by destination_working_environment_id or destination_working_environment_name'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+ # no source id nor name
+ del helper.parameters['source_working_environment_name']
+ error = 'Cannot find working environment by source_working_environment_id or source_working_environment_name'
+ assert helper.get_working_environment_detail_for_snapmirror(rest_api, '') == (None, None, error)
+
+
+def test_create_account():
+ helper = NetAppModule()
+ error = "Error: creating an account is not supported."
+ assert helper.create_account("rest_api") == (None, error)
+
+
+@patch('requests.request')
+def test_get_or_create_account(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200),
+ mockResponse(json_data=[], status_code=200),
+ mockResponse(json_data={'c': 'd'}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_or_create_account(rest_api) == ('account_id', None)
+ error = 'Error: account cannot be located - check credentials or provide account_id.'
+ assert helper.get_or_create_account(rest_api) == (None, error)
+ error = '500'
+ assert helper.get_or_create_account(rest_api) == (None, error)
+
+
+@patch('requests.request')
+def test_get_account_info(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200),
+ mockResponse(json_data=[], status_code=200),
+ mockResponse(json_data={'c': 'd'}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_account_info(rest_api, '') == ([{'accountPublicId': 'account_id'}], None)
+ assert helper.get_account_info(rest_api, '') == ([], None)
+ assert helper.get_account_info(rest_api, '') == (None, '500')
+
+
+@patch('requests.request')
+def test_get_account_id(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200),
+ mockResponse(json_data=[], status_code=200),
+ mockResponse(json_data={'c': 'd'}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_account_id(rest_api) == ('account_id', None)
+ error = 'Error: no account found - check credentials or provide account_id.'
+ assert helper.get_account_id(rest_api) == (None, error)
+ error = '500'
+ assert helper.get_account_id(rest_api) == (None, error)
+
+
+@patch('requests.request')
+def test_get_accounts_info(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200),
+ mockResponse(json_data={'c': 'd'}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_accounts_info(rest_api, '') == ([{'accountPublicId': 'account_id'}], None)
+ error = '500'
+ assert helper.get_accounts_info(rest_api, '') == (None, error)
+
+
+def test_set_api_root_path():
+ helper = NetAppModule()
+ helper.parameters['working_environment_id'] = 'abc'
+ working_environment_details = {'cloudProviderName': 'Amazon', 'isHA': False}
+ helper.set_api_root_path(working_environment_details, helper)
+ assert helper.api_root_path == '/occm/api/vsa'
+ working_environment_details = {'cloudProviderName': 'Other', 'isHA': False}
+ helper.set_api_root_path(working_environment_details, helper)
+ assert helper.api_root_path == '/occm/api/other/vsa'
+ working_environment_details = {'cloudProviderName': 'Other', 'isHA': True}
+ helper.set_api_root_path(working_environment_details, helper)
+ assert helper.api_root_path == '/occm/api/other/ha'
+
+
+@patch('requests.request')
+def test_get_occm_agents_by_account(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'a': 'b'}], status_code=200),
+ mockResponse(json_data=[{'c': 'd'}], status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_occm_agents_by_account(rest_api, '') == ([{'a': 'b'}], None)
+ error = '500'
+ assert helper.get_occm_agents_by_account(rest_api, '') == ([{'c': 'd'}], error)
+
+
+@patch('requests.request')
+def test_get_occm_agents_by_name(mock_request):
+ json_data = {'agents':
+ [{'name': '', 'provider': ''},
+ {'name': 'a1', 'provider': 'p1'},
+ {'name': 'a1', 'provider': 'p1'},
+ {'name': 'a1', 'provider': 'p2'},
+ {'name': 'a2', 'provider': 'p1'},
+ ]}
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=json_data, status_code=200),
+ mockResponse(json_data=json_data, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ expected = [agent for agent in json_data['agents'] if agent['name'] == 'a1' and agent['provider'] == 'p1']
+ assert helper.get_occm_agents_by_name(rest_api, 'account', 'a1', 'p1') == (expected, None)
+ error = '500'
+ assert helper.get_occm_agents_by_name(rest_api, 'account', 'a1', 'p1') == (expected, error)
+
+
+@patch('requests.request')
+def test_get_agents_info(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id
+ mockResponse(json_data=[{'a': 'b'}], status_code=200),
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id
+ mockResponse(json_data=[{'c': 'd'}], status_code=500),
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=400), # get account_id
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_agents_info(rest_api, '') == ([{'a': 'b'}], None)
+ error = '500'
+ assert helper.get_agents_info(rest_api, '') == ([{'c': 'd'}], error)
+ error = '400'
+ assert helper.get_agents_info(rest_api, '') == (None, error)
+
+
+@patch('requests.request')
+def test_get_active_agents_info(mock_request):
+ json_data = {'agents':
+ [{'name': '', 'provider': '', 'agentId': 1, 'status': ''},
+ {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'},
+ {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': ''},
+ {'name': 'a1', 'provider': 'p2', 'agentId': 1, 'status': 'active'},
+ {'name': 'a2', 'provider': 'p1', 'agentId': 1, 'status': 'active'},
+ ]}
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id
+ mockResponse(json_data=json_data, status_code=200),
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=200), # get account_id
+ mockResponse(json_data=json_data, status_code=500),
+ mockResponse(json_data=[{'accountPublicId': 'account_id'}], status_code=400), # get account_id
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ active = [agent for agent in json_data['agents'] if agent['status'] == 'active']
+ expected = [{'name': agent['name'], 'client_id': agent['agentId'], 'provider': agent['provider']} for agent in active]
+ assert helper.get_active_agents_info(rest_api, '') == (expected, None)
+ error = '500'
+ assert helper.get_active_agents_info(rest_api, '') == (expected, error)
+ error = '400'
+ assert helper.get_active_agents_info(rest_api, '') == (None, error)
+
+
+@patch('requests.request')
+def test_get_occm_agent_by_id(mock_request):
+ json_data = {'agent':
+ {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'}
+ }
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=json_data, status_code=200),
+ mockResponse(json_data=json_data, status_code=500),
+ mockResponse(json_data={'a': 'b'}, status_code=500),
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ expected = json_data['agent']
+ assert helper.get_occm_agent_by_id(rest_api, '') == (expected, None)
+ error = '500'
+ assert helper.get_occm_agent_by_id(rest_api, '') == (expected, error)
+ assert helper.get_occm_agent_by_id(rest_api, '') == ({'a': 'b'}, error)
+
+
+@patch('requests.request')
+def test_check_occm_status(mock_request):
+ json_data = {'agent':
+ {'name': 'a1', 'provider': 'p1', 'agentId': 1, 'status': 'active'}
+ }
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=json_data, status_code=200),
+ mockResponse(json_data=json_data, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ expected = json_data
+ assert helper.check_occm_status(rest_api, '') == (expected, None)
+ error = '500'
+ assert helper.check_occm_status(rest_api, '') == (expected, error)
+
+
+@patch('requests.request')
+def test_register_agent_to_service(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={}, status_code=200),
+ mockResponse(json_data={}, status_code=200),
+ mockResponse(json_data={}, status_code=500)
+ ]
+ helper = NetAppModule()
+ rest_api = create_restapi_object(mock_args())
+ helper.parameters['account_id'] = 'account_id'
+ helper.parameters['company'] = 'company'
+ helper.parameters['region'] = 'region'
+ helper.parameters['subnet_id'] = 'subnet_id'
+ expected = {}
+ assert helper.register_agent_to_service(rest_api, 'provider', 'vpc') == (expected, None)
+ args, kwargs = mock_request.call_args
+ body = kwargs['json']
+ assert 'placement' in body
+ assert 'network' in body['placement']
+ assert body['placement']['network'] == 'vpc'
+ body_other = body
+ assert helper.register_agent_to_service(rest_api, 'AWS', 'vpc') == (expected, None)
+ args, kwargs = mock_request.call_args
+ body = kwargs['json']
+ assert 'placement' in body
+ assert 'network' in body['placement']
+ assert body['placement']['network'] == 'vpc'
+ assert body_other != body
+ body['placement']['provider'] = 'provider'
+ assert body_other == body
+ error = '500'
+ assert helper.register_agent_to_service(rest_api, 'provider', 'vpc') == (expected, error)
+
+
+@patch('requests.request')
+def test_delete_occm(mock_request):
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'result': 'any'}, status_code=200),
+ mockResponse(json_data={'result': 'any'}, status_code=500),
+ ]
+ helper = NetAppModule()
+ helper.parameters['account_id'] = 'account_id'
+ rest_api = create_restapi_object(mock_args())
+ assert helper.delete_occm(rest_api, '') == ({'result': 'any'}, None)
+ error = '500'
+ assert helper.delete_occm(rest_api, '') == ({'result': 'any'}, error)
+
+
+@patch('requests.request')
+def test_delete_occm_agents(mock_request):
+ agents = [{'agentId': 'a1'},
+ {'agentId': 'a2'}]
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data={'result': 'any'}, status_code=200), # a1
+ mockResponse(json_data={'result': 'any'}, status_code=200), # a2
+ mockResponse(json_data={'result': 'any'}, status_code=500), # a1
+ mockResponse(json_data={'result': 'any'}, status_code=200), # a2
+ mockResponse(json_data={'result': 'any'}, status_code=200), # a1
+ mockResponse(json_data={'result': 'any'}, status_code=200), # a2
+ ]
+ helper = NetAppModule()
+ helper.parameters['account_id'] = 'account_id'
+ rest_api = create_restapi_object(mock_args())
+ assert helper.delete_occm_agents(rest_api, agents) == []
+ error = '500'
+ assert helper.delete_occm_agents(rest_api, agents) == [({'result': 'any'}, error)]
+ agents.append({'a': 'b'})
+ error = "unexpected agent contents: {'a': 'b'}"
+ assert helper.delete_occm_agents(rest_api, agents) == [(None, error)]
+
+
+@patch('requests.request')
+def test_get_tenant(mock_request):
+ tenants = [{'publicId': 'a1'},
+ {'publicId': 'a2'}]
+ mock_request.side_effect = [
+ mockResponse(json_data=TOKEN_DICT, status_code=200), # OAUTH
+ mockResponse(json_data=tenants, status_code=200), # get success
+ mockResponse(json_data={'result': 'any'}, status_code=500), # get error
+ ]
+ helper = NetAppModule()
+ # helper.parameters['account_id'] = 'account_id'
+ rest_api = create_restapi_object(mock_args())
+ assert helper.get_tenant(rest_api, '') == ('a1', None)
+ error = "Error: unexpected response on getting tenant for cvo: 500, {'result': 'any'}"
+ assert helper.get_tenant(rest_api, '') == (None, error)
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py
new file mode 100644
index 000000000..b24778f47
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/module_utils/test_netapp_module_open.py
@@ -0,0 +1,77 @@
+# This code is part of Ansible, but is an independent component.
+# This particular file snippet, and this file snippet only, is BSD licensed.
+# Modules you write using this snippet, which is embedded dynamically by Ansible
+# still belong to the author of the module, and may assign their own license
+# to the complete work.
+#
+# Copyright (c) 2021, Laurent Nicolas <laurentn@netapp.com>
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+""" unit tests for module_utils netapp_module.py
+
+ Provides utility functions for cloudmanager REST APIs
+"""
+
+from __future__ import (absolute_import, division, print_function)
+from logging import error
+__metaclass__ = type
+
+import pytest
+import sys
+
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+from ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module import NetAppModule
+
+if sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as builtins not defined for 2.6 and 2.7')
+
+
+@patch('builtins.open')
+def test_certificates(open):
+ open.return_value = OPEN(data=b'1234')
+ helper = NetAppModule()
+ assert helper.encode_certificates('test') == ('MTIzNA==', None)
+ open.return_value = OPEN(data=b'')
+ helper = NetAppModule()
+ assert helper.encode_certificates('test') == (None, 'Error: file is empty')
+ open.return_value = OPEN(raise_exception=True)
+ helper = NetAppModule()
+ assert helper.encode_certificates('test') == (None, 'intentional error')
+
+
+class OPEN:
+ '''we could use mock_open but I'm not sure it's available in every python version '''
+ def __init__(self, data=b'abcd', raise_exception=False):
+ self.data = data
+ self.raise_exception = raise_exception
+
+ def read(self):
+ return self.data
+ # the following two methods are associated with "with" in with open ...
+
+ def __enter__(self):
+ if self.raise_exception:
+ raise OSError('intentional error')
+ return self
+
+ def __exit__(self, *args):
+ pass
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py
new file mode 100644
index 000000000..db30ada89
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aggregate.py
@@ -0,0 +1,297 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests ONTAP Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate \
+ import NetAppCloudmanagerAggregate as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 2,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_create_cloudmanager_aggregate(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 2,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_create_cloudmanager_aggregate_by_workingenv_name(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'working_environment_name': 'wkone',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 2,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_delete_cloudmanager_aggregate(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 2,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_delete_cloudmanager_aggregate_by_workingenv_name(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 2,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_update_cloudmanager_aggregate(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestCMAggregate',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'number_of_disks': 3,
+ 'disk_size_size': 100,
+ 'disk_size_unit': 'GB',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required ars are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch(
+ 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_aggregate_pass(self, get_post_api, get_aggregate_api, get_token):
+ set_module_args(self.set_args_create_cloudmanager_aggregate())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ get_aggregate_api.return_value = None
+ get_post_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_aggregate: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch(
+ 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_aggregate_pass(self, get_delete_api, get_aggregate_api, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_aggregate())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ my_aggregate = {
+ 'name': 'Dummyname',
+ 'state': 'online',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'},
+ {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}],
+ 'homeNode': 'testAWSa-01',
+ }
+ get_aggregate_api.return_value = my_aggregate
+ get_delete_api.return_value = 'Aggregated Deleted', None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_aggregate: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_update_cloudmanager_aggregate_pass(self, get_post_api, get_aggregate_api, get_token):
+ set_module_args(self.set_args_update_cloudmanager_aggregate())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ my_aggregate = {
+ 'name': 'Dummyname',
+ 'state': 'online',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'},
+ {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}],
+ 'homeNode': 'testAWSa-01',
+ }
+ get_aggregate_api.return_value = my_aggregate
+ get_post_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_update_cloudmanager_aggregate: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_aggregate_by_workingenv_name_pass(self, get_post_api, get_we, get_aggregate_api, get_token):
+ data = self.set_args_create_cloudmanager_aggregate_by_workingenv_name()
+ get_token.return_value = 'test', 'test'
+ my_we = {
+ 'name': 'test',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon'}
+ get_we.return_value = my_we, None
+ data['working_environment_id'] = my_we['publicId']
+ set_module_args(data)
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+ get_aggregate_api.return_value = None
+ get_post_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_aggregate: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aggregate.NetAppCloudmanagerAggregate.get_aggregate')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_aggregate_by_workingenv_name_pass(self, get_delete_api, get_we, get_aggregate_api, get_token):
+ data = self.set_args_delete_cloudmanager_aggregate_by_workingenv_name()
+ my_we = {
+ 'name': 'test',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon'}
+ get_we.return_value = my_we, None
+ data['working_environment_id'] = my_we['publicId']
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ my_aggregate = {
+ 'name': 'Dummyname',
+ 'state': 'online',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'disks': [{'device': 'xvdh vol-313', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12h'},
+ {'device': 'xvdi vol-314', 'position': 'data', 'vmDiskProperties': None,
+ 'ownerNode': 'testAWSa-01', 'name': 'testAWSa-01-i-12i'}],
+ 'homeNode': 'testAWSa-01',
+ }
+ get_aggregate_api.return_value = my_aggregate
+ get_delete_api.return_value = 'Aggregated Deleted', None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_aggregate: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py
new file mode 100644
index 000000000..cee1e439c
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_aws_fsx.py
@@ -0,0 +1,165 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx \
+ import NetAppCloudManagerAWSFSX as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'workspace_id': 'test',
+ 'region': 'us-west-1',
+ 'tenant_id': 'account-test',
+ 'storage_capacity_size': 1024,
+ 'throughput_capacity': 512,
+ 'storage_capacity_size_unit': 'TiB',
+ 'aws_credentials_name': 'test',
+ 'primary_subnet_id': 'test',
+ 'secondary_subnet_id': 'test',
+ 'fsx_admin_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_create_cloudmanager_aws_fsx(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'workspace_id': 'test',
+ 'region': 'us-west-1',
+ 'tenant_id': 'account-test',
+ 'storage_capacity_size': 1024,
+ 'storage_capacity_size_unit': 'TiB',
+ 'throughput_capacity': 512,
+ 'aws_credentials_name': 'test',
+ 'primary_subnet_id': 'test',
+ 'secondary_subnet_id': 'test',
+ 'fsx_admin_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_delete_cloudmanager_aws_fsx(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'tenant_id': 'account-test',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.get_aws_credentials_id')
+ def test_module_fail_when_required_args_present(self, get_aws_credentials_id, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_aws_credentials_id.return_value = '123', None
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_aws_fsx_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.wait_on_completion_for_fsx')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.check_task_status_for_fsx')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_aws_fsx.NetAppCloudManagerAWSFSX.get_aws_credentials_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_aws_fsx_pass(self, get_post_api, get_aws_credentials_id, check_task_status_for_fsx,
+ wait_on_completion_for_fsx, get_aws_fsx_details, get_token):
+ set_module_args(self.set_args_create_cloudmanager_aws_fsx())
+ get_token.return_value = 'test', 'test'
+ get_aws_credentials_id.return_value = '123', None
+ my_obj = my_module()
+
+ response = {'id': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ check_task_status_for_fsx.return_value = {'providerDetails': {'status': {'status': 'ON', 'lifecycle': 'AVAILABLE'}}}, None
+ wait_on_completion_for_fsx.return_value = None
+ get_aws_fsx_details.return_value = None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_aws_fsx_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_aws_fsx_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_aws_fsx_pass(self, get_delete_api, get_aws_fsx_details, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_aws_fsx())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_fsx = {
+ 'name': 'test',
+ 'id': 'test'}
+ get_aws_fsx_details.return_value = my_fsx, None
+ get_delete_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_aws_fsx_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py
new file mode 100644
index 000000000..023f993af
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cifs_server.py
@@ -0,0 +1,252 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server \
+ import NetAppCloudmanagerCifsServer as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'refreshToken',
+ 'domain': 'test.com',
+ 'username': 'admin',
+ 'password': 'abcde',
+ 'dns_domain': 'test.com',
+ 'ip_addresses': '["1.0.0.1"]',
+ 'netbios': 'cvoname',
+ 'organizational_unit': 'CN=Computers',
+ })
+
+ def set_default_args_with_workingenv_name_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'working_environment_name': 'weone',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'refreshToken',
+ 'domain': 'test.com',
+ 'username': 'admin',
+ 'password': 'abcde',
+ 'dns_domain': 'test.com',
+ 'ip_addresses': '["1.0.0.1"]',
+ 'netbios': 'cvoname',
+ 'organizational_unit': 'CN=Computers',
+ })
+
+ def set_using_workgroup_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'refreshToken',
+ 'is_workgroup': True,
+ 'server_name': 'abc',
+ 'workgroup_name': 'wk',
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_cifs_server_successfully(self, send_request, create, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = None
+ create.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_cifs_server_idempotency(self, send_request, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = {
+ 'domain': 'test.com',
+ 'dns_domain': 'test.com',
+ 'ip_addresses': ['1.0.0.1'],
+ 'netbios': 'cvoname',
+ 'organizational_unit': 'CN=Computers',
+ }
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert not exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_cifs_server_using_workgroup_successfully(self, send_request, create, get, get_token):
+ set_module_args(self.set_using_workgroup_args_pass_check())
+ get.return_value = None
+ create.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.delete_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_delete_cifs_server_successfully(self, send_request, delete, get, get_token):
+ args = self.set_default_args_pass_check()
+ args['state'] = 'absent'
+ set_module_args(args)
+ get.return_value = {
+ 'domain': 'test.com',
+ 'dns_domain': 'test.com',
+ 'ip_addresses': ['1.0.0.1'],
+ 'netbios': 'cvoname',
+ 'organizational_unit': 'CN=Computers',
+ }
+ delete.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.create_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_cifs_server_successfully(self, send_request, create, get, get_we, get_token):
+ args = self.set_default_args_with_workingenv_name_pass_check()
+ my_we = {
+ 'name': 'test',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon'}
+ get_we.return_value = my_we, None
+ args['working_environment_id'] = my_we['publicId']
+ set_module_args(args)
+ get.return_value = None
+ create.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.get_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cifs_server.NetAppCloudmanagerCifsServer.delete_cifs_server')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_delete_cifs_server_with_workingenv_name_successfully(self, send_request, delete, get, get_we, get_token):
+ args = self.set_default_args_with_workingenv_name_pass_check()
+ args['state'] = 'absent'
+ my_we = {
+ 'name': 'test',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon'}
+ get_we.return_value = my_we, None
+ args['working_environment_id'] = my_we['publicId']
+ set_module_args(args)
+ get.return_value = {
+ 'domain': 'test.com',
+ 'dns_domain': 'test.com',
+ 'ip_addresses': ['1.0.0.1'],
+ 'netbios': 'cvoname',
+ 'organizational_unit': 'CN=Computers',
+ }
+ delete.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, 'dummy')]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py
new file mode 100644
index 000000000..dab9cde66
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_aws.py
@@ -0,0 +1,730 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+from logging import exception
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+HAS_BOTOCORE = True
+try:
+ from botocore.exceptions import ClientError
+except ImportError:
+ HAS_BOTOCORE = False
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws \
+ import NetAppCloudManagerConnectorAWS as my_module, IMPORT_EXCEPTION, main as my_main
+
+if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION)
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'region': 'us-west-1',
+ 'key_name': 'dev_automation',
+ 'subnet_id': 'subnet-test',
+ 'ami': 'ami-test',
+ 'security_group_ids': ['sg-test'],
+ 'refresh_token': 'myrefresh_token',
+ 'iam_instance_profile_name': 'OCCM_AUTOMATION',
+ 'account_id': 'account-test',
+ 'company': 'NetApp'
+ })
+
+ def set_args_create_cloudmanager_connector_aws(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'region': 'us-west-1',
+ 'key_name': 'dev_automation',
+ 'subnet_id': 'subnet-test',
+ 'ami': 'ami-test',
+ 'security_group_ids': ['sg-test'],
+ 'refresh_token': 'myrefresh_token',
+ 'iam_instance_profile_name': 'OCCM_AUTOMATION',
+ 'account_id': 'account-test',
+ 'company': 'NetApp'
+ })
+
+ def set_args_delete_cloudmanager_connector_aws(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'instance_id': 'test',
+ 'region': 'us-west-1',
+ 'account_id': 'account-test',
+ 'refresh_token': 'myrefresh_token',
+ 'company': 'NetApp'
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.create_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.register_agent_to_service')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_ami')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_connector_aws_pass(self, get_post_api, get_ami, register_agent_to_service, get_vpc, create_instance, get_instance, get_token):
+ set_module_args(self.set_args_create_cloudmanager_connector_aws())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ get_post_api.return_value = None, None, None
+ get_ami.return_value = 'ami-test'
+ register_agent_to_service.return_value = 'test', 'test'
+ get_vpc.return_value = 'test'
+ create_instance.return_value = 'test', 'test'
+ get_instance.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_connector_aws: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm')
+ def test_delete_cloudmanager_connector_aws_pass(self, delete_occm, get_occm_agent_by_id, delete_api, get_instance, delete_instance, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_connector_aws())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_instance = {
+ 'InstanceId': 'instance_id_1'
+ }
+ get_instance.return_value = my_instance
+ get_occm_agent_by_id.return_value = {'agentId': 'test', 'state': 'active'}, None
+ delete_api.return_value = None, None, None
+ delete_instance.return_value = None
+ delete_occm.return_value = None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm')
+ def test_delete_cloudmanager_connector_aws_pass_no_ids(self, delete_occm, get_occm_agents, delete_api, get_instance, delete_instance, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_connector_aws = {
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'refresh_token': 'myrefresh_token',
+ }
+ my_instance = {
+ 'InstanceId': 'instance_id_1'
+ }
+ # get_connector_aws.return_value = my_connector_aws
+ get_instance.return_value = my_instance
+ delete_api.return_value = None, None, None
+ delete_instance.return_value = None
+ get_occm_agents.return_value = [{'agentId': 'test', 'status': 'active'}], None
+ delete_occm.return_value = None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print()
+ print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.delete_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_aws.NetAppCloudManagerConnectorAWS.get_instance')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm')
+ def test_delete_cloudmanager_connector_aws_negative_no_instance(self, delete_occm, get_occm_agents, delete_api, get_instance, delete_instance, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_connector_aws = {
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'refresh_token': 'myrefresh_token',
+ }
+ my_instance = None
+ # get_connector_aws.return_value = my_connector_aws
+ get_instance.return_value = my_instance
+ delete_api.return_value = None, None, None
+ delete_instance.return_value = None
+ get_occm_agents.return_value = [{'agentId': 'test', 'status': 'active'}], None
+ delete_occm.return_value = None, "some error on delete occm"
+
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.apply()
+ print()
+ print('Info: test_delete_cloudmanager_connector_aws: %s' % repr(exc.value))
+ msg = "Error: deleting OCCM agent(s): [(None, 'some error on delete occm')]"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_empty(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2()
+ my_obj = my_module()
+ instance = my_obj.get_instance()
+ print('instance', instance)
+ assert instance is None
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_one(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'}])
+ my_obj = my_module()
+ instance = my_obj.get_instance()
+ print('instance', instance)
+ assert instance
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_many_terminated(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ my_obj = my_module()
+ instance = my_obj.get_instance()
+ print('instance', instance)
+ assert instance is None
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_many_but_only_one_active(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ my_obj = my_module()
+ instance = my_obj.get_instance()
+ print('instance', instance)
+ assert instance
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_many_but_only_one_active(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'active', 'name': 'xxxx'}])
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.get_instance()
+ msg = "Error: found multiple instances for name"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_get_instance_exception(self, get_boto3_client, get_token):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ args.pop('instance_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2(raise_exc=True)
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.get_instance()
+ msg = "An error occurred (test_only) when calling the describe_instances operation: forced error in unit testing"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_create_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None
+ get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, None
+ my_obj = my_module()
+ instance = my_obj.create_instance()
+ print('instance', instance)
+ assert instance
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.encode_certificates')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance_no_ami_with_tags(self, get_boto3_client, register, get_token, get_occm_agent_by_id, get_account, encode_cert, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP, no account id '''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args.pop('account_id')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ args['proxy_certificates'] = ['cert1', 'cert2']
+ set_module_args(args)
+ get_account.return_value = 'account_id', None
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ encode_cert.return_value = 'base64', None
+ register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None
+ get_occm_agent_by_id.side_effect = [
+ ({'agentId': 'test', 'status': 'pending'}, None),
+ ({'agentId': 'test', 'status': 'pending'}, None),
+ ({'agentId': 'test', 'status': 'active'}, None)]
+ my_obj = my_module()
+ instance = my_obj.create_instance()
+ print('instance', instance)
+ assert instance
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance_timeout(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP'''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None
+ get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'pending'}, None
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.create_instance()
+ msg = "Error: taking too long for OCCM agent to be active or not properly setup"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance_error_in_get_agent(self, get_boto3_client, register, get_token, get_occm_agent_by_id, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP'''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None
+ get_occm_agent_by_id.return_value = 'forcing an error', 'intentional error'
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.create_instance()
+ msg = "Error: not able to get occm status: intentional error, forcing an error"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_create_instance_error_in_get_account(self, get_boto3_client, get_token, get_account, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP, no account id '''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args.pop('account_id')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ set_module_args(args)
+ get_account.return_value = 'forcing an error', 'intentional error'
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.create_instance()
+ msg = "Error: failed to get account: intentional error."
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance_error_in_register(self, get_boto3_client, register, get_token, get_account, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP, no account id '''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args.pop('account_id')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ set_module_args(args)
+ get_account.return_value = 'account_id', None
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ register.return_value = 'forcing an error', 'intentional error', 'dummy'
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.create_instance()
+ msg = "Error: unexpected response on connector setup: intentional error, forcing an error"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.encode_certificates')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_or_create_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ @patch('boto3.client')
+ def test_create_instance_error_in_open(self, get_boto3_client, register, get_token, get_account, encode_cert, dont_sleep):
+ ''' additional paths: get_ami, add tags, no public IP, no account id '''
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args.pop('ami')
+ args.pop('account_id')
+ args['aws_tag'] = [{'tag_key': 'tkey', 'tag_value': 'tvalue'}]
+ args['associate_public_ip_address'] = False
+ args['proxy_certificates'] = ['cert1', 'cert2']
+ set_module_args(args)
+ get_account.return_value = 'account_id', None
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'terminated'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ register.return_value = {'clientId': 'xxx', 'clientSecret': 'yyy'}, None, None
+ encode_cert.return_value = None, 'intentional error'
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.create_instance()
+ msg = "Error: could not open/read file 'cert1' of proxy_certificates: intentional error"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.side_effect = [
+ ({'agentId': 'test', 'status': 'active'}, None),
+ ({'agentId': 'test', 'status': 'active'}, None),
+ ({'agentId': 'test', 'status': 'terminated'}, None)]
+ my_obj = my_module()
+ error = my_obj.delete_instance()
+ assert not error
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.delete_occm_agents')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_no_client(self, get_boto3_client, get_token, get_occm_agent_by_id, get_occm_agents_by_name, delete_occm_agents, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('client_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.side_effect = [
+ ({'agentId': 'test', 'status': 'active'}, None),
+ ({'agentId': 'test', 'status': 'active'}, None),
+ ({'agentId': 'test', 'status': 'terminated'}, None)]
+ get_occm_agents_by_name.return_value = [], None
+ delete_occm_agents.return_value = None
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_main()
+ assert not get_occm_agent_by_id.called
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance_timeout(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, None
+ my_obj = my_module()
+ error = my_obj.delete_instance()
+ assert 'Error: taking too long for instance to finish terminating.' == error
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance_error_on_agent(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.return_value = {'agentId': 'test', 'status': 'active'}, 'intentional error'
+ my_obj = my_module()
+ error = my_obj.delete_instance()
+ assert 'Error: not able to get occm agent status after deleting instance: intentional error,' in error
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance_client_id_not_found_403(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.return_value = 'Action not allowed for user', '403'
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.apply()
+ msg = "Error: not able to get occm agent status after deleting instance: 403,"
+ assert msg in exc.value.args[0]['msg']
+ print(exc.value.args[0])
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance_client_id_not_found_other(self, get_boto3_client, get_token, get_occm_agent_by_id, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agent_by_id.return_value = 'Other error', '404'
+ my_obj = my_module()
+ with pytest.raises(AnsibleFailJson) as exc:
+ my_obj.apply()
+ msg = "Error: getting OCCM agents: 404,"
+ assert msg in exc.value.args[0]['msg']
+
+ @patch('time.sleep')
+ # @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agent_by_id')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_delete_instance_account_id_not_found(self, get_boto3_client, get_token, dont_sleep):
+ args = self.set_args_delete_cloudmanager_connector_aws()
+ args.pop('account_id')
+ args.pop('client_id')
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ # get_occm_agent_by_id.return_value = 'Other error', '404'
+ my_obj = my_module()
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ # msg = "Error: getting OCCM agents: 404,"
+ assert exc.value.args[0]['account_id'] is None
+ assert exc.value.args[0]['client_id'] is None
+
+ @patch('time.sleep')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_occm_agents_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('boto3.client')
+ def test_modify_instance(self, get_boto3_client, get_token, get_occm_agents_by_name, dont_sleep):
+ args = self.set_args_create_cloudmanager_connector_aws()
+ args['instance_type'] = 't3.large'
+ set_module_args(args)
+ get_token.return_value = 'test', 'test'
+ get_boto3_client.return_value = EC2([{'state': 'active'},
+ {'state': 'terminated', 'reservation': '2'},
+ {'state': 'terminated', 'name': 'xxxx'}])
+ get_occm_agents_by_name.return_value = [{'agentId': 'test', 'status': 'active'}], None
+ my_obj = my_module()
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ msg = "Note: modifying an existing connector is not supported at this time."
+ assert msg == exc.value.args[0]['modify']
+
+
+class EC2:
+ def __init__(self, get_instances=None, create_instance=True, raise_exc=False):
+ ''' list of instances as dictionaries:
+ name, state are optional, and used to build an instance
+ reservation is optional and defaults to 'default'
+ '''
+ self.get_instances = get_instances if get_instances is not None else []
+ self.create_instance = create_instance if create_instance is not None else []
+ self.raise_exc = raise_exc
+
+ def describe_instances(self, Filters=None, InstanceIds=None):
+ ''' return a list of reservations, each reservation is a list of instances
+ '''
+ if self.raise_exc and HAS_BOTOCORE:
+ raise ClientError({'Error': {'Message': 'forced error in unit testing', 'Code': 'test_only'}}, 'describe_instances')
+ print('ec2', Filters)
+ print('ec2', InstanceIds)
+ return self._build_reservations()
+
+ def describe_images(self, Filters=None, Owners=None):
+ ''' AMI '''
+ return {'Images': [{'CreationDate': 'yyyyy', 'ImageId': 'image_id'},
+ {'CreationDate': 'xxxxx', 'ImageId': 'image_id'},
+ {'CreationDate': 'zzzzz', 'ImageId': 'image_id'}]}
+
+ def describe_subnets(self, SubnetIds=None):
+ ''' subnets '''
+ return {'Subnets': [{'VpcId': 'vpc_id'}]}
+
+ def run_instances(self, **kwargs):
+ ''' create and start an instance'''
+ if self.create_instance:
+ return {'Instances': [{'InstanceId': 'instance_id'}]}
+ return {'Instances': []}
+
+ def terminate_instances(self, **kwargs):
+ ''' terminate an instance'''
+ return
+
+ def _build_reservations(self):
+ ''' return a list of reservations, each reservation is a list of instances
+ '''
+ reservations = {}
+ for instance in self.get_instances:
+ reservation = instance.get('reservation', 'default')
+ if reservation not in reservations:
+ reservations[reservation] = []
+ # provide default values for name or state if one is present
+ name, state = None, None
+ if 'name' in instance:
+ name = instance['name']
+ state = instance.get('state', 'active')
+ elif 'state' in instance:
+ name = instance.get('name', 'd_name')
+ state = instance['state']
+ instance_id = instance.get('instance_id', '12345')
+ instance_type = instance.get('instance_type', 't3.xlarge')
+ if name:
+ reservations[reservation].append({'Name': name, 'State': {'Name': state}, 'InstanceId': instance_id, 'InstanceType': instance_type})
+ return {
+ 'Reservations': [
+ {'Instances': instances} for instances in reservations.values()
+ ]
+ }
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py
new file mode 100644
index 000000000..37a93a291
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_azure.py
@@ -0,0 +1,178 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure \
+ import NetAppCloudManagerConnectorAzure as my_module, IMPORT_EXCEPTION
+
+if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION)
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection:
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+
+
+# using pytest natively, without unittest.TestCase
+@pytest.fixture
+def patch_ansible():
+ with patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json) as mocks:
+ yield mocks
+
+
+def set_default_args_pass_check():
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'location': 'westus',
+ 'resource_group': 'occm_group_westus',
+ 'subnet_id': 'Subnet1',
+ 'vnet_id': 'Vnet1',
+ 'subscription_id': 'subscriptionId-test',
+ 'refresh_token': 'myrefresh_token',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'admin_username': 'test',
+ 'admin_password': 'test',
+ 'network_security_group_name': 'test'
+ })
+
+
+def set_args_create_cloudmanager_connector_azure():
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'location': 'westus',
+ 'resource_group': 'occm_group_westus',
+ 'subnet_id': 'Subnet1',
+ 'vnet_id': 'Vnet1',
+ 'subscription_id': 'subscriptionId-test',
+ 'refresh_token': 'myrefresh_token',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'admin_username': 'test',
+ 'admin_password': 'test',
+ 'network_security_group_name': 'test'
+ })
+
+
+def set_args_delete_cloudmanager_connector_azure():
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'location': 'westus',
+ 'resource_group': 'occm_group_westus',
+ 'subnet_id': 'Subnet1',
+ 'vnet_id': 'Vnet1',
+ 'subscription_id': 'subscriptionId-test',
+ 'refresh_token': 'myrefresh_token',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'admin_username': 'test',
+ 'admin_password': 'test',
+ 'network_security_group_name': 'test'
+ })
+
+
+def test_module_fail_when_required_args_missing(patch_ansible):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+def test_module_fail_when_required_args_present(get_token, patch_ansible):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.deploy_azure')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.register_agent_to_service')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+def test_create_cloudmanager_connector_azure_pass(get_post_api, register_agent_to_service, deploy_azure, get_token, patch_ansible):
+ set_module_args(set_args_create_cloudmanager_connector_azure())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ get_post_api.return_value = None, None, None
+ register_agent_to_service.return_value = 'test', 'test'
+ deploy_azure.return_value = None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_connector_azure: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.get_deploy_azure_vm')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_azure.NetAppCloudManagerConnectorAzure.delete_azure_occm')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+def test_delete_cloudmanager_connector_azure_pass(get_delete_api, delete_azure_occm, get_deploy_azure_vm, get_token, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_azure())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ get_deploy_azure_vm.return_value = True
+ delete_azure_occm.return_value = None
+ get_delete_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_connector_azure: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py
new file mode 100644
index 000000000..9d74af2d7
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_connector_gcp.py
@@ -0,0 +1,407 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp \
+ import NetAppCloudManagerConnectorGCP as my_module
+
+IMPORT_ERRORS = []
+HAS_GCP_COLLECTION = False
+
+try:
+ from google import auth
+ from google.auth.transport import requests
+ from google.oauth2 import service_account
+ import yaml
+ HAS_GCP_COLLECTION = True
+except ImportError as exc:
+ IMPORT_ERRORS.append(str(exc))
+
+if not HAS_GCP_COLLECTION and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required google packages on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+# using pytest natively, without unittest.TestCase
+@pytest.fixture(name='patch_ansible')
+def fixture_patch_ansible():
+ with patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json) as mocks:
+ yield mocks
+
+
+def set_default_args_pass_check():
+ return dict({
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'my_refresh_token',
+ 'state': 'present',
+ 'name': 'CxName',
+ 'project_id': 'tlv-support',
+ 'zone': 'us-west-1',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com',
+ })
+
+
+def set_args_create_cloudmanager_connector_gcp():
+ return dict({
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'my_refresh_token',
+ 'state': 'present',
+ 'name': 'CxName',
+ 'project_id': 'tlv-support',
+ 'zone': 'us-west-1',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com',
+ 'service_account_path': 'test.json',
+ })
+
+
+def set_args_delete_cloudmanager_connector_gcp():
+ return dict({
+ 'client_id': 'test',
+ 'refresh_token': 'my_refresh_token',
+ 'state': 'absent',
+ 'name': 'CxName',
+ 'project_id': 'tlv-support',
+ 'zone': 'us-west-1',
+ 'account_id': 'account-test',
+ 'company': 'NetApp',
+ 'service_account_email': 'terraform-user@tlv-support.iam.gserviceaccount.com',
+ 'service_account_path': 'test.json',
+ })
+
+
+def test_module_fail_when_required_args_missing(patch_ansible):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+def test_module_fail_when_required_args_present(get_token, get_gcp_token, patch_ansible):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(set_default_args_pass_check())
+ get_token.return_value = 'bearer', 'test'
+ get_gcp_token.return_value = 'token', None
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.deploy_gcp_vm')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_custom_data_for_gcp')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.create_occm_gcp')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_deploy_vm')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+def test_create_cloudmanager_connector_gcp_pass(get_post_api, get_vm, create_occm_gcp, get_custom_data_for_gcp,
+ deploy_gcp_vm, get_gcp_token, get_token, patch_ansible):
+ set_module_args(set_args_create_cloudmanager_connector_gcp())
+ get_token.return_value = 'bearer', 'test'
+ get_gcp_token.return_value = 'test', None
+ my_obj = my_module()
+
+ get_vm.return_value = None
+ deploy_gcp_vm.return_value = None, 'test', None
+ get_custom_data_for_gcp.return_value = 'test', 'test', None
+ create_occm_gcp.return_value = 'test'
+ get_post_api.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_connector_gcp: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed'], create_occm_gcp.return_value[1]
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.delete_occm_gcp')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_deploy_vm')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_occm_agents')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+def test_delete_cloudmanager_connector_gcp_pass(get_delete_api, get_agents, get_deploy_vm, delete_occm_gcp, get_gcp_token, get_token, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_gcp())
+ get_token.return_value = 'bearer', 'test'
+ get_gcp_token.return_value = 'test', None
+ my_obj = my_module()
+
+ my_connector_gcp = {
+ 'name': 'Dummyname-vm-boot-deployment',
+ 'client_id': 'test',
+ 'refresh_token': 'my_refresh_token',
+ 'operation': {'status': 'active'}
+ }
+ get_deploy_vm.return_value = my_connector_gcp
+ get_agents.return_value = []
+ get_delete_api.return_value = None, None, None
+ delete_occm_gcp.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_connector_gcp: %s' % repr(exc.value))
+
+ assert exc.value.args[0]['changed']
+
+
+TOKEN_DICT = {
+ 'access_token': 'access_token',
+ 'token_type': 'token_type'
+}
+
+
+AGENT_DICTS = {
+ 'active': {
+ 'agent': {'status': 'active'},
+ },
+ 'pending': {
+ 'agent': {'status': 'pending'},
+ },
+ 'other': {
+ 'agent': {'status': 'pending', 'agentId': 'agent11', 'name': 'CxName', 'provider': 'GCP'},
+ }
+}
+
+
+CLIENT_DICT = {
+ 'clientId': '12345',
+ 'clientSecret': 'a1b2c3'
+}
+
+SRR = {
+ # common responses (json_dict, error, ocr_id)
+ 'empty_good': ({}, None, None),
+ 'zero_record': ({'records': []}, None, None),
+ 'get_token': (TOKEN_DICT, None, None),
+ 'get_gcp_token': (TOKEN_DICT, None, None),
+ 'get_agent_status_active': (AGENT_DICTS['active'], None, None),
+ 'get_agent_status_pending': (AGENT_DICTS['pending'], None, None),
+ 'get_agent_status_other': (AGENT_DICTS['other'], None, None),
+ 'get_agents': ({'agents': [AGENT_DICTS['other']['agent']]}, None, None),
+ 'get_agents_empty': ({'agents': []}, None, None),
+ 'get_agent_not_found': (b"{'message': 'Action not allowed for user'}", '403', None),
+ 'get_vm': ({'operation': {'status': 'active'}}, None, None),
+ 'get_vm_not_found': (b"{'message': 'is not found'}", '404', None),
+ 'register_agent': (CLIENT_DICT, None, None),
+ 'end_of_sequence': (None, "Unexpected call to send_request", None),
+ 'generic_error': (None, "Expected error", None),
+}
+
+
+@patch('time.sleep')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_delete_occm_gcp_pass(mock_request, get_gcp_token, ignore_sleep, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['empty_good'], # delete
+ SRR['get_agent_status_active'], # status
+ SRR['get_agent_status_pending'], # status
+ SRR['get_agent_status_other'], # status
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ error = my_obj.delete_occm_gcp()
+ print(error)
+ print(mock_request.mock_calls)
+ assert error is None
+
+
+@patch('time.sleep')
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_create_occm_gcp_pass(mock_request, get_gcp_token, ignore_sleep, patch_ansible):
+ set_module_args(set_args_create_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['register_agent'], # register
+ SRR['empty_good'], # deploy
+ SRR['get_agent_status_pending'], # status
+ SRR['get_agent_status_active'], # status
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ client_id = my_obj.create_occm_gcp()
+ print(client_id)
+ print(mock_request.mock_calls)
+ assert client_id == '12345'
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_get_deploy_vm_pass(mock_request, get_gcp_token, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['get_vm'], # get
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ vm = my_obj.get_deploy_vm()
+ print(vm)
+ print(mock_request.mock_calls)
+ assert vm == SRR['get_vm'][0]
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_get_occm_agents_absent_pass(mock_request, get_gcp_token, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['get_agent_status_active'], # get
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ agents = my_obj.get_occm_agents()
+ print(agents)
+ print(mock_request.mock_calls)
+ assert agents == [SRR['get_agent_status_active'][0]['agent']]
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_get_occm_agents_present_pass(mock_request, get_gcp_token, patch_ansible):
+ set_module_args(set_args_create_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['get_agents'], # get
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ agents = my_obj.get_occm_agents()
+ print(agents)
+ print(mock_request.mock_calls)
+ assert agents == SRR['get_agents'][0]['agents']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_create_idempotent(mock_request, get_gcp_token, patch_ansible):
+ set_module_args(set_args_create_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['get_vm'], # get
+ SRR['get_agents'], # get
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print(mock_request.mock_calls)
+ print(exc)
+ assert not exc.value.args[0]['changed']
+ assert exc.value.args[0]['client_id'] == SRR['get_agents'][0]['agents'][0]['agentId']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+def test_delete_idempotent(mock_request, get_gcp_token, patch_ansible):
+ set_module_args(set_args_delete_cloudmanager_connector_gcp())
+ get_gcp_token.return_value = 'test', None
+ mock_request.side_effect = [
+ SRR['get_token'], # OAUTH
+ SRR['get_vm_not_found'], # get vn
+ SRR['get_agent_not_found'], # get agents
+ SRR['end_of_sequence'],
+ ]
+ my_obj = my_module()
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print(mock_request.mock_calls)
+ print(exc)
+ assert not exc.value.args[0]['changed']
+ assert exc.value.args[0]['client_id'] == ""
+
+
+# @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_connector_gcp.NetAppCloudManagerConnectorGCP.get_gcp_token')
+# @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+# def test_delete_idempotent(mock_request, get_gcp_token, patch_ansible):
+# set_module_args(set_args_delete_cloudmanager_connector_gcp())
+# get_gcp_token.return_value = 'test', None
+# mock_request.side_effect = [
+# SRR['get_token'], # OAUTH
+# SRR['get_vm_not_found'], # get vn
+# SRR['get_agents'], # get
+# SRR['end_of_sequence'],
+# ]
+# my_obj = my_module()
+
+# with pytest.raises(AnsibleExitJson) as exc:
+# my_obj.apply()
+# print(mock_request.mock_calls)
+# print(exc)
+# assert not exc.value.args[0]['changed']
+# assert exc.value.args[0]['client_id'] == SRR['get_agents'][0][0]['agentId']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py
new file mode 100644
index 000000000..e3dc685d4
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_aws.py
@@ -0,0 +1,426 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws \
+ import NetAppCloudManagerCVOAWS as my_module, IMPORT_EXCEPTION
+
+if IMPORT_EXCEPTION is not None and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7: %s' % IMPORT_EXCEPTION)
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'client_id': 'test',
+ 'region': 'us-west-1',
+ 'use_latest_version': False,
+ 'ontap_version': 'ONTAP-9.10.0.T1',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'instance_type': 'm5.xlarge',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_create_cloudmanager_cvo_aws(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'region': 'us-west-1',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_delete_cloudmanager_cvo_aws(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'region': 'us-west-1',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_create_bynode_cloudmanager_cvo_aws(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'region': 'us-west-1',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'license_type': 'cot-premium-byol',
+ 'platform_serial_number': '12345678',
+ 'is_ha': False
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ set_module_args(self.set_args_create_cloudmanager_cvo_aws())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_aws()
+ data['is_ha'] = True
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ data.pop('subnet_id')
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_ha_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_aws()
+ data['license_type'] = 'capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_ha_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_aws()
+ data['is_ha'] = True
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ data.pop('subnet_id')
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_ha_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ data = self.set_args_create_bynode_cloudmanager_cvo_aws()
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_aws.NetAppCloudManagerCVOAWS.get_vpc')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_aws_ha_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, get_vpc, wait_on_completion, get_token):
+ data = self.set_args_create_bynode_cloudmanager_cvo_aws()
+ data['license_type'] = 'ha-cot-premium-byol'
+ data['platform_serial_number_node1'] = '12345678'
+ data['platform_serial_number_node2'] = '23456789'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_working_environment_details_by_name.return_value = None, None
+ get_post_api.return_value = response, None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ get_vpc.return_value = 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_aws_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_cvo_aws_pass(self, get_delete_api, get_working_environment_details_by_name,
+ wait_on_completion, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_cvo_aws())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_cvo = {
+ 'name': 'test',
+ 'publicId': 'test'}
+ get_working_environment_details_by_name.return_value = my_cvo, None
+ get_delete_api.return_value = None, None, None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_cvo_aws_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ def test_change_cloudmanager_cvo_aws(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password, update_cvo_tags,
+ update_tier_level, update_instance_license_type, update_writing_speed_state, get_token):
+ data = self.set_default_args_pass_check()
+ data['svm_password'] = 'newpassword'
+ data['update_svm_password'] = True
+ data['ontap_version'] = 'ONTAP-9.10.1P3.T1'
+ data['upgrade_ontap_version'] = True
+ set_module_args(data)
+
+ modify = ['svm_password', 'aws_tag', 'tier_level', 'ontap_version', 'instance_type', 'license_type', 'writing_speed_state']
+
+ my_cvo = {
+ 'name': 'TestA',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon',
+ 'svm_password': 'password',
+ 'isHa': False,
+ 'svmName': 'svm_TestA',
+ 'tenantId': 'Tenant-test',
+ 'workingEnvironmentType': 'VSA',
+ }
+ get_cvo.return_value = my_cvo, None
+ cvo_property = {'name': 'TestA',
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'ontapClusterProperties': {
+ 'capacityTierInfo': {'tierLevel': 'normal'},
+ 'licenseType': {'capacityLimit': {'size': 2.0, 'unit': 'TB'},
+ 'name': 'Cloud Volumes ONTAP Capacity Based Charging'},
+ 'ontapVersion': '9.10.0',
+ 'upgradeVersions': [{'autoUpdateAllowed': False,
+ 'imageVersion': 'ONTAP-9.10.1P3',
+ 'lastModified': 1634467078000}],
+ 'writingSpeedState': 'NORMAL'},
+ 'awsProperties': {'accountId': u'123456789011',
+ 'availabilityZones': [u'us-east-1b'],
+ 'bootDiskSize': None,
+ 'cloudProviderAccountId': None,
+ 'coreDiskExists': True,
+ 'instances': [{'availabilityZone': 'us-east-1b',
+ 'id': 'i-31',
+ 'imageId': 'ami-01a6f1234cb1ec375',
+ 'instanceProfileId': 'SimFabricPoolInstanceProfileId',
+ 'instanceType': 'm5.2xlarge',
+ 'isOCCMInstance': False,
+ 'isVsaInstance': True,
+ }],
+ 'regionName': 'us-west-1',
+ }
+ }
+ get_property.return_value = cvo_property, None
+ cvo_details = {'cloudProviderName': 'Amazon',
+ 'isHA': False,
+ 'name': 'TestA',
+ 'ontapClusterProperties': None,
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'userTags': {'key1': 'value1'},
+ 'workingEnvironmentType': 'VSA'}
+ get_details.return_value = cvo_details, None
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ for item in modify:
+ if item == 'svm_password':
+ update_svm_password.return_value = True, None
+ elif item == 'aws_tag':
+ update_cvo_tags.return_value = True, None
+ elif item == 'tier_level':
+ update_tier_level.return_value = True, None
+ elif item == 'ontap_version':
+ upgrade_ontap_image.return_value = True, None
+ elif item == 'writing_speed_state':
+ update_writing_speed_state.return_value = True, None
+ elif item == 'instance_type' or item == 'license_type':
+ update_instance_license_type.return_value = True, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_change_cloudmanager_cvo_aws: %s' % repr(exc.value))
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py
new file mode 100644
index 000000000..f3e072bdb
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_azure.py
@@ -0,0 +1,439 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_azure \
+ import NetAppCloudManagerCVOAZURE as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ # self.token_type, self.token = self.get_token()
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'client_id': 'test',
+ 'location': 'westus',
+ 'use_latest_version': False,
+ 'ontap_version': 'ONTAP-9.10.0.T1.azure',
+ 'vnet_id': 'vpc-test',
+ 'resource_group': 'test',
+ 'subnet_id': 'subnet-test',
+ 'subscription_id': 'test',
+ 'cidr': '10.0.0.0/24',
+ 'svm_password': 'password',
+ 'license_type': 'azure-cot-standard-paygo',
+ 'instance_type': 'Standard_DS4_v2',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_create_cloudmanager_cvo_azure(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'location': 'westus',
+ 'vnet_id': 'vpc-test',
+ 'resource_group': 'test',
+ 'subscription_id': 'test',
+ 'cidr': '10.0.0.0/24',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_delete_cloudmanager_cvo_azure(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'location': 'westus',
+ 'vnet_id': 'vpc-test',
+ 'resource_group': 'test',
+ 'subscription_id': 'test',
+ 'cidr': '10.0.0.0/24',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False
+ })
+
+ def set_args_create_bynode_cloudmanager_cvo_azure(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'location': 'westus',
+ 'vnet_id': 'vpc-test',
+ 'resource_group': 'test',
+ 'subscription_id': 'test',
+ 'cidr': '10.0.0.0/24',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'license_type': 'azure-cot-premium-byol',
+ 'serial_number': '12345678',
+ 'is_ha': False
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ self.rest_api = MockCMConnection()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ set_module_args(self.set_args_create_cloudmanager_cvo_azure())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_azure()
+ data['license_type'] = 'capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_capacity_license_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_ha_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_azure()
+ data['is_ha'] = True
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Professional'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_ha_capacity_license_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_bynode_cloudmanager_cvo_azure()
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_nodebase_license_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_ha_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_bynode_cloudmanager_cvo_azure()
+ data['is_ha'] = True
+ data['license_type'] = 'azure-ha-cot-premium-byol'
+ data['platform_serial_number_node1'] = '12345678'
+ data['platform_serial_number_node2'] = '23456789'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_azure_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_azure()
+ data['is_ha'] = True
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_azure_ha_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_cvo_azure_pass(self, get_delete_api, get_working_environment_details_by_name,
+ wait_on_completion, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_cvo_azure())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_cvo = {
+ 'name': 'Dummyname',
+ 'publicId': 'test'}
+ get_working_environment_details_by_name.return_value = my_cvo, None
+ get_delete_api.return_value = None, None, None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_cvo_azure_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ def test_change_cloudmanager_cvo_azure(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password, update_cvo_tags,
+ update_tier_level, update_instance_license_type, update_writing_speed_state, get_token):
+ data = self.set_default_args_pass_check()
+ data['svm_password'] = 'newpassword'
+ data['update_svm_password'] = True
+ data['ontap_version'] = 'ONTAP-9.10.1P3.T1.azure'
+ data['upgrade_ontap_version'] = True
+ data['instance_type'] = 'Standard_DS13_v2'
+ set_module_args(data)
+
+ modify = ['svm_password', 'azure_tag', 'tier_level', 'ontap_version', 'instance_type', 'license_type']
+
+ my_cvo = {
+ 'name': 'TestA',
+ 'publicId': 'test',
+ 'svm_password': 'password',
+ 'isHA': False,
+ 'azure_tag': [{'tag_key': 'keya', 'tag_value': 'valuea'}, {'tag_key': 'keyb', 'tag_value': 'valueb'}],
+ }
+ get_cvo.return_value = my_cvo, None
+
+ cvo_property = {'name': 'TestA',
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'ontapClusterProperties': {
+ 'capacityTierInfo': {'tierLevel': 'normal'},
+ 'licensePackageName': 'Professional',
+ 'licenseType': {'capacityLimit': {'size': 2000.0, 'unit': 'TB'},
+ 'name': 'Cloud Volumes ONTAP Capacity Based Charging'},
+ 'ontapVersion': '9.10.0.T1.azure',
+ 'upgradeVersions': [{'autoUpdateAllowed': False,
+ 'imageVersion': 'ONTAP-9.10.1P3',
+ 'lastModified': 1634467078000}],
+ 'writingSpeedState': 'NORMAL'},
+ 'providerProperties': {
+ 'cloudProviderAccountId': 'CloudProviderAccount-abcdwxyz',
+ 'regionName': 'westus',
+ 'instanceType': 'Standard_DS4_v2',
+ 'resourceGroup': {
+ 'name': 'TestA-rg',
+ 'location': 'westus',
+ 'tags': {
+ 'DeployedByOccm': 'true'
+ }
+ },
+ 'vnetCidr': '10.0.0.0/24',
+ 'tags': {
+ 'DeployedByOccm': 'true'
+ }},
+ 'tenantId': 'Tenant-abCdEfg1',
+ 'workingEnvironmentTyp': 'VSA'
+ }
+ get_property.return_value = cvo_property, None
+ cvo_details = {'cloudProviderName': 'Azure',
+ 'isHA': False,
+ 'name': 'TestA',
+ 'ontapClusterProperties': None,
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'userTags': {'DeployedByOccm': 'true', 'key1': 'value1'},
+ 'workingEnvironmentType': 'VSA'}
+ get_details.return_value = cvo_details, None
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ for item in modify:
+ if item == 'svm_password':
+ update_svm_password.return_value = True, None
+ elif item == 'azure_tag':
+ update_cvo_tags.return_value = True, None
+ elif item == 'tier_level':
+ update_tier_level.return_value = True, None
+ elif item == 'ontap_version':
+ upgrade_ontap_image.return_value = True, None
+ elif item == 'writing_speed_state':
+ update_writing_speed_state.return_value = True, None
+ elif item == 'instance_type':
+ update_instance_license_type.return_value = True, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_change_cloudmanager_cvo_azure: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py
new file mode 100644
index 000000000..1209d2b9e
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_cvo_gcp.py
@@ -0,0 +1,543 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_cvo_gcp \
+ import NetAppCloudManagerCVOGCP as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ # self.token_type, self.token = self.get_token()
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'TestA',
+ 'client_id': 'test',
+ 'zone': 'us-west-1b',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False,
+ 'gcp_service_account': 'test_account',
+ 'data_encryption_type': 'GCP',
+ 'gcp_volume_type': 'pd-ssd',
+ 'gcp_volume_size': 500,
+ 'gcp_volume_size_unit': 'GB',
+ 'project_id': 'default-project',
+ 'tier_level': 'standard'
+ })
+
+ def set_args_create_cloudmanager_cvo_gcp(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'zone': 'us-west1-b',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'use_latest_version': False,
+ 'capacity_tier': 'cloudStorage',
+ 'ontap_version': 'ONTAP-9.10.0.T1.gcp',
+ 'is_ha': False,
+ 'gcp_service_account': 'test_account',
+ 'data_encryption_type': 'GCP',
+ 'gcp_volume_type': 'pd-ssd',
+ 'gcp_volume_size': 500,
+ 'gcp_volume_size_unit': 'GB',
+ 'gcp_labels': [{'label_key': 'key1', 'label_value': 'value1'}, {'label_key': 'keya', 'label_value': 'valuea'}],
+ 'project_id': 'default-project'
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ self.rest_api = MockCMConnection()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ def set_args_delete_cloudmanager_cvo_gcp(self):
+ return dict({
+ 'state': 'absent',
+ 'name': 'Dummyname',
+ 'client_id': 'test',
+ 'zone': 'us-west-1',
+ 'vpc_id': 'vpc-test',
+ 'subnet_id': 'subnet-test',
+ 'svm_password': 'password',
+ 'refresh_token': 'myrefresh_token',
+ 'is_ha': False,
+ 'gcp_service_account': 'test_account',
+ 'data_encryption_type': 'GCP',
+ 'gcp_volume_type': 'pd-ssd',
+ 'gcp_volume_size': 500,
+ 'gcp_volume_size_unit': 'GB',
+ 'project_id': 'project-test'
+ })
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ set_module_args(self.set_args_create_cloudmanager_cvo_gcp())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_ha_pass(self, get_post_api, get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['is_ha'] = True
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ data['subnet0_node_and_data_connectivity'] = 'default'
+ data['subnet1_cluster_connectivity'] = 'subnet2'
+ data['subnet2_ha_connectivity'] = 'subnet3'
+ data['subnet3_data_replication'] = 'subnet1'
+ data['vpc0_node_and_data_connectivity'] = 'default'
+ data['vpc1_cluster_connectivity'] = 'vpc2'
+ data['vpc2_ha_connectivity'] = 'vpc3'
+ data['vpc3_data_replication'] = 'vpc1'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_ha_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['license_type'] = 'capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ set_module_args(data)
+
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_ha_capacity_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['license_type'] = 'ha-capacity-paygo'
+ data['capacity_package_name'] = 'Essential'
+ data['is_ha'] = True
+ data['subnet0_node_and_data_connectivity'] = 'default'
+ data['subnet1_cluster_connectivity'] = 'subnet2'
+ data['subnet2_ha_connectivity'] = 'subnet3'
+ data['subnet3_data_replication'] = 'subnet1'
+ data['vpc0_node_and_data_connectivity'] = 'default'
+ data['vpc1_cluster_connectivity'] = 'vpc2'
+ data['vpc2_ha_connectivity'] = 'vpc3'
+ data['vpc3_data_replication'] = 'vpc1'
+ set_module_args(data)
+
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['license_type'] = 'gcp-cot-premium-byol'
+ data['platform_serial_number'] = '12345678'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_tenant')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_nss')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.post')
+ def test_create_cloudmanager_cvo_gcp_ha_nodebase_license_pass(self, get_post_api,
+ get_working_environment_details_by_name, get_nss,
+ get_tenant, wait_on_completion, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['is_ha'] = True
+ data['subnet0_node_and_data_connectivity'] = 'default'
+ data['subnet1_cluster_connectivity'] = 'subnet2'
+ data['subnet2_ha_connectivity'] = 'subnet3'
+ data['subnet3_data_replication'] = 'subnet1'
+ data['vpc0_node_and_data_connectivity'] = 'default'
+ data['vpc1_cluster_connectivity'] = 'vpc2'
+ data['vpc2_ha_connectivity'] = 'vpc3'
+ data['vpc3_data_replication'] = 'vpc1'
+ data['platform_serial_number_node1'] = '12345678'
+ data['platform_serial_number_node2'] = '23456789'
+ data['license_type'] = 'gcp-ha-cot-premium-byol'
+ set_module_args(data)
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'publicId': 'abcdefg12345'}
+ get_post_api.return_value = response, None, None
+ get_working_environment_details_by_name.return_value = None, None
+ get_nss.return_value = 'nss-test', None
+ get_tenant.return_value = 'test', None
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.delete')
+ def test_delete_cloudmanager_cvo_gcp_pass(self, get_delete_api, get_working_environment_details_by_name,
+ wait_on_completion, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_cvo_gcp())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+ my_cvo = {
+ 'name': 'Dummyname',
+ 'publicId': 'test'}
+ get_working_environment_details_by_name.return_value = my_cvo, None
+
+ get_delete_api.return_value = None, None, 'test'
+ wait_on_completion.return_value = None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_cvo_gcp_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ def test_change_cloudmanager_cvo_gcp(self, get_cvo, get_property, get_details, update_svm_password, update_cvo_tags,
+ update_tier_level, update_instance_license_type, get_token):
+ set_module_args(self.set_args_create_cloudmanager_cvo_gcp())
+
+ modify = ['svm_password', 'gcp_labels', 'tier_level', 'instance_type']
+
+ my_cvo = {
+ 'name': 'TestA',
+ 'publicId': 'test',
+ 'cloudProviderName': 'GCP',
+ 'isHA': False,
+ 'svmName': 'svm_TestA',
+ 'svm_password': 'password',
+ 'tenantId': 'Tenant-test',
+ }
+ get_cvo.return_value = my_cvo, None
+ cvo_property = {'name': 'Dummyname',
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'ontapClusterProperties': {
+ 'capacityTierInfo': {'tierLevel': 'standard'},
+ 'licenseType': {'capacityLimit': {'size': 10.0, 'unit': 'TB'},
+ 'name': 'Cloud Volumes ONTAP Standard'},
+ 'ontapVersion': '9.10.0.T1',
+ 'writingSpeedState': 'NORMAL'},
+ 'providerProperties': {
+ 'regionName': 'us-west1',
+ 'zoneName': ['us-west1-b'],
+ 'instanceType': 'n1-standard-8',
+ 'labels': {'cloud-ontap-dm': 'anscvogcp-deployment',
+ 'cloud-ontap-version': '9_10_0_t1',
+ 'key1': 'value1',
+ 'platform-serial-number': '90920130000000001020',
+ 'working-environment-id': 'vsaworkingenvironment-cxxt6zwj'},
+ 'subnetCidr': '10.150.0.0/20',
+ 'projectName': 'default-project'},
+ 'svmName': 'svm_Dummyname',
+ 'tenantId': 'Tenant-test',
+ 'workingEnvironmentTyp': 'VSA'
+ }
+ get_property.return_value = cvo_property, None
+ cvo_details = {'cloudProviderName': 'GCP',
+ 'isHA': False,
+ 'name': 'Dummyname',
+ 'ontapClusterProperties': None,
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'userTags': {'key1': 'value1'},
+ 'workingEnvironmentType': 'VSA'}
+ get_details.return_value = cvo_details, None
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ for item in modify:
+ if item == 'svm_password':
+ update_svm_password.return_value = True, None
+ elif item == 'gcp_labels':
+ update_cvo_tags.return_value = True, None
+ elif item == 'tier_level':
+ update_tier_level.return_value = True, None
+ elif item == 'instance_type':
+ update_instance_license_type.return_value = True, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_change_cloudmanager_cvo_gcp: %s' % repr(exc.value))
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_writing_speed_state')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_instance_license_type')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_tier_level')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_cvo_tags')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.update_svm_password')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.upgrade_ontap_image')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_property')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ def test_change_cloudmanager_cvo_gcp_ha(self, get_cvo, get_property, get_details, upgrade_ontap_image, update_svm_password,
+ update_cvo_tags, update_tier_level, update_instance_license_type, update_writing_speed_state, get_token):
+ data = self.set_args_create_cloudmanager_cvo_gcp()
+ data['is_ha'] = True
+ data['svm_password'] = 'newpassword'
+ data['update_svm_password'] = True
+ data['ontap_version'] = 'ONTAP-9.10.1P3.T1.gcpha'
+ data['upgrade_ontap_version'] = True
+ data['subnet0_node_and_data_connectivity'] = 'default'
+ data['subnet1_cluster_connectivity'] = 'subnet2'
+ data['subnet2_ha_connectivity'] = 'subnet3'
+ data['subnet3_data_replication'] = 'subnet1'
+ data['vpc0_node_and_data_connectivity'] = 'default'
+ data['vpc1_cluster_connectivity'] = 'vpc2'
+ data['vpc2_ha_connectivity'] = 'vpc3'
+ data['vpc3_data_replication'] = 'vpc1'
+ data['platform_serial_number_node1'] = '12345678'
+ data['platform_serial_number_node2'] = '23456789'
+ data['license_type'] = 'gcp-ha-cot-premium-byol'
+ data['instance_type'] = 'n1-standard-8'
+ set_module_args(data)
+
+ modify = ['svm_password', 'gcp_labels', 'tier_level', 'ontap_version', 'instance_type', 'license_type']
+
+ my_cvo = {
+ 'name': 'TestA',
+ 'publicId': 'test',
+ 'cloudProviderName': 'GCP',
+ 'isHA': True,
+ 'svmName': 'svm_TestA',
+ 'svm_password': 'password',
+ 'tenantId': 'Tenant-test',
+ }
+ get_cvo.return_value = my_cvo, None
+ cvo_property = {'name': 'Dummyname',
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'ontapClusterProperties': {
+ 'capacityTierInfo': {'tierLevel': 'standard'},
+ 'licenseType': {'capacityLimit': {'size': 10.0, 'unit': 'TB'},
+ 'name': 'Cloud Volumes ONTAP Standard'},
+ 'ontapVersion': '9.10.0.T1',
+ 'upgradeVersions': [{'autoUpdateAllowed': False,
+ 'imageVersion': 'ONTAP-9.10.1P3',
+ 'lastModified': 1634467078000}],
+ 'writingSpeedState': 'NORMAL'},
+ 'providerProperties': {
+ 'regionName': 'us-west1',
+ 'zoneName': ['us-west1-b'],
+ 'instanceType': 'n1-standard-8',
+ 'labels': {'cloud-ontap-dm': 'anscvogcp-deployment',
+ 'cloud-ontap-version': '9_10_0_t1',
+ 'key1': 'value1',
+ 'platform-serial-number': '90920130000000001020',
+ 'working-environment-id': 'vsaworkingenvironment-cxxt6zwj'},
+ 'subnetCidr': '10.150.0.0/20',
+ 'projectName': 'default-project'},
+ 'svmName': 'svm_Dummyname',
+ 'tenantId': 'Tenant-test',
+ 'workingEnvironmentTyp': 'VSA'
+ }
+ get_property.return_value = cvo_property, None
+ cvo_details = {'cloudProviderName': 'GCP',
+ 'isHA': True,
+ 'name': 'Dummyname',
+ 'ontapClusterProperties': None,
+ 'publicId': 'test',
+ 'status': {'status': 'ON'},
+ 'userTags': {'key1': 'value1', 'partner-platform-serial-number': '90920140000000001019',
+ 'gcp_resource_id': '14004944518802780827', 'count-down': '3'},
+ 'workingEnvironmentType': 'VSA'}
+ get_details.return_value = cvo_details, None
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ for item in modify:
+ if item == 'svm_password':
+ update_svm_password.return_value = True, None
+ elif item == 'gcp_labels':
+ update_cvo_tags.return_value = True, None
+ elif item == 'tier_level':
+ update_tier_level.return_value = True, None
+ elif item == 'ontap_version':
+ upgrade_ontap_image.return_value = True, None
+ elif item == 'writing_speed_state':
+ update_writing_speed_state.return_value = True, None
+ elif item == 'instance_type':
+ update_instance_license_type.return_value = True, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_change_cloudmanager_cvo_gcp: %s' % repr(exc.value))
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py
new file mode 100644
index 000000000..9b417ed1b
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_info.py
@@ -0,0 +1,591 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests ONTAP Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_info \
+ import NetAppCloudmanagerInfo as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+# using pytest natively, without unittest.TestCase
+@pytest.fixture
+def patch_ansible():
+ with patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json) as mocks:
+ yield mocks
+
+
+def set_default_args_pass_check(patch_ansible):
+ return dict({
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+
+def set_args_get_cloudmanager_working_environments_info():
+ args = {
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'gather_subsets': ['working_environments_info']
+ }
+ return args
+
+
+def set_args_get_cloudmanager_aggregates_info():
+ args = {
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'gather_subsets': ['working_environments_info']
+ }
+ return args
+
+
+def set_args_get_accounts_info():
+ args = {
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'myrefresh_token',
+ 'gather_subsets': ['accounts_info']
+ }
+ return args
+
+
+def test_module_fail_when_required_args_missing(patch_ansible):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environments_info')
+def test_get_working_environments_info(working_environments_info, get_token, patch_ansible):
+ args = dict(set_args_get_cloudmanager_working_environments_info())
+ set_module_args(args)
+ get_token.return_value = 'token_type', 'token'
+ working_environments_info.return_value = {
+ "azureVsaWorkingEnvironments": [
+ {
+ "name": "testazure",
+ "cloudProviderName": "Azure",
+ "creatorUserEmail": "samlp|NetAppSAML|testuser",
+ "isHA": False,
+ "publicId": "VsaWorkingEnvironment-az123456",
+ "tenantId": "Tenant-2345",
+ "workingEnvironmentType": "VSA",
+ }
+ ],
+ "gcpVsaWorkingEnvironments": [],
+ "onPremWorkingEnvironments": [],
+ "vsaWorkingEnvironments": [
+ {
+ "name": "testAWS",
+ "cloudProviderName": "Amazon",
+ "creatorUserEmail": "samlp|NetAppSAML|testuser",
+ "isHA": False,
+ "publicId": "VsaWorkingEnvironment-aws12345",
+ "tenantId": "Tenant-2345",
+ "workingEnvironmentType": "VSA",
+ },
+ {
+ "name": "testAWSHA",
+ "cloudProviderName": "Amazon",
+ "creatorUserEmail": "samlp|NetAppSAML|testuser",
+ "isHA": True,
+ "publicId": "VsaWorkingEnvironment-awsha345",
+ "tenantId": "Tenant-2345",
+ "workingEnvironmentType": "VSA",
+ }
+ ]
+ }, None
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_info: %s' % repr(exc.value))
+ assert not exc.value.args[0]['changed']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch(
+ 'ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_info.NetAppCloudmanagerInfo.get_aggregates_info')
+def test_get_aggregates_info(aggregates_info, get_token, patch_ansible):
+ args = dict(set_args_get_cloudmanager_aggregates_info())
+ set_module_args(args)
+ get_token.return_value = 'token_type', 'token'
+ aggregates_info.return_value = {
+ "azureVsaWorkingEnvironments": {
+ "VsaWorkingEnvironment-az123456": [
+ {
+ "availableCapacity": {
+ "size": 430.0,
+ "unit": "GB"
+ },
+ "disks": [
+ {
+ "device": "LUN 3.1",
+ "name": "testazure-01-1",
+ "ownerNode": "testazure-01",
+ "position": "data",
+ }
+ ],
+ "encryptionType": "notEncrypted",
+ "homeNode": "testazure-01",
+ "isRoot": False,
+ "name": "aggr1",
+ "ownerNode": "testazure-01",
+ "providerVolumes": [
+ {
+ "device": "1",
+ "diskType": "Premium_LRS",
+ "encrypted": False,
+ "instanceId": "testazureid",
+ "name": "testazuredatadisk1",
+ "size": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "state": "available"
+ }
+ ],
+ "sidlEnabled": False,
+ "snaplockType": "non_snaplock",
+ "state": "online",
+ "totalCapacity": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "usedCapacity": {
+ "size": 70.0,
+ "unit": "GB"
+ },
+ "volumes": [
+ {
+ "isClone": False,
+ "name": "svm_testazure_root",
+ "rootVolume": True,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 1.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.000339508056640625,
+ "unit": "GB"
+ }
+ },
+ {
+ "isClone": False,
+ "name": "azv1",
+ "rootVolume": False,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.0,
+ "unit": "GB"
+ }
+ }
+ ]
+ },
+ ]
+ },
+ "gcpVsaWorkingEnvironments": {},
+ "onPremWorkingEnvironments": {},
+ "vsaWorkingEnvironments": {
+ "VsaWorkingEnvironment-aws12345": [
+ {
+ "availableCapacity": {
+ "size": 430.0,
+ "unit": "GB"
+ },
+ "disks": [
+ {
+ "device": "xvdh vol-381",
+ "name": "testAWSHA-01-i-196h",
+ "ownerNode": "testAWSHA-01",
+ "position": "data",
+ },
+ {
+ "device": "xvdh vol-382",
+ "name": "testAWSHA-01-i-195h",
+ "ownerNode": "testAWSHA-01",
+ "position": "data",
+ }
+ ],
+ "encryptionType": "cloudEncrypted",
+ "homeNode": "testAWSHA-01",
+ "isRoot": False,
+ "name": "aggr1",
+ "ownerNode": "testAWSHA-01",
+ "providerVolumes": [
+ {
+ "device": "/dev/xvdh",
+ "diskType": "gp2",
+ "encrypted": True,
+ "id": "vol-381",
+ "instanceId": "i-196",
+ "name": "vol-381",
+ "size": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "state": "in-use"
+ },
+ {
+ "device": "/dev/xvdh",
+ "diskType": "gp2",
+ "encrypted": True,
+ "id": "vol-382",
+ "instanceId": "i-195",
+ "name": "vol-382",
+ "size": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "state": "in-use"
+ }
+ ],
+ "sidlEnabled": True,
+ "snaplockType": "non_snaplock",
+ "state": "online",
+ "totalCapacity": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "usedCapacity": {
+ "size": 70.0,
+ "unit": "GB"
+ },
+ "volumes": [
+ {
+ "isClone": False,
+ "name": "svm_testAWSHA_root",
+ "rootVolume": True,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 1.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.000339508056640625,
+ "unit": "GB"
+ }
+ },
+ {
+ "isClone": False,
+ "name": "vha",
+ "rootVolume": False,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 100.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.0,
+ "unit": "GB"
+ }
+ }
+ ]
+ }
+ ],
+ "VsaWorkingEnvironment-awsha345": [
+ {
+ "availableCapacity": {
+ "size": 430.0,
+ "unit": "GB"
+ },
+ "disks": [
+ {
+ "device": "xvdg vol-369",
+ "name": "testAWS-01-i-190g",
+ "ownerNode": "testAWS-01",
+ "position": "data",
+ }
+ ],
+ "encryptionType": "cloudEncrypted",
+ "homeNode": "testAWS-01",
+ "isRoot": False,
+ "name": "aggr1",
+ "ownerNode": "testAWS-01",
+ "providerVolumes": [
+ {
+ "device": "/dev/xvdg",
+ "diskType": "gp2",
+ "encrypted": True,
+ "id": "vol-369",
+ "instanceId": "i-190",
+ "name": "vol-369",
+ "size": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "state": "in-use"
+ }
+ ],
+ "sidlEnabled": True,
+ "snaplockType": "non_snaplock",
+ "state": "online",
+ "totalCapacity": {
+ "size": 500.0,
+ "unit": "GB"
+ },
+ "usedCapacity": {
+ "size": 70.0,
+ "unit": "GB"
+ },
+ "volumes": [
+ {
+ "isClone": False,
+ "name": "svm_testAWS_root",
+ "rootVolume": True,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 1.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.000339508056640625,
+ "unit": "GB"
+ }
+ },
+ {
+ "isClone": False,
+ "name": "v1",
+ "rootVolume": False,
+ "thinProvisioned": True,
+ "totalSize": {
+ "size": 100.0,
+ "unit": "GB"
+ },
+ "usedSize": {
+ "size": 0.0,
+ "unit": "GB"
+ }
+ }
+ ]
+ },
+ {
+ "availableCapacity": {
+ "size": 86.0,
+ "unit": "GB"
+ },
+ "disks": [
+ {
+ "device": "xvdh vol-371",
+ "name": "testAWS-01-i-190h",
+ "ownerNode": "testAWS-01",
+ "position": "data",
+ }
+ ],
+ "encryptionType": "cloudEncrypted",
+ "homeNode": "testAWS-01",
+ "isRoot": False,
+ "name": "aggr2",
+ "ownerNode": "testAWS-01",
+ "providerVolumes": [
+ {
+ "device": "/dev/xvdh",
+ "diskType": "gp2",
+ "encrypted": True,
+ "id": "vol-371",
+ "instanceId": "i-190",
+ "name": "vol-371",
+ "size": {
+ "size": 100.0,
+ "unit": "GB"
+ },
+ "state": "in-use"
+ }
+ ],
+ "sidlEnabled": True,
+ "snaplockType": "non_snaplock",
+ "state": "online",
+ "totalCapacity": {
+ "size": 100.0,
+ "unit": "GB"
+ },
+ "usedCapacity": {
+ "size": 0.0,
+ "unit": "GB"
+ },
+ "volumes": []
+ }
+ ]
+ }
+ }
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_info: %s' % repr(exc.value))
+ assert not exc.value.args[0]['changed']
+
+
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+@patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_accounts_info')
+def test_get_accounts_info(accounts_info, get_token, patch_ansible):
+ args = dict(set_args_get_accounts_info())
+ set_module_args(args)
+ get_token.return_value = 'token_type', 'token'
+ accounts_info.return_value = {
+ "awsAccounts": [
+ {
+ "accessKey": "1",
+ "accountId": "123456789011",
+ "accountName": "tami",
+ "accountType": "AWS_KEYS",
+ "publicId": "CloudProviderAccount-Ekj6L9QX",
+ "subscriptionId": "hackExp10Days",
+ "vsaList": []
+ },
+ {
+ "accessKey": "",
+ "accountId": "123456789011",
+ "accountName": "Instance Profile",
+ "accountType": "INSTANCE_PROFILE",
+ "occmRole": "occmRole",
+ "publicId": "InstanceProfile",
+ "subscriptionId": "hackExp10Days",
+ "vsaList": [
+ {
+ "name": "CVO_AWSCluster",
+ "publicId": "VsaWorkingEnvironment-9m3I6i3I",
+ "workingEnvironmentType": "AWS"
+ },
+ {
+ "name": "testAWS1",
+ "publicId": "VsaWorkingEnvironment-JCzkA9OX",
+ "workingEnvironmentType": "AWS"
+ },
+ ]
+ }
+ ],
+ "azureAccounts": [
+ {
+ "accountName": "AzureKeys",
+ "accountType": "AZURE_KEYS",
+ "applicationId": "1",
+ "publicId": "CloudProviderAccount-T84ceMYu",
+ "tenantId": "1",
+ "vsaList": [
+ {
+ "name": "testAZURE",
+ "publicId": "VsaWorkingEnvironment-jI0tbceH",
+ "workingEnvironmentType": "AZURE"
+ },
+ {
+ "name": "test",
+ "publicId": "VsaWorkingEnvironment-00EnDcfB",
+ "workingEnvironmentType": "AZURE"
+ },
+ ]
+ },
+ {
+ "accountName": "s",
+ "accountType": "AZURE_KEYS",
+ "applicationId": "1",
+ "publicId": "CloudProviderAccount-XxbN95dj",
+ "tenantId": "1",
+ "vsaList": []
+ }
+ ],
+ "gcpStorageAccounts": [],
+ "nssAccounts": [
+ {
+ "accountName": "TESTCLOUD2",
+ "accountType": "NSS_KEYS",
+ "nssUserName": "TESTCLOUD2",
+ "publicId": "be2f3cac-352a-46b9-a341-a446c35b61c9",
+ "vsaList": [
+ {
+ "name": "testAWS",
+ "publicId": "VsaWorkingEnvironment-3txYJOsX",
+ "workingEnvironmentType": "AWS"
+ },
+ {
+ "name": "testAZURE",
+ "publicId": "VsaWorkingEnvironment-jI0tbceH",
+ "workingEnvironmentType": "AZURE"
+ },
+ ]
+ },
+ {
+ "accountName": "ntapitdemo",
+ "accountType": "NSS_KEYS",
+ "nssUserName": "ntapitdemo",
+ "publicId": "01e43a7d-cfc9-4682-aa12-15374ce81638",
+ "vsaList": [
+ {
+ "name": "test",
+ "publicId": "VsaWorkingEnvironment-00EnDcfB",
+ "workingEnvironmentType": "AZURE"
+ }
+ ]
+ }
+ ]
+ }, None
+ my_obj = my_module()
+ my_obj.rest_api.api_root_path = "my_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_info: %s' % repr(exc.value))
+ assert not exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py
new file mode 100644
index 000000000..a9f41beed
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_nss_account.py
@@ -0,0 +1,144 @@
+# (c) 2021, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account \
+ import NetAppCloudmanagerNssAccount as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'test_nss_account',
+ 'username': 'username',
+ 'password': 'password',
+ 'client_id': 'client_id',
+ 'refresh_token': 'refrsh_token'
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.create_nss_account')
+ def test_create_nss_account_successfully(self, create, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = None
+ create.return_value = None
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.create_nss_account')
+ def test_create_nss_account_idempotency(self, create, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = {
+ 'name': 'test_nss_account',
+ 'username': 'TESTCLOUD1',
+ 'password': 'test_test',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'CvMJXRhz5V4dmxZqVg5LDRDlZyE - kbqRKT9YMcAsjmwFs'
+ }
+ create.return_value = None
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert not exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.get_nss_account')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_nss_account.NetAppCloudmanagerNssAccount.delete_nss_account')
+ def test_create_nss_account_successfully(self, delete, get, get_token):
+ args = self.set_default_args_pass_check()
+ args['state'] = 'absent'
+ set_module_args(args)
+ get.return_value = {
+ 'name': 'test_nss_account',
+ 'username': 'TESTCLOUD1',
+ 'password': 'test_test',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'refresh_token': 'CvMJXRhz5V4dmxZqVg5LDRDlZyE - kbqRKT9YMcAsjmwFs'
+ }
+ delete.return_value = None
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py
new file mode 100644
index 000000000..9d1189489
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_snapmirror.py
@@ -0,0 +1,176 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+
+__metaclass__ = type
+
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror \
+ import NetAppCloudmanagerSnapmirror as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'source_working_environment_name': 'TestA',
+ 'destination_working_environment_name': 'TestB',
+ 'source_volume_name': 'source',
+ 'destination_volume_name': 'dest',
+ 'source_svm_name': 'source_svm',
+ 'destination_svm_name': 'dest_svm',
+ 'policy': 'MirrorAllSnapshots',
+ 'schedule': 'min',
+ 'max_transfer_rate': 102400,
+ 'client_id': 'client_id',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_create_cloudmanager_snapmirror(self):
+ return dict({
+ 'state': 'present',
+ 'source_working_environment_name': 'TestA',
+ 'destination_working_environment_name': 'TestB',
+ 'source_volume_name': 'source',
+ 'destination_volume_name': 'dest',
+ 'source_svm_name': 'source_svm',
+ 'destination_svm_name': 'dest_svm',
+ 'policy': 'MirrorAllSnapshots',
+ 'schedule': 'min',
+ 'max_transfer_rate': 102400,
+ 'client_id': 'client_id',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def set_args_delete_cloudmanager_snapmirror(self):
+ return dict({
+ 'state': 'absent',
+ 'source_working_environment_name': 'TestA',
+ 'destination_working_environment_name': 'TestB',
+ 'source_volume_name': 'source',
+ 'destination_volume_name': 'dest',
+ 'client_id': 'client_id',
+ 'refresh_token': 'myrefresh_token',
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ def test_module_fail_when_required_args_present(self, get_token):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleExitJson) as exc:
+ set_module_args(self.set_default_args_pass_check())
+ get_token.return_value = 'test', 'test'
+ my_module()
+ exit_json(changed=True, msg="TestCase Fail when required args are present")
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_detail_for_snapmirror')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.wait_on_completion')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_snapmirror')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.build_quote_request')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.quote_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_volumes')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_interclusterlifs')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_cloudmanager_snapmirror_create_pass(self, send_request, get_interclusterlifs, get_volumes, quote_volume, build_quote_request,
+ get_snapmirror, wait_on_completion, get_working_environment_detail_for_snapmirror, get_token):
+ set_module_args(self.set_args_create_cloudmanager_snapmirror())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ response = {'id': 'abcdefg12345'}
+ source_we_info = {'publicId': 'test1', 'workingEnvironmentType': 'AMAZON'}
+ dest_we_info = {'publicId': 'test2', 'workingEnvironmentType': 'AMAZON', 'svmName': 'source_svm', 'name': 'TestB'}
+ source_vol = [{'name': 'source', 'svmName': 'source_svm', 'providerVolumeType': 'abc'}]
+ quote_volume_response = {'numOfDisks': 10, 'aggregateName': 'aggr1'}
+ interclusterlifs_resp = {'interClusterLifs': [{'address': '10.10.10.10'}], 'peerInterClusterLifs': [{'address': '10.10.10.10'}]}
+ get_working_environment_detail_for_snapmirror.return_value = source_we_info, dest_we_info, None
+ send_request.return_value = response, None, None
+ wait_on_completion.return_value = None
+ get_snapmirror.return_value = None
+ get_volumes.return_value = source_vol
+ build_quote_request.return_value = {'name': 'test'}
+ quote_volume.return_value = quote_volume_response
+ get_interclusterlifs.return_value = interclusterlifs_resp
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_create_cloudmanager_snapmirror_create_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_snapmirror.NetAppCloudmanagerSnapmirror.get_snapmirror')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_delete_cloudmanager_snapmirror_delete_pass(self, send_request, get_snapmirror, get_token):
+ set_module_args(self.set_args_delete_cloudmanager_snapmirror())
+ get_token.return_value = 'test', 'test'
+ my_obj = my_module()
+
+ my_snapmirror = {
+ 'source_working_environment_id': '456',
+ 'destination_svm_name': 'dest_svm',
+ 'destination_working_environment_id': '123'}
+ get_snapmirror.return_value = my_snapmirror
+ send_request.return_value = None, None, None
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ my_obj.apply()
+ print('Info: test_delete_cloudmanager_snapmirror_delete_pass: %s' % repr(exc.value))
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py
new file mode 100644
index 000000000..15b4802df
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/plugins/modules/test_na_cloudmanager_volume.py
@@ -0,0 +1,216 @@
+# (c) 2022, NetApp, Inc
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+''' unit tests Cloudmanager Ansible module: '''
+
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+import json
+import sys
+import pytest
+
+from ansible.module_utils import basic
+from ansible.module_utils._text import to_bytes
+from ansible_collections.netapp.cloudmanager.tests.unit.compat import unittest
+from ansible_collections.netapp.cloudmanager.tests.unit.compat.mock import patch, Mock
+import ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp as netapp_utils
+from ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume \
+ import NetAppCloudmanagerVolume as my_module
+
+if not netapp_utils.HAS_REQUESTS and sys.version_info < (3, 5):
+ pytestmark = pytest.mark.skip('skipping as missing required imports on 2.6 and 2.7')
+
+
+def set_module_args(args):
+ '''prepare arguments so that they will be picked up during module creation'''
+ args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
+ basic._ANSIBLE_ARGS = to_bytes(args) # pylint: disable=protected-access
+
+
+class AnsibleExitJson(Exception):
+ '''Exception class to be raised by module.exit_json and caught by the test case'''
+
+
+class AnsibleFailJson(Exception):
+ '''Exception class to be raised by module.fail_json and caught by the test case'''
+
+
+def exit_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over exit_json; package return data into an exception'''
+ if 'changed' not in kwargs:
+ kwargs['changed'] = False
+ raise AnsibleExitJson(kwargs)
+
+
+def fail_json(*args, **kwargs): # pylint: disable=unused-argument
+ '''function to patch over fail_json; package return data into an exception'''
+ kwargs['failed'] = True
+ raise AnsibleFailJson(kwargs)
+
+
+class MockCMConnection():
+ ''' Mock response of http connections '''
+ def __init__(self, kind=None, parm1=None):
+ self.type = kind
+ self.parm1 = parm1
+ self.xml_in = None
+ self.xml_out = None
+
+
+class TestMyModule(unittest.TestCase):
+ ''' a group of related Unit Tests '''
+
+ def setUp(self):
+ self.mock_module_helper = patch.multiple(basic.AnsibleModule,
+ exit_json=exit_json,
+ fail_json=fail_json)
+ self.mock_module_helper.start()
+ self.addCleanup(self.mock_module_helper.stop)
+
+ def set_default_args_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'testvol',
+ 'working_environment_id': 'VsaWorkingEnvironment-abcdefg12345',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'svm_name': 'svm_justinaws',
+ 'snapshot_policy_name': 'default',
+ 'tiering_policy': 'auto',
+ 'export_policy_type': 'custom',
+ 'export_policy_ip': ["10.30.0.1/16"],
+ 'export_policy_nfs_version': ["nfs4"],
+ 'refresh_token': 'myrefresh_token',
+ 'size': 10,
+ })
+
+ def set_default_args_with_workingenv_name_pass_check(self):
+ return dict({
+ 'state': 'present',
+ 'name': 'testvol',
+ 'working_environment_name': 'weone',
+ 'client_id': 'Nw4Q2O1kdnLtvhwegGalFnodEHUfPJWh',
+ 'svm_name': 'svm_justinaws',
+ 'snapshot_policy_name': 'default',
+ 'export_policy_type': 'custom',
+ 'export_policy_ip': ["10.30.0.1/16"],
+ 'export_policy_nfs_version': ["nfs4"],
+ 'refresh_token': 'myrefresh_token',
+ 'size': 10,
+ })
+
+ def test_module_fail_when_required_args_missing(self):
+ ''' required arguments are reported as errors '''
+ with pytest.raises(AnsibleFailJson) as exc:
+ set_module_args({})
+ my_module()
+ print('Info: %s' % exc.value.args[0]['msg'])
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.create_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_volume_successfully(self, send_request, create, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = None
+ create.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_volume_idempotency(self, send_request, get, get_token):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = {
+ 'name': 'testvol',
+ 'snapshot_policy_name': 'default',
+ 'export_policy_type': 'custom',
+ 'export_policy_ip': ["10.30.0.1/16"],
+ 'export_policy_nfs_version': ["nfs4"],
+ }
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert not exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.modify_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_update_volume_successfully(self, send_request, get, get_token, modify):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = {
+ 'name': 'testvol',
+ 'snapshot_policy_name': 'default',
+ 'tiering_policy': 'snapshot_only',
+ 'export_policy_type': 'custom',
+ 'export_policy_ip': ["10.30.0.1/16"],
+ 'export_policy_nfs_version': ["nfs3", "nfs4"],
+ }
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)]
+ get_token.return_value = ("type", "token")
+ modify.return_value = None
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.modify_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_update_volume_idempotency(self, send_request, get, get_token, modify):
+ set_module_args(self.set_default_args_pass_check())
+ get.return_value = {
+ 'name': 'testvol',
+ 'snapshot_policy_name': 'default',
+ 'export_policy_type': 'custom',
+ 'export_policy_ip': ["10.30.0.1/16"],
+ 'export_policy_nfs_version': ["nfs4"],
+ }
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)]
+ get_token.return_value = ("type", "token")
+ modify.return_value = None
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert not exc.value.args[0]['changed']
+
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.get_token')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp_module.NetAppModule.get_working_environment_details_by_name')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.get_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.modules.na_cloudmanager_volume.NetAppCloudmanagerVolume.create_volume')
+ @patch('ansible_collections.netapp.cloudmanager.plugins.module_utils.netapp.CloudManagerRestAPI.send_request')
+ def test_create_volume_by_workingenv_name_successfully(self, send_request, create, get, get_we, get_token):
+ args = self.set_default_args_with_workingenv_name_pass_check()
+ my_we = {
+ 'name': 'test',
+ 'publicId': 'test',
+ 'cloudProviderName': 'Amazon'}
+ get_we.return_value = my_we, None
+ args['working_environment_id'] = my_we['publicId']
+ set_module_args(args)
+ get.return_value = None
+ create.return_value = None
+ send_request.side_effect = [({'publicId': 'id', 'svmName': 'svm_name', 'cloudProviderName': "aws", 'isHA': False}, None, None)]
+ get_token.return_value = ("type", "token")
+ obj = my_module()
+ obj.rest_api.api_root_path = "test_root_path"
+
+ with pytest.raises(AnsibleExitJson) as exc:
+ obj.apply()
+ assert exc.value.args[0]['changed']
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt b/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt
new file mode 100644
index 000000000..484081daa
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/requirements-azure.txt
@@ -0,0 +1 @@
+cryptography>=3.2.0 ; python_version >= '3.5' \ No newline at end of file
diff --git a/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt b/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt
new file mode 100644
index 000000000..88c25079f
--- /dev/null
+++ b/ansible_collections/netapp/cloudmanager/tests/unit/requirements.txt
@@ -0,0 +1,10 @@
+boto3 ; python_version >= '3.5'
+botocore ; python_version >= '3.5'
+azure-mgmt-compute ; python_version >= '3.5'
+azure-mgmt-network ; python_version >= '3.5'
+azure-mgmt-storage ; python_version >= '3.5'
+azure-mgmt-resource ; python_version >= '3.5'
+azure-cli-core ; python_version >= '3.5'
+msrestazure ; python_version >= '3.5'
+azure-common ; python_version >= '3.5'
+google-auth ; python_version >= '3.5'