From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- qa/tasks/mgr/dashboard/test_auth.py | 240 ++++++++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 qa/tasks/mgr/dashboard/test_auth.py (limited to 'qa/tasks/mgr/dashboard/test_auth.py') diff --git a/qa/tasks/mgr/dashboard/test_auth.py b/qa/tasks/mgr/dashboard/test_auth.py new file mode 100644 index 00000000..df5485d4 --- /dev/null +++ b/qa/tasks/mgr/dashboard/test_auth.py @@ -0,0 +1,240 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import + +import time + +import jwt +from teuthology.orchestra.run import \ + CommandFailedError # pylint: disable=import-error + +from .helper import DashboardTestCase + + +class AuthTest(DashboardTestCase): + + AUTO_AUTHENTICATE = False + + def setUp(self): + super(AuthTest, self).setUp() + self.reset_session() + + def _validate_jwt_token(self, token, username, permissions): + payload = jwt.decode(token, options={'verify_signature': False}) + self.assertIn('username', payload) + self.assertEqual(payload['username'], username) + + for scope, perms in permissions.items(): + self.assertIsNotNone(scope) + self.assertIn('read', perms) + self.assertIn('update', perms) + self.assertIn('create', perms) + self.assertIn('delete', perms) + + def test_login_without_password(self): + with self.assertRaises(CommandFailedError): + self.create_user('admin2', '', ['administrator']) + + def test_a_set_login_credentials(self): + # test with Authorization header + self.create_user('admin2', 'admin2', ['administrator']) + self._post("/api/auth", {'username': 'admin2', 'password': 'admin2'}) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin2", data['permissions']) + self.delete_user('admin2') + + # test with Cookies set + self.create_user('admin2', 'admin2', ['administrator']) + self._post("/api/auth", {'username': 'admin2', 'password': 'admin2'}, set_cookies=True) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin2", data['permissions']) + self.delete_user('admin2') + + def test_login_valid(self): + # test with Authorization header + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin", data['permissions']) + + # test with Cookies set + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}, set_cookies=True) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin", data['permissions']) + + def test_login_invalid(self): + # test with Authorization header + self._post("/api/auth", {'username': 'admin', 'password': 'inval'}) + self.assertStatus(400) + self.assertJsonBody({ + "component": "auth", + "code": "invalid_credentials", + "detail": "Invalid credentials" + }) + + # test with Cookies set + self._post("/api/auth", {'username': 'admin', 'password': 'inval'}, set_cookies=True) + self.assertStatus(400) + self.assertJsonBody({ + "component": "auth", + "code": "invalid_credentials", + "detail": "Invalid credentials" + }) + + def test_logout(self): + # test with Authorization header + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin", data['permissions']) + self.set_jwt_token(data['token']) + self._post("/api/auth/logout") + self.assertStatus(200) + self.assertJsonBody({ + "redirect_url": "#/login" + }) + self._get("/api/host") + self.assertStatus(401) + self.set_jwt_token(None) + + # test with Cookies set + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}, set_cookies=True) + self.assertStatus(201) + data = self.jsonBody() + self._validate_jwt_token(data['token'], "admin", data['permissions']) + self.set_jwt_token(data['token']) + self._post("/api/auth/logout", set_cookies=True) + self.assertStatus(200) + self.assertJsonBody({ + "redirect_url": "#/login" + }) + self._get("/api/host", set_cookies=True) + self.assertStatus(401) + self.set_jwt_token(None) + + def test_token_ttl(self): + # test with Authorization header + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5']) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host") + self.assertStatus(200) + time.sleep(6) + self._get("/api/host") + self.assertStatus(401) + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800']) + self.set_jwt_token(None) + + # test with Cookies set + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5']) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}, set_cookies=True) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host", set_cookies=True) + self.assertStatus(200) + time.sleep(6) + self._get("/api/host") + self.assertStatus(401) + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800']) + self.set_jwt_token(None) + + def test_remove_from_blacklist(self): + # test with Authorization header + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5']) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + # the following call adds the token to the blacklist + self._post("/api/auth/logout") + self.assertStatus(200) + self._get("/api/host") + self.assertStatus(401) + time.sleep(6) + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800']) + self.set_jwt_token(None) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + # the following call removes expired tokens from the blacklist + self._post("/api/auth/logout") + self.assertStatus(200) + + # test with Cookies set + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5']) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}, set_cookies=True) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + # the following call adds the token to the blocklist + self._post("/api/auth/logout", set_cookies=True) + self.assertStatus(200) + self._get("/api/host", set_cookies=True) + self.assertStatus(401) + time.sleep(6) + self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800']) + self.set_jwt_token(None) + self._post("/api/auth", {'username': 'admin', 'password': 'admin'}, set_cookies=True) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + # the following call removes expired tokens from the blocklist + self._post("/api/auth/logout", set_cookies=True) + self.assertStatus(200) + + def test_unauthorized(self): + # test with Authorization header + self._get("/api/host") + self.assertStatus(401) + + # test with Cookies set + self._get("/api/host", set_cookies=True) + self.assertStatus(401) + + def test_invalidate_token_by_admin(self): + # test with Authorization header + self._get("/api/host") + self.assertStatus(401) + self.create_user('user', 'user', ['read-only']) + time.sleep(1) + self._post("/api/auth", {'username': 'user', 'password': 'user'}) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host") + self.assertStatus(200) + time.sleep(1) + self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password', 'user'], 'user2') + time.sleep(1) + self._get("/api/host") + self.assertStatus(401) + self.set_jwt_token(None) + self._post("/api/auth", {'username': 'user', 'password': 'user2'}) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host") + self.assertStatus(200) + self.delete_user("user") + + # test with Cookies set + self._get("/api/host", set_cookies=True) + self.assertStatus(401) + self.create_user('user', 'user', ['read-only']) + time.sleep(1) + self._post("/api/auth", {'username': 'user', 'password': 'user'}, set_cookies=True) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host", set_cookies=True) + self.assertStatus(200) + time.sleep(1) + self._ceph_cmd_with_secret(['dashboard', 'ac-user-set-password', 'user'], 'user2') + time.sleep(1) + self._get("/api/host", set_cookies=True) + self.assertStatus(401) + self.set_jwt_token(None) + self._post("/api/auth", {'username': 'user', 'password': 'user2'}, set_cookies=True) + self.assertStatus(201) + self.set_jwt_token(self.jsonBody()['token']) + self._get("/api/host", set_cookies=True) + self.assertStatus(200) + self.delete_user("user") -- cgit v1.2.3