From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../mgr/dashboard/tests/test_access_control.py | 871 +++++++++++++++++++++ 1 file changed, 871 insertions(+) create mode 100644 src/pybind/mgr/dashboard/tests/test_access_control.py (limited to 'src/pybind/mgr/dashboard/tests/test_access_control.py') diff --git a/src/pybind/mgr/dashboard/tests/test_access_control.py b/src/pybind/mgr/dashboard/tests/test_access_control.py new file mode 100644 index 000000000..361f65e60 --- /dev/null +++ b/src/pybind/mgr/dashboard/tests/test_access_control.py @@ -0,0 +1,871 @@ +# -*- coding: utf-8 -*- +# pylint: disable=dangerous-default-value,too-many-public-methods +from __future__ import absolute_import + +import errno +import json +import tempfile +import time +import unittest +from datetime import datetime, timedelta + +from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE + +from .. import mgr +from ..security import Permission, Scope +from ..services.access_control import SYSTEM_ROLES, AccessControlDB, \ + PasswordPolicy, load_access_control_db, password_hash +from ..settings import Settings +from ..tests import CLICommandTestMixin, CmdException + + +class AccessControlTest(unittest.TestCase, CLICommandTestMixin): + + @classmethod + def setUpClass(cls): + cls.mock_kv_store() + mgr.ACCESS_CONTROL_DB = None + + def setUp(self): + self.CONFIG_KEY_DICT.clear() + load_access_control_db() + + def load_persistent_db(self): + config_key = AccessControlDB.accessdb_config_key() + self.assertIn(config_key, self.CONFIG_KEY_DICT) + db_json = self.CONFIG_KEY_DICT[config_key] + db = json.loads(db_json) + return db + + # The DB is written to persistent storage the first time it is saved. + # However, should an operation fail due to , we may end up in + # a state where we have a completely empty CONFIG_KEY_DICT (our mock + # equivalent to the persistent state). While this works for most of the + # tests in this class, that would prevent us from testing things like + # "run a command that is expected to fail, and then ensure nothing + # happened", because we'd be asserting in `load_persistent_db()` due to + # the map being empty. + # + # This function will therefore force state to be written to our mock + # persistent state. We could have added this extra step to + # `load_persistent_db()` directly, but that would conflict with the + # upgrade tests. This way, we can selectively enforce this requirement + # where we believe it to be necessary; generically speaking, this should + # not be needed unless we're testing very specific behaviors. + # + def setup_and_load_persistent_db(self): + mgr.ACCESS_CTRL_DB.save() + self.load_persistent_db() + + def validate_persistent_role(self, rolename, scopes_permissions, + description=None): + db = self.load_persistent_db() + self.assertIn('roles', db) + self.assertIn(rolename, db['roles']) + self.assertEqual(db['roles'][rolename]['name'], rolename) + self.assertEqual(db['roles'][rolename]['description'], description) + self.assertDictEqual(db['roles'][rolename]['scopes_permissions'], + scopes_permissions) + + def validate_persistent_no_role(self, rolename): + db = self.load_persistent_db() + self.assertIn('roles', db) + self.assertNotIn(rolename, db['roles']) + + def validate_persistent_user(self, username, roles, password=None, + name=None, email=None, last_update=None, + enabled=True, pwdExpirationDate=None): + db = self.load_persistent_db() + self.assertIn('users', db) + self.assertIn(username, db['users']) + self.assertEqual(db['users'][username]['username'], username) + self.assertListEqual(db['users'][username]['roles'], roles) + if password: + self.assertEqual(db['users'][username]['password'], password) + if name: + self.assertEqual(db['users'][username]['name'], name) + if email: + self.assertEqual(db['users'][username]['email'], email) + if last_update: + self.assertEqual(db['users'][username]['lastUpdate'], last_update) + if pwdExpirationDate: + self.assertEqual(db['users'][username]['pwdExpirationDate'], pwdExpirationDate) + self.assertEqual(db['users'][username]['enabled'], enabled) + + def validate_persistent_no_user(self, username): + db = self.load_persistent_db() + self.assertIn('users', db) + self.assertNotIn(username, db['users']) + + def test_create_role(self): + role = self.exec_cmd('ac-role-create', rolename='test_role') + self.assertDictEqual(role, {'name': 'test_role', 'description': None, + 'scopes_permissions': {}}) + self.validate_persistent_role('test_role', {}) + + def test_create_role_with_desc(self): + role = self.exec_cmd('ac-role-create', rolename='test_role', + description='Test Role') + self.assertDictEqual(role, {'name': 'test_role', + 'description': 'Test Role', + 'scopes_permissions': {}}) + self.validate_persistent_role('test_role', {}, 'Test Role') + + def test_create_duplicate_role(self): + self.test_create_role() + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-create', rolename='test_role') + + self.assertEqual(ctx.exception.retcode, -errno.EEXIST) + self.assertEqual(str(ctx.exception), "Role 'test_role' already exists") + + def test_delete_role(self): + self.test_create_role() + out = self.exec_cmd('ac-role-delete', rolename='test_role') + self.assertEqual(out, "Role 'test_role' deleted") + self.validate_persistent_no_role('test_role') + + def test_delete_nonexistent_role(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-delete', rolename='test_role') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") + + def test_show_single_role(self): + self.test_create_role() + role = self.exec_cmd('ac-role-show', rolename='test_role') + self.assertDictEqual(role, {'name': 'test_role', 'description': None, + 'scopes_permissions': {}}) + + def test_show_nonexistent_role(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-show', rolename='test_role') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") + + def test_show_system_roles(self): + roles = self.exec_cmd('ac-role-show') + self.assertEqual(len(roles), len(SYSTEM_ROLES)) + for role in roles: + self.assertIn(role, SYSTEM_ROLES) + + def test_show_system_role(self): + role = self.exec_cmd('ac-role-show', rolename="read-only") + self.assertEqual(role['name'], 'read-only') + self.assertEqual( + role['description'], + 'allows read permission for all security scope except dashboard settings and config-opt' + ) + + def test_delete_system_role(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-delete', rolename='administrator') + + self.assertEqual(ctx.exception.retcode, -errno.EPERM) + self.assertEqual(str(ctx.exception), + "Cannot delete system role 'administrator'") + + def test_add_role_scope_perms(self): + self.test_create_role() + self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', + scopename=Scope.POOL, + permissions=[Permission.READ, Permission.DELETE]) + role = self.exec_cmd('ac-role-show', rolename='test_role') + self.assertDictEqual(role, {'name': 'test_role', + 'description': None, + 'scopes_permissions': { + Scope.POOL: [Permission.DELETE, + Permission.READ] + }}) + self.validate_persistent_role('test_role', { + Scope.POOL: [Permission.DELETE, Permission.READ] + }) + + def test_del_role_scope_perms(self): + self.test_add_role_scope_perms() + self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', + scopename=Scope.MONITOR, + permissions=[Permission.READ, Permission.CREATE]) + self.validate_persistent_role('test_role', { + Scope.POOL: [Permission.DELETE, Permission.READ], + Scope.MONITOR: [Permission.CREATE, Permission.READ] + }) + self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', + scopename=Scope.POOL) + role = self.exec_cmd('ac-role-show', rolename='test_role') + self.assertDictEqual(role, {'name': 'test_role', + 'description': None, + 'scopes_permissions': { + Scope.MONITOR: [Permission.CREATE, + Permission.READ] + }}) + self.validate_persistent_role('test_role', { + Scope.MONITOR: [Permission.CREATE, Permission.READ] + }) + + def test_add_role_scope_perms_nonexistent_role(self): + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', + scopename='pool', + permissions=['read', 'delete']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") + + def test_add_role_invalid_scope_perms(self): + self.test_create_role() + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', + scopename='invalidscope', + permissions=['read', 'delete']) + + self.assertEqual(ctx.exception.retcode, -errno.EINVAL) + self.assertEqual(str(ctx.exception), + "Scope 'invalidscope' is not valid\n Possible values: " + "{}".format(Scope.all_scopes())) + + def test_add_role_scope_invalid_perms(self): + self.test_create_role() + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', + scopename='pool', permissions=['invalidperm']) + + self.assertEqual(ctx.exception.retcode, -errno.EINVAL) + self.assertEqual(str(ctx.exception), + "Permission 'invalidperm' is not valid\n Possible " + "values: {}".format(Permission.all_permissions())) + + def test_del_role_scope_perms_nonexistent_role(self): + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', + scopename='pool') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") + + def test_del_role_nonexistent_scope_perms(self): + self.test_add_role_scope_perms() + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', + scopename='nonexistentscope') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), + "There are no permissions for scope 'nonexistentscope' " + "in role 'test_role'") + + def test_not_permitted_add_role_scope_perms(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-add-scope-perms', rolename='read-only', + scopename='pool', permissions=['read', 'delete']) + + self.assertEqual(ctx.exception.retcode, -errno.EPERM) + self.assertEqual(str(ctx.exception), + "Cannot update system role 'read-only'") + + def test_not_permitted_del_role_scope_perms(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-del-scope-perms', rolename='read-only', + scopename='pool') + + self.assertEqual(ctx.exception.retcode, -errno.EPERM) + self.assertEqual(str(ctx.exception), + "Cannot update system role 'read-only'") + + def test_create_user(self, username='admin', rolename=None, enabled=True, + pwdExpirationDate=None): + user = self.exec_cmd('ac-user-create', username=username, + rolename=rolename, inbuf='admin', + name='{} User'.format(username), + email='{}@user.com'.format(username), + enabled=enabled, force_password=True, + pwd_expiration_date=pwdExpirationDate) + + pass_hash = password_hash('admin', user['password']) + self.assertDictEqual(user, { + 'username': username, + 'password': pass_hash, + 'pwdExpirationDate': pwdExpirationDate, + 'pwdUpdateRequired': False, + 'lastUpdate': user['lastUpdate'], + 'name': '{} User'.format(username), + 'email': '{}@user.com'.format(username), + 'roles': [rolename] if rolename else [], + 'enabled': enabled + }) + self.validate_persistent_user(username, [rolename] if rolename else [], + pass_hash, '{} User'.format(username), + '{}@user.com'.format(username), + user['lastUpdate'], enabled) + return user + + def test_create_disabled_user(self): + self.test_create_user(enabled=False) + + def test_create_user_pwd_expiration_date(self): + expiration_date = datetime.utcnow() + timedelta(days=10) + expiration_date = int(time.mktime(expiration_date.timetuple())) + self.test_create_user(pwdExpirationDate=expiration_date) + + def test_create_user_with_role(self): + self.test_add_role_scope_perms() + self.test_create_user(rolename='test_role') + + def test_create_user_with_system_role(self): + self.test_create_user(rolename='administrator') + + def test_delete_user(self): + self.test_create_user() + out = self.exec_cmd('ac-user-delete', username='admin') + self.assertEqual(out, "User 'admin' deleted") + users = self.exec_cmd('ac-user-show') + self.assertEqual(len(users), 0) + self.validate_persistent_no_user('admin') + + def test_create_duplicate_user(self): + self.test_create_user() + ret = self.exec_cmd('ac-user-create', username='admin', inbuf='admin', + force_password=True) + self.assertEqual(ret, "User 'admin' already exists") + + def test_create_users_with_dne_role(self): + # one time call to setup our persistent db + self.setup_and_load_persistent_db() + + # create a user with a role that does not exist; expect a failure + try: + self.exec_cmd('ac-user-create', username='foo', + rolename='dne_role', inbuf='foopass', + name='foo User', email='foo@user.com', + force_password=True) + except CmdException as e: + self.assertEqual(e.retcode, -errno.ENOENT) + + db = self.load_persistent_db() + if 'users' in db: + self.assertNotIn('foo', db['users']) + + # We could just finish our test here, given we ensured that the user + # with a non-existent role is not in persistent storage. However, + # we're going to test the database's consistency, making sure that + # side-effects are not written to persistent storage once we commit + # an unrelated operation. To ensure this, we'll issue another + # operation that is sharing the same code path, and will check whether + # the next operation commits dirty state. + + # create a role (this will be 'test_role') + self.test_create_role() + self.exec_cmd('ac-user-create', username='bar', + rolename='test_role', inbuf='barpass', + name='bar User', email='bar@user.com', + force_password=True) + + # validate db: + # user 'foo' should not exist + # user 'bar' should exist and have role 'test_role' + self.validate_persistent_user('bar', ['test_role']) + + db = self.load_persistent_db() + self.assertIn('users', db) + self.assertNotIn('foo', db['users']) + + def test_delete_nonexistent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-delete', username='admin') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_add_user_roles(self, username='admin', + roles=['pool-manager', 'block-manager']): + user_orig = self.test_create_user(username) + uroles = [] + for role in roles: + uroles.append(role) + uroles.sort() + user = self.exec_cmd('ac-user-add-roles', username=username, + roles=[role]) + self.assertLessEqual(uroles, user['roles']) + self.validate_persistent_user(username, uroles) + self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) + + def test_add_user_roles2(self): + user_orig = self.test_create_user() + user = self.exec_cmd('ac-user-add-roles', username="admin", + roles=['pool-manager', 'block-manager']) + self.assertLessEqual(['block-manager', 'pool-manager'], + user['roles']) + self.validate_persistent_user('admin', ['block-manager', + 'pool-manager']) + self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) + + def test_add_user_roles_not_existent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-add-roles', username="admin", + roles=['pool-manager', 'block-manager']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_add_user_roles_not_existent_role(self): + self.test_create_user() + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-add-roles', username="admin", + roles=['Invalid Role']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), + "Role 'Invalid Role' does not exist") + + def test_set_user_roles(self): + user_orig = self.test_create_user() + user = self.exec_cmd('ac-user-add-roles', username="admin", + roles=['pool-manager']) + self.assertLessEqual(['pool-manager'], user['roles']) + self.validate_persistent_user('admin', ['pool-manager']) + self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) + user2 = self.exec_cmd('ac-user-set-roles', username="admin", + roles=['rgw-manager', 'block-manager']) + self.assertLessEqual(['block-manager', 'rgw-manager'], + user2['roles']) + self.validate_persistent_user('admin', ['block-manager', + 'rgw-manager']) + self.assertGreaterEqual(user2['lastUpdate'], user['lastUpdate']) + + def test_set_user_roles_not_existent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-roles', username="admin", + roles=['pool-manager', 'block-manager']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_set_user_roles_not_existent_role(self): + self.test_create_user() + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-roles', username="admin", + roles=['Invalid Role']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), + "Role 'Invalid Role' does not exist") + + def test_del_user_roles(self): + self.test_add_user_roles() + user = self.exec_cmd('ac-user-del-roles', username="admin", + roles=['pool-manager']) + self.assertLessEqual(['block-manager'], user['roles']) + self.validate_persistent_user('admin', ['block-manager']) + + def test_del_user_roles_not_existent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-del-roles', username="admin", + roles=['pool-manager', 'block-manager']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_del_user_roles_not_existent_role(self): + self.test_create_user() + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-del-roles', username="admin", + roles=['Invalid Role']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), + "Role 'Invalid Role' does not exist") + + def test_del_user_roles_not_associated_role(self): + self.test_create_user() + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-del-roles', username="admin", + roles=['rgw-manager']) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), + "Role 'rgw-manager' is not associated with user " + "'admin'") + + def test_show_user(self): + self.test_add_user_roles() + user = self.exec_cmd('ac-user-show', username='admin') + pass_hash = password_hash('admin', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'lastUpdate': user['lastUpdate'], + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'roles': ['block-manager', 'pool-manager'], + 'enabled': True + }) + + def test_show_nonexistent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-show', username='admin') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_show_all_users(self): + self.test_add_user_roles('admin', ['administrator']) + self.test_add_user_roles('guest', ['read-only']) + users = self.exec_cmd('ac-user-show') + self.assertEqual(len(users), 2) + for user in users: + self.assertIn(user, ['admin', 'guest']) + + def test_del_role_associated_with_user(self): + self.test_create_role() + self.test_add_user_roles('guest', ['test_role']) + + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-role-delete', rolename='test_role') + + self.assertEqual(ctx.exception.retcode, -errno.EPERM) + self.assertEqual(str(ctx.exception), + "Role 'test_role' is still associated with user " + "'guest'") + + def test_set_user_info(self): + user_orig = self.test_create_user() + user = self.exec_cmd('ac-user-set-info', username='admin', + name='Admin Name', email='admin@admin.com') + pass_hash = password_hash('admin', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'Admin Name', + 'email': 'admin@admin.com', + 'lastUpdate': user['lastUpdate'], + 'roles': [], + 'enabled': True + }) + self.validate_persistent_user('admin', [], pass_hash, 'Admin Name', + 'admin@admin.com') + self.assertEqual(user['lastUpdate'], user_orig['lastUpdate']) + + def test_set_user_info_nonexistent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-info', username='admin', + name='Admin Name', email='admin@admin.com') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_set_user_password(self): + user_orig = self.test_create_user() + user = self.exec_cmd('ac-user-set-password', username='admin', + inbuf='newpass', force_password=True) + pass_hash = password_hash('newpass', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'lastUpdate': user['lastUpdate'], + 'roles': [], + 'enabled': True + }) + self.validate_persistent_user('admin', [], pass_hash, 'admin User', + 'admin@user.com') + self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) + + def test_sanitize_password(self): + self.test_create_user() + password = 'myPass\\n\\r\\n' + with tempfile.TemporaryFile(mode='w+') as pwd_file: + # Add new line separators (like some text editors when a file is saved). + pwd_file.write('{}{}'.format(password, '\n\r\n\n')) + pwd_file.seek(0) + user = self.exec_cmd('ac-user-set-password', username='admin', + inbuf=pwd_file.read(), force_password=True) + pass_hash = password_hash(password, user['password']) + self.assertEqual(user['password'], pass_hash) + + def test_set_user_password_nonexistent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-password', username='admin', + inbuf='newpass', force_password=True) + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_set_user_password_empty(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-password', username='admin', inbuf='\n', + force_password=True) + + self.assertEqual(ctx.exception.retcode, -errno.EINVAL) + self.assertIn(ERROR_MSG_EMPTY_INPUT_FILE, str(ctx.exception)) + + def test_set_user_password_hash(self): + user_orig = self.test_create_user() + user = self.exec_cmd('ac-user-set-password-hash', username='admin', + inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2') + pass_hash = password_hash('newpass', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'lastUpdate': user['lastUpdate'], + 'roles': [], + 'enabled': True + }) + self.validate_persistent_user('admin', [], pass_hash, 'admin User', + 'admin@user.com') + self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) + + def test_set_user_password_hash_nonexistent_user(self): + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-password-hash', username='admin', + inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2') + + self.assertEqual(ctx.exception.retcode, -errno.ENOENT) + self.assertEqual(str(ctx.exception), "User 'admin' does not exist") + + def test_set_user_password_hash_broken_hash(self): + self.test_create_user() + with self.assertRaises(CmdException) as ctx: + self.exec_cmd('ac-user-set-password-hash', username='admin', + inbuf='1') + + self.assertEqual(ctx.exception.retcode, -errno.EINVAL) + self.assertEqual(str(ctx.exception), 'Invalid password hash') + + def test_set_login_credentials(self): + self.exec_cmd('set-login-credentials', username='admin', + inbuf='admin') + user = self.exec_cmd('ac-user-show', username='admin') + pass_hash = password_hash('admin', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': None, + 'email': None, + 'lastUpdate': user['lastUpdate'], + 'roles': ['administrator'], + 'enabled': True, + }) + self.validate_persistent_user('admin', ['administrator'], pass_hash, + None, None) + + def test_set_login_credentials_for_existing_user(self): + self.test_add_user_roles('admin', ['read-only']) + self.exec_cmd('set-login-credentials', username='admin', + inbuf='admin2') + user = self.exec_cmd('ac-user-show', username='admin') + pass_hash = password_hash('admin2', user['password']) + self.assertDictEqual(user, { + 'username': 'admin', + 'password': pass_hash, + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'lastUpdate': user['lastUpdate'], + 'roles': ['read-only'], + 'enabled': True + }) + self.validate_persistent_user('admin', ['read-only'], pass_hash, + 'admin User', 'admin@user.com') + + def test_load_v1(self): + self.CONFIG_KEY_DICT['accessdb_v1'] = ''' + {{ + "users": {{ + "admin": {{ + "username": "admin", + "password": + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + "roles": ["block-manager", "test_role"], + "name": "admin User", + "email": "admin@user.com", + "lastUpdate": {} + }} + }}, + "roles": {{ + "test_role": {{ + "name": "test_role", + "description": "Test Role", + "scopes_permissions": {{ + "{}": ["{}", "{}"], + "{}": ["{}"] + }} + }} + }}, + "version": 1 + }} + '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, + Permission.UPDATE, Scope.POOL, Permission.CREATE) + + load_access_control_db() + role = self.exec_cmd('ac-role-show', rolename="test_role") + self.assertDictEqual(role, { + 'name': 'test_role', + 'description': "Test Role", + 'scopes_permissions': { + Scope.ISCSI: [Permission.READ, Permission.UPDATE], + Scope.POOL: [Permission.CREATE] + } + }) + user = self.exec_cmd('ac-user-show', username="admin") + self.assertDictEqual(user, { + 'username': 'admin', + 'lastUpdate': user['lastUpdate'], + 'password': + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'roles': ['block-manager', 'test_role'], + 'enabled': True + }) + + def test_load_v2(self): + self.CONFIG_KEY_DICT['accessdb_v2'] = ''' + {{ + "users": {{ + "admin": {{ + "username": "admin", + "password": + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + "pwdExpirationDate": null, + "pwdUpdateRequired": false, + "roles": ["block-manager", "test_role"], + "name": "admin User", + "email": "admin@user.com", + "lastUpdate": {}, + "enabled": true + }} + }}, + "roles": {{ + "test_role": {{ + "name": "test_role", + "description": "Test Role", + "scopes_permissions": {{ + "{}": ["{}", "{}"], + "{}": ["{}"] + }} + }} + }}, + "version": 2 + }} + '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, + Permission.UPDATE, Scope.POOL, Permission.CREATE) + + load_access_control_db() + role = self.exec_cmd('ac-role-show', rolename="test_role") + self.assertDictEqual(role, { + 'name': 'test_role', + 'description': "Test Role", + 'scopes_permissions': { + Scope.ISCSI: [Permission.READ, Permission.UPDATE], + Scope.POOL: [Permission.CREATE] + } + }) + user = self.exec_cmd('ac-user-show', username="admin") + self.assertDictEqual(user, { + 'username': 'admin', + 'lastUpdate': user['lastUpdate'], + 'password': + "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", + 'pwdExpirationDate': None, + 'pwdUpdateRequired': False, + 'name': 'admin User', + 'email': 'admin@user.com', + 'roles': ['block-manager', 'test_role'], + 'enabled': True + }) + + def test_password_policy_pw_length(self): + Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True + Settings.PWD_POLICY_MIN_LENGTH = 3 + pw_policy = PasswordPolicy('foo') + self.assertTrue(pw_policy.check_password_length()) + + def test_password_policy_pw_length_fail(self): + Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True + pw_policy = PasswordPolicy('bar') + self.assertFalse(pw_policy.check_password_length()) + + def test_password_policy_credits_too_weak(self): + Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True + pw_policy = PasswordPolicy('foo') + pw_credits = pw_policy.check_password_complexity() + self.assertEqual(pw_credits, 3) + + def test_password_policy_credits_weak(self): + Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True + pw_policy = PasswordPolicy('mypassword1') + pw_credits = pw_policy.check_password_complexity() + self.assertEqual(pw_credits, 11) + + def test_password_policy_credits_ok(self): + Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True + pw_policy = PasswordPolicy('mypassword1!@') + pw_credits = pw_policy.check_password_complexity() + self.assertEqual(pw_credits, 17) + + def test_password_policy_credits_strong(self): + Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True + pw_policy = PasswordPolicy('testpassword0047!@') + pw_credits = pw_policy.check_password_complexity() + self.assertEqual(pw_credits, 22) + + def test_password_policy_credits_very_strong(self): + Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True + pw_policy = PasswordPolicy('testpassword#!$!@$') + pw_credits = pw_policy.check_password_complexity() + self.assertEqual(pw_credits, 30) + + def test_password_policy_forbidden_words(self): + Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True + pw_policy = PasswordPolicy('!@$testdashboard#!$') + self.assertTrue(pw_policy.check_if_contains_forbidden_words()) + + def test_password_policy_forbidden_words_custom(self): + Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True + Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar' + pw_policy = PasswordPolicy('foo123bar') + self.assertTrue(pw_policy.check_if_contains_forbidden_words()) + + def test_password_policy_sequential_chars(self): + Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True + pw_policy = PasswordPolicy('!@$test123#!$') + self.assertTrue(pw_policy.check_if_sequential_characters()) + + def test_password_policy_repetitive_chars(self): + Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True + pw_policy = PasswordPolicy('!@$testfooo#!$') + self.assertTrue(pw_policy.check_if_repetitive_characters()) + + def test_password_policy_contain_username(self): + Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True + pw_policy = PasswordPolicy('%admin135)', 'admin') + self.assertTrue(pw_policy.check_if_contains_username()) + + def test_password_policy_is_old_pwd(self): + Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True + pw_policy = PasswordPolicy('foo', old_password='foo') + self.assertTrue(pw_policy.check_is_old_password()) -- cgit v1.2.3