summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tests/test_access_control.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/dashboard/tests/test_access_control.py')
-rw-r--r--src/pybind/mgr/dashboard/tests/test_access_control.py871
1 files changed, 871 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/tests/test_access_control.py b/src/pybind/mgr/dashboard/tests/test_access_control.py
new file mode 100644
index 000000000..361f65e60
--- /dev/null
+++ b/src/pybind/mgr/dashboard/tests/test_access_control.py
@@ -0,0 +1,871 @@
+# -*- coding: utf-8 -*-
+# pylint: disable=dangerous-default-value,too-many-public-methods
+from __future__ import absolute_import
+
+import errno
+import json
+import tempfile
+import time
+import unittest
+from datetime import datetime, timedelta
+
+from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE
+
+from .. import mgr
+from ..security import Permission, Scope
+from ..services.access_control import SYSTEM_ROLES, AccessControlDB, \
+ PasswordPolicy, load_access_control_db, password_hash
+from ..settings import Settings
+from ..tests import CLICommandTestMixin, CmdException
+
+
+class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.mock_kv_store()
+ mgr.ACCESS_CONTROL_DB = None
+
+ def setUp(self):
+ self.CONFIG_KEY_DICT.clear()
+ load_access_control_db()
+
+ def load_persistent_db(self):
+ config_key = AccessControlDB.accessdb_config_key()
+ self.assertIn(config_key, self.CONFIG_KEY_DICT)
+ db_json = self.CONFIG_KEY_DICT[config_key]
+ db = json.loads(db_json)
+ return db
+
+ # The DB is written to persistent storage the first time it is saved.
+ # However, should an operation fail due to <reasons>, we may end up in
+ # a state where we have a completely empty CONFIG_KEY_DICT (our mock
+ # equivalent to the persistent state). While this works for most of the
+ # tests in this class, that would prevent us from testing things like
+ # "run a command that is expected to fail, and then ensure nothing
+ # happened", because we'd be asserting in `load_persistent_db()` due to
+ # the map being empty.
+ #
+ # This function will therefore force state to be written to our mock
+ # persistent state. We could have added this extra step to
+ # `load_persistent_db()` directly, but that would conflict with the
+ # upgrade tests. This way, we can selectively enforce this requirement
+ # where we believe it to be necessary; generically speaking, this should
+ # not be needed unless we're testing very specific behaviors.
+ #
+ def setup_and_load_persistent_db(self):
+ mgr.ACCESS_CTRL_DB.save()
+ self.load_persistent_db()
+
+ def validate_persistent_role(self, rolename, scopes_permissions,
+ description=None):
+ db = self.load_persistent_db()
+ self.assertIn('roles', db)
+ self.assertIn(rolename, db['roles'])
+ self.assertEqual(db['roles'][rolename]['name'], rolename)
+ self.assertEqual(db['roles'][rolename]['description'], description)
+ self.assertDictEqual(db['roles'][rolename]['scopes_permissions'],
+ scopes_permissions)
+
+ def validate_persistent_no_role(self, rolename):
+ db = self.load_persistent_db()
+ self.assertIn('roles', db)
+ self.assertNotIn(rolename, db['roles'])
+
+ def validate_persistent_user(self, username, roles, password=None,
+ name=None, email=None, last_update=None,
+ enabled=True, pwdExpirationDate=None):
+ db = self.load_persistent_db()
+ self.assertIn('users', db)
+ self.assertIn(username, db['users'])
+ self.assertEqual(db['users'][username]['username'], username)
+ self.assertListEqual(db['users'][username]['roles'], roles)
+ if password:
+ self.assertEqual(db['users'][username]['password'], password)
+ if name:
+ self.assertEqual(db['users'][username]['name'], name)
+ if email:
+ self.assertEqual(db['users'][username]['email'], email)
+ if last_update:
+ self.assertEqual(db['users'][username]['lastUpdate'], last_update)
+ if pwdExpirationDate:
+ self.assertEqual(db['users'][username]['pwdExpirationDate'], pwdExpirationDate)
+ self.assertEqual(db['users'][username]['enabled'], enabled)
+
+ def validate_persistent_no_user(self, username):
+ db = self.load_persistent_db()
+ self.assertIn('users', db)
+ self.assertNotIn(username, db['users'])
+
+ def test_create_role(self):
+ role = self.exec_cmd('ac-role-create', rolename='test_role')
+ self.assertDictEqual(role, {'name': 'test_role', 'description': None,
+ 'scopes_permissions': {}})
+ self.validate_persistent_role('test_role', {})
+
+ def test_create_role_with_desc(self):
+ role = self.exec_cmd('ac-role-create', rolename='test_role',
+ description='Test Role')
+ self.assertDictEqual(role, {'name': 'test_role',
+ 'description': 'Test Role',
+ 'scopes_permissions': {}})
+ self.validate_persistent_role('test_role', {}, 'Test Role')
+
+ def test_create_duplicate_role(self):
+ self.test_create_role()
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-create', rolename='test_role')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EEXIST)
+ self.assertEqual(str(ctx.exception), "Role 'test_role' already exists")
+
+ def test_delete_role(self):
+ self.test_create_role()
+ out = self.exec_cmd('ac-role-delete', rolename='test_role')
+ self.assertEqual(out, "Role 'test_role' deleted")
+ self.validate_persistent_no_role('test_role')
+
+ def test_delete_nonexistent_role(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-delete', rolename='test_role')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist")
+
+ def test_show_single_role(self):
+ self.test_create_role()
+ role = self.exec_cmd('ac-role-show', rolename='test_role')
+ self.assertDictEqual(role, {'name': 'test_role', 'description': None,
+ 'scopes_permissions': {}})
+
+ def test_show_nonexistent_role(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-show', rolename='test_role')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist")
+
+ def test_show_system_roles(self):
+ roles = self.exec_cmd('ac-role-show')
+ self.assertEqual(len(roles), len(SYSTEM_ROLES))
+ for role in roles:
+ self.assertIn(role, SYSTEM_ROLES)
+
+ def test_show_system_role(self):
+ role = self.exec_cmd('ac-role-show', rolename="read-only")
+ self.assertEqual(role['name'], 'read-only')
+ self.assertEqual(
+ role['description'],
+ 'allows read permission for all security scope except dashboard settings and config-opt'
+ )
+
+ def test_delete_system_role(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-delete', rolename='administrator')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EPERM)
+ self.assertEqual(str(ctx.exception),
+ "Cannot delete system role 'administrator'")
+
+ def test_add_role_scope_perms(self):
+ self.test_create_role()
+ self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
+ scopename=Scope.POOL,
+ permissions=[Permission.READ, Permission.DELETE])
+ role = self.exec_cmd('ac-role-show', rolename='test_role')
+ self.assertDictEqual(role, {'name': 'test_role',
+ 'description': None,
+ 'scopes_permissions': {
+ Scope.POOL: [Permission.DELETE,
+ Permission.READ]
+ }})
+ self.validate_persistent_role('test_role', {
+ Scope.POOL: [Permission.DELETE, Permission.READ]
+ })
+
+ def test_del_role_scope_perms(self):
+ self.test_add_role_scope_perms()
+ self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
+ scopename=Scope.MONITOR,
+ permissions=[Permission.READ, Permission.CREATE])
+ self.validate_persistent_role('test_role', {
+ Scope.POOL: [Permission.DELETE, Permission.READ],
+ Scope.MONITOR: [Permission.CREATE, Permission.READ]
+ })
+ self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
+ scopename=Scope.POOL)
+ role = self.exec_cmd('ac-role-show', rolename='test_role')
+ self.assertDictEqual(role, {'name': 'test_role',
+ 'description': None,
+ 'scopes_permissions': {
+ Scope.MONITOR: [Permission.CREATE,
+ Permission.READ]
+ }})
+ self.validate_persistent_role('test_role', {
+ Scope.MONITOR: [Permission.CREATE, Permission.READ]
+ })
+
+ def test_add_role_scope_perms_nonexistent_role(self):
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
+ scopename='pool',
+ permissions=['read', 'delete'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist")
+
+ def test_add_role_invalid_scope_perms(self):
+ self.test_create_role()
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
+ scopename='invalidscope',
+ permissions=['read', 'delete'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception),
+ "Scope 'invalidscope' is not valid\n Possible values: "
+ "{}".format(Scope.all_scopes()))
+
+ def test_add_role_scope_invalid_perms(self):
+ self.test_create_role()
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
+ scopename='pool', permissions=['invalidperm'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception),
+ "Permission 'invalidperm' is not valid\n Possible "
+ "values: {}".format(Permission.all_permissions()))
+
+ def test_del_role_scope_perms_nonexistent_role(self):
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
+ scopename='pool')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist")
+
+ def test_del_role_nonexistent_scope_perms(self):
+ self.test_add_role_scope_perms()
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
+ scopename='nonexistentscope')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception),
+ "There are no permissions for scope 'nonexistentscope' "
+ "in role 'test_role'")
+
+ def test_not_permitted_add_role_scope_perms(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-add-scope-perms', rolename='read-only',
+ scopename='pool', permissions=['read', 'delete'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.EPERM)
+ self.assertEqual(str(ctx.exception),
+ "Cannot update system role 'read-only'")
+
+ def test_not_permitted_del_role_scope_perms(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-del-scope-perms', rolename='read-only',
+ scopename='pool')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EPERM)
+ self.assertEqual(str(ctx.exception),
+ "Cannot update system role 'read-only'")
+
+ def test_create_user(self, username='admin', rolename=None, enabled=True,
+ pwdExpirationDate=None):
+ user = self.exec_cmd('ac-user-create', username=username,
+ rolename=rolename, inbuf='admin',
+ name='{} User'.format(username),
+ email='{}@user.com'.format(username),
+ enabled=enabled, force_password=True,
+ pwd_expiration_date=pwdExpirationDate)
+
+ pass_hash = password_hash('admin', user['password'])
+ self.assertDictEqual(user, {
+ 'username': username,
+ 'password': pass_hash,
+ 'pwdExpirationDate': pwdExpirationDate,
+ 'pwdUpdateRequired': False,
+ 'lastUpdate': user['lastUpdate'],
+ 'name': '{} User'.format(username),
+ 'email': '{}@user.com'.format(username),
+ 'roles': [rolename] if rolename else [],
+ 'enabled': enabled
+ })
+ self.validate_persistent_user(username, [rolename] if rolename else [],
+ pass_hash, '{} User'.format(username),
+ '{}@user.com'.format(username),
+ user['lastUpdate'], enabled)
+ return user
+
+ def test_create_disabled_user(self):
+ self.test_create_user(enabled=False)
+
+ def test_create_user_pwd_expiration_date(self):
+ expiration_date = datetime.utcnow() + timedelta(days=10)
+ expiration_date = int(time.mktime(expiration_date.timetuple()))
+ self.test_create_user(pwdExpirationDate=expiration_date)
+
+ def test_create_user_with_role(self):
+ self.test_add_role_scope_perms()
+ self.test_create_user(rolename='test_role')
+
+ def test_create_user_with_system_role(self):
+ self.test_create_user(rolename='administrator')
+
+ def test_delete_user(self):
+ self.test_create_user()
+ out = self.exec_cmd('ac-user-delete', username='admin')
+ self.assertEqual(out, "User 'admin' deleted")
+ users = self.exec_cmd('ac-user-show')
+ self.assertEqual(len(users), 0)
+ self.validate_persistent_no_user('admin')
+
+ def test_create_duplicate_user(self):
+ self.test_create_user()
+ ret = self.exec_cmd('ac-user-create', username='admin', inbuf='admin',
+ force_password=True)
+ self.assertEqual(ret, "User 'admin' already exists")
+
+ def test_create_users_with_dne_role(self):
+ # one time call to setup our persistent db
+ self.setup_and_load_persistent_db()
+
+ # create a user with a role that does not exist; expect a failure
+ try:
+ self.exec_cmd('ac-user-create', username='foo',
+ rolename='dne_role', inbuf='foopass',
+ name='foo User', email='foo@user.com',
+ force_password=True)
+ except CmdException as e:
+ self.assertEqual(e.retcode, -errno.ENOENT)
+
+ db = self.load_persistent_db()
+ if 'users' in db:
+ self.assertNotIn('foo', db['users'])
+
+ # We could just finish our test here, given we ensured that the user
+ # with a non-existent role is not in persistent storage. However,
+ # we're going to test the database's consistency, making sure that
+ # side-effects are not written to persistent storage once we commit
+ # an unrelated operation. To ensure this, we'll issue another
+ # operation that is sharing the same code path, and will check whether
+ # the next operation commits dirty state.
+
+ # create a role (this will be 'test_role')
+ self.test_create_role()
+ self.exec_cmd('ac-user-create', username='bar',
+ rolename='test_role', inbuf='barpass',
+ name='bar User', email='bar@user.com',
+ force_password=True)
+
+ # validate db:
+ # user 'foo' should not exist
+ # user 'bar' should exist and have role 'test_role'
+ self.validate_persistent_user('bar', ['test_role'])
+
+ db = self.load_persistent_db()
+ self.assertIn('users', db)
+ self.assertNotIn('foo', db['users'])
+
+ def test_delete_nonexistent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-delete', username='admin')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_add_user_roles(self, username='admin',
+ roles=['pool-manager', 'block-manager']):
+ user_orig = self.test_create_user(username)
+ uroles = []
+ for role in roles:
+ uroles.append(role)
+ uroles.sort()
+ user = self.exec_cmd('ac-user-add-roles', username=username,
+ roles=[role])
+ self.assertLessEqual(uroles, user['roles'])
+ self.validate_persistent_user(username, uroles)
+ self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
+
+ def test_add_user_roles2(self):
+ user_orig = self.test_create_user()
+ user = self.exec_cmd('ac-user-add-roles', username="admin",
+ roles=['pool-manager', 'block-manager'])
+ self.assertLessEqual(['block-manager', 'pool-manager'],
+ user['roles'])
+ self.validate_persistent_user('admin', ['block-manager',
+ 'pool-manager'])
+ self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
+
+ def test_add_user_roles_not_existent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-add-roles', username="admin",
+ roles=['pool-manager', 'block-manager'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_add_user_roles_not_existent_role(self):
+ self.test_create_user()
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-add-roles', username="admin",
+ roles=['Invalid Role'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception),
+ "Role 'Invalid Role' does not exist")
+
+ def test_set_user_roles(self):
+ user_orig = self.test_create_user()
+ user = self.exec_cmd('ac-user-add-roles', username="admin",
+ roles=['pool-manager'])
+ self.assertLessEqual(['pool-manager'], user['roles'])
+ self.validate_persistent_user('admin', ['pool-manager'])
+ self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
+ user2 = self.exec_cmd('ac-user-set-roles', username="admin",
+ roles=['rgw-manager', 'block-manager'])
+ self.assertLessEqual(['block-manager', 'rgw-manager'],
+ user2['roles'])
+ self.validate_persistent_user('admin', ['block-manager',
+ 'rgw-manager'])
+ self.assertGreaterEqual(user2['lastUpdate'], user['lastUpdate'])
+
+ def test_set_user_roles_not_existent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-roles', username="admin",
+ roles=['pool-manager', 'block-manager'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_set_user_roles_not_existent_role(self):
+ self.test_create_user()
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-roles', username="admin",
+ roles=['Invalid Role'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception),
+ "Role 'Invalid Role' does not exist")
+
+ def test_del_user_roles(self):
+ self.test_add_user_roles()
+ user = self.exec_cmd('ac-user-del-roles', username="admin",
+ roles=['pool-manager'])
+ self.assertLessEqual(['block-manager'], user['roles'])
+ self.validate_persistent_user('admin', ['block-manager'])
+
+ def test_del_user_roles_not_existent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-del-roles', username="admin",
+ roles=['pool-manager', 'block-manager'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_del_user_roles_not_existent_role(self):
+ self.test_create_user()
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-del-roles', username="admin",
+ roles=['Invalid Role'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception),
+ "Role 'Invalid Role' does not exist")
+
+ def test_del_user_roles_not_associated_role(self):
+ self.test_create_user()
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-del-roles', username="admin",
+ roles=['rgw-manager'])
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception),
+ "Role 'rgw-manager' is not associated with user "
+ "'admin'")
+
+ def test_show_user(self):
+ self.test_add_user_roles()
+ user = self.exec_cmd('ac-user-show', username='admin')
+ pass_hash = password_hash('admin', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'pool-manager'],
+ 'enabled': True
+ })
+
+ def test_show_nonexistent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-show', username='admin')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_show_all_users(self):
+ self.test_add_user_roles('admin', ['administrator'])
+ self.test_add_user_roles('guest', ['read-only'])
+ users = self.exec_cmd('ac-user-show')
+ self.assertEqual(len(users), 2)
+ for user in users:
+ self.assertIn(user, ['admin', 'guest'])
+
+ def test_del_role_associated_with_user(self):
+ self.test_create_role()
+ self.test_add_user_roles('guest', ['test_role'])
+
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-role-delete', rolename='test_role')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EPERM)
+ self.assertEqual(str(ctx.exception),
+ "Role 'test_role' is still associated with user "
+ "'guest'")
+
+ def test_set_user_info(self):
+ user_orig = self.test_create_user()
+ user = self.exec_cmd('ac-user-set-info', username='admin',
+ name='Admin Name', email='admin@admin.com')
+ pass_hash = password_hash('admin', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'Admin Name',
+ 'email': 'admin@admin.com',
+ 'lastUpdate': user['lastUpdate'],
+ 'roles': [],
+ 'enabled': True
+ })
+ self.validate_persistent_user('admin', [], pass_hash, 'Admin Name',
+ 'admin@admin.com')
+ self.assertEqual(user['lastUpdate'], user_orig['lastUpdate'])
+
+ def test_set_user_info_nonexistent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-info', username='admin',
+ name='Admin Name', email='admin@admin.com')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_set_user_password(self):
+ user_orig = self.test_create_user()
+ user = self.exec_cmd('ac-user-set-password', username='admin',
+ inbuf='newpass', force_password=True)
+ pass_hash = password_hash('newpass', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'lastUpdate': user['lastUpdate'],
+ 'roles': [],
+ 'enabled': True
+ })
+ self.validate_persistent_user('admin', [], pass_hash, 'admin User',
+ 'admin@user.com')
+ self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
+
+ def test_sanitize_password(self):
+ self.test_create_user()
+ password = 'myPass\\n\\r\\n'
+ with tempfile.TemporaryFile(mode='w+') as pwd_file:
+ # Add new line separators (like some text editors when a file is saved).
+ pwd_file.write('{}{}'.format(password, '\n\r\n\n'))
+ pwd_file.seek(0)
+ user = self.exec_cmd('ac-user-set-password', username='admin',
+ inbuf=pwd_file.read(), force_password=True)
+ pass_hash = password_hash(password, user['password'])
+ self.assertEqual(user['password'], pass_hash)
+
+ def test_set_user_password_nonexistent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-password', username='admin',
+ inbuf='newpass', force_password=True)
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_set_user_password_empty(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-password', username='admin', inbuf='\n',
+ force_password=True)
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertIn(ERROR_MSG_EMPTY_INPUT_FILE, str(ctx.exception))
+
+ def test_set_user_password_hash(self):
+ user_orig = self.test_create_user()
+ user = self.exec_cmd('ac-user-set-password-hash', username='admin',
+ inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
+ pass_hash = password_hash('newpass', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'lastUpdate': user['lastUpdate'],
+ 'roles': [],
+ 'enabled': True
+ })
+ self.validate_persistent_user('admin', [], pass_hash, 'admin User',
+ 'admin@user.com')
+ self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
+
+ def test_set_user_password_hash_nonexistent_user(self):
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-password-hash', username='admin',
+ inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
+
+ self.assertEqual(ctx.exception.retcode, -errno.ENOENT)
+ self.assertEqual(str(ctx.exception), "User 'admin' does not exist")
+
+ def test_set_user_password_hash_broken_hash(self):
+ self.test_create_user()
+ with self.assertRaises(CmdException) as ctx:
+ self.exec_cmd('ac-user-set-password-hash', username='admin',
+ inbuf='1')
+
+ self.assertEqual(ctx.exception.retcode, -errno.EINVAL)
+ self.assertEqual(str(ctx.exception), 'Invalid password hash')
+
+ def test_set_login_credentials(self):
+ self.exec_cmd('set-login-credentials', username='admin',
+ inbuf='admin')
+ user = self.exec_cmd('ac-user-show', username='admin')
+ pass_hash = password_hash('admin', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': None,
+ 'email': None,
+ 'lastUpdate': user['lastUpdate'],
+ 'roles': ['administrator'],
+ 'enabled': True,
+ })
+ self.validate_persistent_user('admin', ['administrator'], pass_hash,
+ None, None)
+
+ def test_set_login_credentials_for_existing_user(self):
+ self.test_add_user_roles('admin', ['read-only'])
+ self.exec_cmd('set-login-credentials', username='admin',
+ inbuf='admin2')
+ user = self.exec_cmd('ac-user-show', username='admin')
+ pass_hash = password_hash('admin2', user['password'])
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'password': pass_hash,
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'lastUpdate': user['lastUpdate'],
+ 'roles': ['read-only'],
+ 'enabled': True
+ })
+ self.validate_persistent_user('admin', ['read-only'], pass_hash,
+ 'admin User', 'admin@user.com')
+
+ def test_load_v1(self):
+ self.CONFIG_KEY_DICT['accessdb_v1'] = '''
+ {{
+ "users": {{
+ "admin": {{
+ "username": "admin",
+ "password":
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ "roles": ["block-manager", "test_role"],
+ "name": "admin User",
+ "email": "admin@user.com",
+ "lastUpdate": {}
+ }}
+ }},
+ "roles": {{
+ "test_role": {{
+ "name": "test_role",
+ "description": "Test Role",
+ "scopes_permissions": {{
+ "{}": ["{}", "{}"],
+ "{}": ["{}"]
+ }}
+ }}
+ }},
+ "version": 1
+ }}
+ '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+ Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+ load_access_control_db()
+ role = self.exec_cmd('ac-role-show', rolename="test_role")
+ self.assertDictEqual(role, {
+ 'name': 'test_role',
+ 'description': "Test Role",
+ 'scopes_permissions': {
+ Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+ Scope.POOL: [Permission.CREATE]
+ }
+ })
+ user = self.exec_cmd('ac-user-show', username="admin")
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password':
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
+ })
+
+ def test_load_v2(self):
+ self.CONFIG_KEY_DICT['accessdb_v2'] = '''
+ {{
+ "users": {{
+ "admin": {{
+ "username": "admin",
+ "password":
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ "pwdExpirationDate": null,
+ "pwdUpdateRequired": false,
+ "roles": ["block-manager", "test_role"],
+ "name": "admin User",
+ "email": "admin@user.com",
+ "lastUpdate": {},
+ "enabled": true
+ }}
+ }},
+ "roles": {{
+ "test_role": {{
+ "name": "test_role",
+ "description": "Test Role",
+ "scopes_permissions": {{
+ "{}": ["{}", "{}"],
+ "{}": ["{}"]
+ }}
+ }}
+ }},
+ "version": 2
+ }}
+ '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
+ Permission.UPDATE, Scope.POOL, Permission.CREATE)
+
+ load_access_control_db()
+ role = self.exec_cmd('ac-role-show', rolename="test_role")
+ self.assertDictEqual(role, {
+ 'name': 'test_role',
+ 'description': "Test Role",
+ 'scopes_permissions': {
+ Scope.ISCSI: [Permission.READ, Permission.UPDATE],
+ Scope.POOL: [Permission.CREATE]
+ }
+ })
+ user = self.exec_cmd('ac-user-show', username="admin")
+ self.assertDictEqual(user, {
+ 'username': 'admin',
+ 'lastUpdate': user['lastUpdate'],
+ 'password':
+ "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
+ 'pwdExpirationDate': None,
+ 'pwdUpdateRequired': False,
+ 'name': 'admin User',
+ 'email': 'admin@user.com',
+ 'roles': ['block-manager', 'test_role'],
+ 'enabled': True
+ })
+
+ def test_password_policy_pw_length(self):
+ Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
+ Settings.PWD_POLICY_MIN_LENGTH = 3
+ pw_policy = PasswordPolicy('foo')
+ self.assertTrue(pw_policy.check_password_length())
+
+ def test_password_policy_pw_length_fail(self):
+ Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
+ pw_policy = PasswordPolicy('bar')
+ self.assertFalse(pw_policy.check_password_length())
+
+ def test_password_policy_credits_too_weak(self):
+ Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
+ pw_policy = PasswordPolicy('foo')
+ pw_credits = pw_policy.check_password_complexity()
+ self.assertEqual(pw_credits, 3)
+
+ def test_password_policy_credits_weak(self):
+ Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
+ pw_policy = PasswordPolicy('mypassword1')
+ pw_credits = pw_policy.check_password_complexity()
+ self.assertEqual(pw_credits, 11)
+
+ def test_password_policy_credits_ok(self):
+ Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
+ pw_policy = PasswordPolicy('mypassword1!@')
+ pw_credits = pw_policy.check_password_complexity()
+ self.assertEqual(pw_credits, 17)
+
+ def test_password_policy_credits_strong(self):
+ Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
+ pw_policy = PasswordPolicy('testpassword0047!@')
+ pw_credits = pw_policy.check_password_complexity()
+ self.assertEqual(pw_credits, 22)
+
+ def test_password_policy_credits_very_strong(self):
+ Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
+ pw_policy = PasswordPolicy('testpassword#!$!@$')
+ pw_credits = pw_policy.check_password_complexity()
+ self.assertEqual(pw_credits, 30)
+
+ def test_password_policy_forbidden_words(self):
+ Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
+ pw_policy = PasswordPolicy('!@$testdashboard#!$')
+ self.assertTrue(pw_policy.check_if_contains_forbidden_words())
+
+ def test_password_policy_forbidden_words_custom(self):
+ Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
+ Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar'
+ pw_policy = PasswordPolicy('foo123bar')
+ self.assertTrue(pw_policy.check_if_contains_forbidden_words())
+
+ def test_password_policy_sequential_chars(self):
+ Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True
+ pw_policy = PasswordPolicy('!@$test123#!$')
+ self.assertTrue(pw_policy.check_if_sequential_characters())
+
+ def test_password_policy_repetitive_chars(self):
+ Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True
+ pw_policy = PasswordPolicy('!@$testfooo#!$')
+ self.assertTrue(pw_policy.check_if_repetitive_characters())
+
+ def test_password_policy_contain_username(self):
+ Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True
+ pw_policy = PasswordPolicy('%admin135)', 'admin')
+ self.assertTrue(pw_policy.check_if_contains_username())
+
+ def test_password_policy_is_old_pwd(self):
+ Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True
+ pw_policy = PasswordPolicy('foo', old_password='foo')
+ self.assertTrue(pw_policy.check_is_old_password())