diff options
Diffstat (limited to 'src/tests/intg/files_ops.py')
-rw-r--r-- | src/tests/intg/files_ops.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/src/tests/intg/files_ops.py b/src/tests/intg/files_ops.py new file mode 100644 index 0000000..57959f5 --- /dev/null +++ b/src/tests/intg/files_ops.py @@ -0,0 +1,173 @@ +# +# SSSD integration test - operations on UNIX user and group database +# +# Copyright (c) 2016 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import os.path +import tempfile +import pytest + +import ent +from util import backup_envvar_file, restore_envvar_file + + +@pytest.fixture +def passwd_ops_setup(request): + pwd_file = os.environ["NSS_WRAPPER_PASSWD"] + backup_envvar_file("NSS_WRAPPER_PASSWD") + request.addfinalizer(lambda: restore_envvar_file("NSS_WRAPPER_PASSWD")) + pwd_ops = PasswdOps(pwd_file) + return pwd_ops + + +@pytest.fixture +def group_ops_setup(request): + grp_file = os.environ["NSS_WRAPPER_GROUP"] + backup_envvar_file("NSS_WRAPPER_GROUP") + request.addfinalizer(lambda: restore_envvar_file("NSS_WRAPPER_GROUP")) + grp_ops = GroupOps(grp_file) + return grp_ops + + +@pytest.fixture +def group_db_setup(request): + group = request.param + grp_ops = group_ops_setup(request) + grp_ops.groupadd(**group) + ent.assert_group_by_name(group['name'], group) + return grp_ops + + +class FilesOps(object): + """ + A naive implementation of operations as a basis for user or group + operations. Uses rename to (hopefully) trigger the same fs-level + notifications as shadow-utils would. + """ + def __init__(self, file_name): + self.file_name = file_name + self.tmp_dir = os.path.dirname(self.file_name) + + @staticmethod + def _get_named_line(name, contents): + for num, line in enumerate(contents, 0): + pname = line.split(':')[0] + if name == pname: + return num + raise KeyError("%s not found" % name) + + def _read_contents(self): + with open(self.file_name, "r") as pfile: + contents = pfile.readlines() + return contents + + def _write_contents(self, contents): + tmp_file = tempfile.NamedTemporaryFile(mode='w', dir=self.tmp_dir, + delete=False) + tmp_file.writelines(contents) + tmp_file.flush() + + os.rename(tmp_file.name, self.file_name) + + def _append_line(self, new_line): + contents = self._read_contents() + contents.extend(new_line) + self._write_contents(contents) + + def _subst_line(self, key, line): + contents = self._read_contents() + kindex = self._get_named_line(key, contents) + contents[kindex] = line + self._write_contents(contents) + + def _del_line(self, key): + contents = self._read_contents() + kindex = self._get_named_line(key, contents) + contents.pop(kindex) + self._write_contents(contents) + + contents = self._read_contents() + + def _has_line(self, key): + try: + self._get_named_line(key, self._read_contents()) + return True + except KeyError: + return False + + +class PasswdOps(FilesOps): + """ + A naive implementation of user operations + """ + def __init__(self, file_name): + super(PasswdOps, self).__init__(file_name) + + def _pwd2line(self, name, uid, gid, passwd, gecos, homedir, shell): + pwd_fmt = "{name}:{passwd}:{uid}:{gid}:{gecos}:{homedir}:{shell}\n" + return pwd_fmt.format(name=name, + passwd=passwd, + uid=uid, + gid=gid, + gecos=gecos, + homedir=homedir, + shell=shell) + + def useradd(self, name, uid, gid, passwd='', gecos='', dir='', shell=''): + pwd_line = self._pwd2line(name, uid, gid, passwd, gecos, dir, shell) + self._append_line(pwd_line) + + def usermod(self, name, uid, gid, passwd='', gecos='', dir='', shell=''): + pwd_line = self._pwd2line(name, uid, gid, passwd, gecos, dir, shell) + self._subst_line(name, pwd_line) + + def userdel(self, name): + self._del_line(name) + + def userexist(self, name): + return self._has_line(name) + + +class GroupOps(FilesOps): + """ + A naive implementation of group operations + """ + def __init__(self, file_name): + super(GroupOps, self).__init__(file_name) + + def _grp2line(self, name, gid, mem, passwd): + member_list = ",".join(m for m in mem) + grp_fmt = "{name}:{passwd}:{gid}:{member_list}\n" + return grp_fmt.format(name=name, + passwd=passwd, + gid=gid, + member_list=member_list) + + def groupadd(self, name, gid, mem, passwd="*"): + grp_line = self._grp2line(name, gid, mem, passwd) + self._append_line(grp_line) + + def groupmod(self, old_name, name, gid, mem, passwd="*"): + grp_line = self._grp2line(name, gid, mem, passwd) + self._subst_line(old_name, grp_line) + + def groupdel(self, name): + self._del_line(name) + + def groupexist(self, name): + return self._has_line(name) |