diff options
Diffstat (limited to 'src/pybind/mgr/dashboard/tests/test_api_auditing.py')
-rw-r--r-- | src/pybind/mgr/dashboard/tests/test_api_auditing.py | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tests/test_api_auditing.py b/src/pybind/mgr/dashboard/tests/test_api_auditing.py new file mode 100644 index 000000000..854d76468 --- /dev/null +++ b/src/pybind/mgr/dashboard/tests/test_api_auditing.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import json +import re + +try: + import mock +except ImportError: + import unittest.mock as mock + +from .. import mgr +from ..controllers import RESTController, Router +from ..tests import ControllerTestCase, KVStoreMockMixin + + +# pylint: disable=W0613 +@Router('/foo', secure=False) +class FooResource(RESTController): + def create(self, password): + pass + + def get(self, key): + pass + + def delete(self, key): + pass + + def set(self, key, password, secret_key=None): + pass + + +class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin): + + _request_logging = True + + @classmethod + def setup_server(cls): + cls.setup_controllers([FooResource]) + + def setUp(self): + self.mock_kv_store() + mgr.cluster_log = mock.Mock() + mgr.set_module_option('AUDIT_API_ENABLED', True) + mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True) + + def _validate_cluster_log_msg(self, path, method, user, params): + channel, _, msg = mgr.cluster_log.call_args_list[0][0] + self.assertEqual(channel, 'audit') + pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \ + 'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$' + m = re.match(pattern, msg) + self.assertEqual(m.group(2), path) + self.assertEqual(m.group(3), method) + self.assertEqual(m.group(4), user) + self.assertDictEqual(json.loads(m.group(5)), params) + + def test_no_audit(self): + mgr.set_module_option('AUDIT_API_ENABLED', False) + self._delete('/foo/test1') + mgr.cluster_log.assert_not_called() + + def test_no_payload(self): + mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False) + self._delete('/foo/test1') + _, _, msg = mgr.cluster_log.call_args_list[0][0] + self.assertNotIn('params=', msg) + + def test_no_audit_get(self): + self._get('/foo/test1') + mgr.cluster_log.assert_not_called() + + def test_audit_put(self): + self._put('/foo/test1', {'password': 'y', 'secret_key': 1234}) + mgr.cluster_log.assert_called_once() + self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None', + {'key': 'test1', + 'password': '***', + 'secret_key': '***'}) + + def test_audit_post(self): + with mock.patch('dashboard.services.auth.JwtManager.get_username', + return_value='hugo'): + self._post('/foo?password=1234') + mgr.cluster_log.assert_called_once() + self._validate_cluster_log_msg('/foo', 'POST', 'hugo', + {'password': '***'}) + + def test_audit_delete(self): + self._delete('/foo/test1') + mgr.cluster_log.assert_called_once() + self._validate_cluster_log_msg('/foo/test1', 'DELETE', + 'None', {'key': 'test1'}) |