summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tests/test_api_auditing.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/dashboard/tests/test_api_auditing.py')
-rw-r--r--src/pybind/mgr/dashboard/tests/test_api_auditing.py93
1 files changed, 93 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tests/test_api_auditing.py b/src/pybind/mgr/dashboard/tests/test_api_auditing.py
new file mode 100644
index 000000000..854d76468
--- /dev/null
+++ b/src/pybind/mgr/dashboard/tests/test_api_auditing.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import json
+import re
+
+try:
+ import mock
+except ImportError:
+ import unittest.mock as mock
+
+from .. import mgr
+from ..controllers import RESTController, Router
+from ..tests import ControllerTestCase, KVStoreMockMixin
+
+
+# pylint: disable=W0613
+@Router('/foo', secure=False)
+class FooResource(RESTController):
+ def create(self, password):
+ pass
+
+ def get(self, key):
+ pass
+
+ def delete(self, key):
+ pass
+
+ def set(self, key, password, secret_key=None):
+ pass
+
+
+class ApiAuditingTest(ControllerTestCase, KVStoreMockMixin):
+
+ _request_logging = True
+
+ @classmethod
+ def setup_server(cls):
+ cls.setup_controllers([FooResource])
+
+ def setUp(self):
+ self.mock_kv_store()
+ mgr.cluster_log = mock.Mock()
+ mgr.set_module_option('AUDIT_API_ENABLED', True)
+ mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', True)
+
+ def _validate_cluster_log_msg(self, path, method, user, params):
+ channel, _, msg = mgr.cluster_log.call_args_list[0][0]
+ self.assertEqual(channel, 'audit')
+ pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
+ 'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
+ m = re.match(pattern, msg)
+ self.assertEqual(m.group(2), path)
+ self.assertEqual(m.group(3), method)
+ self.assertEqual(m.group(4), user)
+ self.assertDictEqual(json.loads(m.group(5)), params)
+
+ def test_no_audit(self):
+ mgr.set_module_option('AUDIT_API_ENABLED', False)
+ self._delete('/foo/test1')
+ mgr.cluster_log.assert_not_called()
+
+ def test_no_payload(self):
+ mgr.set_module_option('AUDIT_API_LOG_PAYLOAD', False)
+ self._delete('/foo/test1')
+ _, _, msg = mgr.cluster_log.call_args_list[0][0]
+ self.assertNotIn('params=', msg)
+
+ def test_no_audit_get(self):
+ self._get('/foo/test1')
+ mgr.cluster_log.assert_not_called()
+
+ def test_audit_put(self):
+ self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
+ mgr.cluster_log.assert_called_once()
+ self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
+ {'key': 'test1',
+ 'password': '***',
+ 'secret_key': '***'})
+
+ def test_audit_post(self):
+ with mock.patch('dashboard.services.auth.JwtManager.get_username',
+ return_value='hugo'):
+ self._post('/foo?password=1234')
+ mgr.cluster_log.assert_called_once()
+ self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
+ {'password': '***'})
+
+ def test_audit_delete(self):
+ self._delete('/foo/test1')
+ mgr.cluster_log.assert_called_once()
+ self._validate_cluster_log_msg('/foo/test1', 'DELETE',
+ 'None', {'key': 'test1'})