summaryrefslogtreecommitdiffstats
path: root/src/tests/intg/test_memory_cache.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 05:31:45 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 05:31:45 +0000
commit74aa0bc6779af38018a03fd2cf4419fe85917904 (patch)
tree9cb0681aac9a94a49c153d5823e7a55d1513d91f /src/tests/intg/test_memory_cache.py
parentInitial commit. (diff)
downloadsssd-74aa0bc6779af38018a03fd2cf4419fe85917904.tar.xz
sssd-74aa0bc6779af38018a03fd2cf4419fe85917904.zip
Adding upstream version 2.9.4.upstream/2.9.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tests/intg/test_memory_cache.py')
-rw-r--r--src/tests/intg/test_memory_cache.py1215
1 files changed, 1215 insertions, 0 deletions
diff --git a/src/tests/intg/test_memory_cache.py b/src/tests/intg/test_memory_cache.py
new file mode 100644
index 0000000..b30c902
--- /dev/null
+++ b/src/tests/intg/test_memory_cache.py
@@ -0,0 +1,1215 @@
+#
+# LDAP integration test
+#
+# Copyright (c) 2015 Red Hat, Inc.
+# Author: Lukas Slebodnik <lslebodn@redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+import os
+import stat
+import ent
+import grp
+import pwd
+import config
+import random
+import signal
+import string
+import struct
+import subprocess
+import time
+import pytest
+import pysss_murmur
+
+import ds_openldap
+import ldap_ent
+import sssd_id
+from util import unindent
+
+LDAP_BASE_DN = "dc=example,dc=com"
+
+
+@pytest.fixture(scope="module")
+def ds_inst(request):
+ """LDAP server instance fixture"""
+ ds_inst = ds_openldap.DSOpenLDAP(
+ config.PREFIX, 10389, LDAP_BASE_DN,
+ "cn=admin", "Secret123")
+ try:
+ ds_inst.setup()
+ except Exception:
+ ds_inst.teardown()
+ raise
+ request.addfinalizer(lambda: ds_inst.teardown())
+ return ds_inst
+
+
+@pytest.fixture(scope="module")
+def ldap_conn(request, ds_inst):
+ """LDAP server connection fixture"""
+ ldap_conn = ds_inst.bind()
+ ldap_conn.ds_inst = ds_inst
+ request.addfinalizer(lambda: ldap_conn.unbind_s())
+ return ldap_conn
+
+
+def create_ldap_fixture(request, ldap_conn, ent_list):
+ """Add LDAP entries and add teardown for removing them"""
+ for entry in ent_list:
+ ldap_conn.add_s(entry[0], entry[1])
+
+ def teardown():
+ for entry in ent_list:
+ ldap_conn.delete_s(entry[0])
+ request.addfinalizer(teardown)
+
+
+def create_conf_fixture(request, contents):
+ """Generate sssd.conf and add teardown for removing it"""
+ conf = open(config.CONF_PATH, "w")
+ conf.write(contents)
+ conf.close()
+ os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR)
+ request.addfinalizer(lambda: os.unlink(config.CONF_PATH))
+
+
+def stop_sssd():
+ pid_file = open(config.PIDFILE_PATH, "r")
+ pid = int(pid_file.read())
+ os.kill(pid, signal.SIGTERM)
+ while True:
+ try:
+ os.kill(pid, signal.SIGCONT)
+ except OSError:
+ break
+ time.sleep(1)
+
+
+def create_sssd_fixture(request):
+ """Start sssd and add teardown for stopping it and removing state"""
+ if subprocess.call(["sssd", "-D", "--logger=files"]) != 0:
+ raise Exception("sssd start failed")
+
+ def teardown():
+ try:
+ stop_sssd()
+ except Exception:
+ pass
+ for path in os.listdir(config.DB_PATH):
+ os.unlink(config.DB_PATH + "/" + path)
+ for path in os.listdir(config.MCACHE_PATH):
+ os.unlink(config.MCACHE_PATH + "/" + path)
+ # force sss_client libs to realize mem-cache files were deleted
+ try:
+ sssd_id.call_sssd_initgroups("user1", 2001)
+ except Exception:
+ pass
+ try:
+ grp.getgrnam("group1")
+ except Exception:
+ pass
+ try:
+ pwd.getpwnam("user1")
+ except Exception:
+ pass
+ request.addfinalizer(teardown)
+
+
+def load_data_to_ldap(request, ldap_conn):
+ ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)
+ ent_list.add_user("user1", 1001, 2001)
+ ent_list.add_user("user2", 1002, 2002)
+ ent_list.add_user("user3", 1003, 2003)
+ ent_list.add_user("user11", 1011, 2001)
+ ent_list.add_user("user12", 1012, 2002)
+ ent_list.add_user("user13", 1013, 2003)
+ ent_list.add_user("user21", 1021, 2001)
+ ent_list.add_user("user22", 1022, 2002)
+ ent_list.add_user("user23", 1023, 2003)
+
+ ent_list.add_group("group1", 2001, ["user1", "user11", "user21"])
+ ent_list.add_group("group2", 2002, ["user2", "user12", "user22"])
+ ent_list.add_group("group3", 2003, ["user3", "user13", "user23"])
+
+ ent_list.add_group("group0x", 2000, ["user1", "user2", "user3"])
+ ent_list.add_group("group1x", 2010, ["user11", "user12", "user13"])
+ ent_list.add_group("group2x", 2020, ["user21", "user22", "user23"])
+ create_ldap_fixture(request, ldap_conn, ent_list)
+
+
+@pytest.fixture
+def disable_memcache_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_size_group = 0
+ memcache_size_passwd = 0
+ memcache_size_initgroups = 0
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def disable_pwd_mc_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_size_passwd = 0
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def disable_grp_mc_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_size_group = 0
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def disable_initgr_mc_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_size_initgroups = 0
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def sanity_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def fqname_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ use_fully_qualified_names = true
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def fqname_case_insensitive_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ use_fully_qualified_names = true
+ case_sensitive = false
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.fixture
+def zero_timeout_rfc2307(request, ldap_conn):
+ load_data_to_ldap(request, ldap_conn)
+
+ conf = unindent("""\
+ [sssd]
+ domains = LDAP
+ services = nss
+
+ [nss]
+ memcache_timeout = 0
+
+ [domain/LDAP]
+ ldap_auth_disable_tls_never_use_in_production = true
+ ldap_schema = rfc2307
+ id_provider = ldap
+ auth_provider = ldap
+ sudo_provider = ldap
+ ldap_uri = {ldap_conn.ds_inst.ldap_url}
+ ldap_search_base = {ldap_conn.ds_inst.base_dn}
+ """).format(**locals())
+ create_conf_fixture(request, conf)
+ create_sssd_fixture(request)
+ return None
+
+
+@pytest.mark.converted('test_id.py', 'test_id__getpwuid')
+@pytest.mark.converted('test_id.py', 'test_id__getpwnam')
+def test_getpwnam(ldap_conn, sanity_rfc2307):
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user2',
+ dict(name='user2', passwd='*', uid=1002, gid=2002,
+ gecos='1002', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1002,
+ dict(name='user2', passwd='*', uid=1002, gid=2002,
+ gecos='1002', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user3',
+ dict(name='user3', passwd='*', uid=1003, gid=2003,
+ gecos='1003', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1003,
+ dict(name='user3', passwd='*', uid=1003, gid=2003,
+ gecos='1003', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user11',
+ dict(name='user11', passwd='*', uid=1011, gid=2001,
+ gecos='1011', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1011,
+ dict(name='user11', passwd='*', uid=1011, gid=2001,
+ gecos='1011', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user12',
+ dict(name='user12', passwd='*', uid=1012, gid=2002,
+ gecos='1012', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1012,
+ dict(name='user12', passwd='*', uid=1012, gid=2002,
+ gecos='1012', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user13',
+ dict(name='user13', passwd='*', uid=1013, gid=2003,
+ gecos='1013', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1013,
+ dict(name='user13', passwd='*', uid=1013, gid=2003,
+ gecos='1013', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user21',
+ dict(name='user21', passwd='*', uid=1021, gid=2001,
+ gecos='1021', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1021,
+ dict(name='user21', passwd='*', uid=1021, gid=2001,
+ gecos='1021', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user22',
+ dict(name='user22', passwd='*', uid=1022, gid=2002,
+ gecos='1022', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1022,
+ dict(name='user22', passwd='*', uid=1022, gid=2002,
+ gecos='1022', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ 'user23',
+ dict(name='user23', passwd='*', uid=1023, gid=2003,
+ gecos='1023', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1023,
+ dict(name='user23', passwd='*', uid=1023, gid=2003,
+ gecos='1023', shell='/bin/bash'))
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam')
+def test_getpwnam_with_mc(ldap_conn, sanity_rfc2307):
+ test_getpwnam(ldap_conn, sanity_rfc2307)
+ stop_sssd()
+ test_getpwnam(ldap_conn, sanity_rfc2307)
+
+
+@pytest.mark.converted('test_id.py', 'test_id__getgrgid')
+@pytest.mark.converted('test_id.py', 'test_id__getgrnam')
+def test_getgrnam_simple(ldap_conn, sanity_rfc2307):
+ ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
+ ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
+
+ ent.assert_group_by_name("group2", dict(name="group2", gid=2002))
+ ent.assert_group_by_gid(2002, dict(name="group2", gid=2002))
+
+ ent.assert_group_by_name("group3", dict(name="group3", gid=2003))
+ ent.assert_group_by_gid(2003, dict(name="group3", gid=2003))
+
+ ent.assert_group_by_name("group0x", dict(name="group0x", gid=2000))
+ ent.assert_group_by_gid(2000, dict(name="group0x", gid=2000))
+
+ ent.assert_group_by_name("group1x", dict(name="group1x", gid=2010))
+ ent.assert_group_by_gid(2010, dict(name="group1x", gid=2010))
+
+ ent.assert_group_by_name("group2x", dict(name="group2x", gid=2020))
+ ent.assert_group_by_gid(2020, dict(name="group2x", gid=2020))
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getgrnam')
+def test_getgrnam_simple_with_mc(ldap_conn, sanity_rfc2307):
+ test_getgrnam_simple(ldap_conn, sanity_rfc2307)
+ stop_sssd()
+ test_getgrnam_simple(ldap_conn, sanity_rfc2307)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getgrnam')
+def test_getgrnam_simple_disabled_pwd_mc(ldap_conn, disable_pwd_mc_rfc2307):
+ test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307)
+ stop_sssd()
+ test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getgrnam')
+def test_getgrnam_simple_disabled_intitgr_mc(ldap_conn,
+ disable_initgr_mc_rfc2307):
+ test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307)
+ stop_sssd()
+ test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307)
+
+
+@pytest.mark.converted('test_id.py', 'test_id__membership_by_group_id')
+@pytest.mark.converted('test_id.py', 'test_id__membership_by_group_name')
+def test_getgrnam_membership(ldap_conn, sanity_rfc2307):
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_gid(
+ 2001,
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+
+ ent.assert_group_by_name(
+ "group2",
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+ ent.assert_group_by_gid(
+ 2002,
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+
+ ent.assert_group_by_name(
+ "group3",
+ dict(mem=ent.contains_only("user3", "user13", "user23")))
+ ent.assert_group_by_gid(
+ 2003,
+ dict(mem=ent.contains_only("user3", "user13", "user23")))
+
+ ent.assert_group_by_name(
+ "group0x",
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+ ent.assert_group_by_gid(
+ 2000,
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+
+ ent.assert_group_by_name(
+ "group1x",
+ dict(mem=ent.contains_only("user11", "user12", "user13")))
+ ent.assert_group_by_gid(
+ 2010,
+ dict(mem=ent.contains_only("user11", "user12", "user13")))
+
+ ent.assert_group_by_name(
+ "group2x",
+ dict(mem=ent.contains_only("user21", "user22", "user23")))
+ ent.assert_group_by_gid(
+ 2020,
+ dict(mem=ent.contains_only("user21", "user22", "user23")))
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_id')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_name')
+def test_getgrnam_membership_with_mc(ldap_conn, sanity_rfc2307):
+ test_getgrnam_membership(ldap_conn, sanity_rfc2307)
+ stop_sssd()
+ test_getgrnam_membership(ldap_conn, sanity_rfc2307)
+
+
+def assert_user_gids_equal(user, expected_gids):
+ (res, errno, gids) = sssd_id.get_user_gids(user)
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user %s, %d" % (user, errno)
+
+ assert sorted(gids) == sorted(expected_gids), \
+ "result: %s\n expected %s" % (
+ ", ".join(["%s" % s for s in sorted(gids)]),
+ ", ".join(["%s" % s for s in sorted(expected_gids)])
+ )
+
+
+@pytest.mark.converted('test_id.py', 'test_id__initgroups')
+def test_initgroups(ldap_conn, sanity_rfc2307):
+ assert_user_gids_equal('user1', [2000, 2001])
+ assert_user_gids_equal('user2', [2000, 2002])
+ assert_user_gids_equal('user3', [2000, 2003])
+
+ assert_user_gids_equal('user11', [2010, 2001])
+ assert_user_gids_equal('user12', [2010, 2002])
+ assert_user_gids_equal('user13', [2010, 2003])
+
+ assert_user_gids_equal('user21', [2020, 2001])
+ assert_user_gids_equal('user22', [2020, 2002])
+ assert_user_gids_equal('user23', [2020, 2003])
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__user_gids')
+def test_initgroups_with_mc(ldap_conn, sanity_rfc2307):
+ test_initgroups(ldap_conn, sanity_rfc2307)
+ stop_sssd()
+ test_initgroups(ldap_conn, sanity_rfc2307)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam_fully_qualified_names')
+@pytest.mark.converted('test_id.py', 'test_id__getpwnam_fully_qualified_names')
+def test_initgroups_fqname_with_mc(ldap_conn, fqname_rfc2307):
+ assert_user_gids_equal('user1@LDAP', [2000, 2001])
+ stop_sssd()
+ assert_user_gids_equal('user1@LDAP', [2000, 2001])
+
+
+def assert_initgroups_equal(user, primary_gid, expected_gids):
+ (res, errno, gids) = sssd_id.call_sssd_initgroups(user, primary_gid)
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user %s, %d" % (user, errno)
+
+ assert sorted(gids) == sorted(expected_gids), \
+ "result: %s\n expected %s" % (
+ ", ".join(["%s" % s for s in sorted(gids)]),
+ ", ".join(["%s" % s for s in sorted(expected_gids)])
+ )
+
+
+def assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
+ primary_gid, expected_gids):
+
+ assert_initgroups_equal(user1_case1, primary_gid, expected_gids)
+ assert_initgroups_equal(user1_case2, primary_gid, expected_gids)
+ assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)
+ stop_sssd()
+
+ user = user1_case1
+ (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid)
+ assert res == sssd_id.NssReturnCode.UNAVAIL, \
+ "Initgroups for user should fail user %s, %d, %d" % (user, res, errno)
+
+ user = user1_case2
+ (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid)
+ assert res == sssd_id.NssReturnCode.UNAVAIL, \
+ "Initgroups for user should fail user %s, %d, %d" % (user, res, errno)
+
+ # Just last invocation of initgroups should PASS
+ # Otherwise, we would not be able to invalidate it
+ assert_initgroups_equal(user1_case_last, primary_gid, expected_gids)
+
+
+@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive')
+@pytest.mark.converted('test_id.py', 'test_id__case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive')
+def test_initgroups_case_insensitive_with_mc1(ldap_conn,
+ fqname_case_insensitive_rfc2307):
+ user1_case1 = 'User1@LDAP'
+ user1_case2 = 'uSer1@LDAP'
+ user1_case_last = 'usEr1@LDAP'
+ primary_gid = 2001
+ expected_gids = [2000, 2001]
+
+ assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
+ primary_gid, expected_gids)
+
+
+@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive')
+@pytest.mark.converted('test_id.py', 'test_id__case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive')
+def test_initgroups_case_insensitive_with_mc2(ldap_conn,
+ fqname_case_insensitive_rfc2307):
+ user1_case1 = 'usEr1@LDAP'
+ user1_case2 = 'User1@LDAP'
+ user1_case_last = 'uSer1@LDAP'
+ primary_gid = 2001
+ expected_gids = [2000, 2001]
+
+ assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
+ primary_gid, expected_gids)
+
+
+@pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive')
+@pytest.mark.converted('test_id.py', 'test_id__case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive')
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive')
+def test_initgroups_case_insensitive_with_mc3(ldap_conn,
+ fqname_case_insensitive_rfc2307):
+ user1_case1 = 'uSer1@LDAP'
+ user1_case2 = 'usEr1@LDAP'
+ user1_case_last = 'User1@LDAP'
+ primary_gid = 2001
+ expected_gids = [2000, 2001]
+
+ assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last,
+ primary_gid, expected_gids)
+
+
+def run_simple_test_with_initgroups():
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_gid(
+ 2001,
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+
+ # unrelated group to user1
+ ent.assert_group_by_name(
+ "group2",
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+ ent.assert_group_by_gid(
+ 2002,
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidatation_of_gids_after_initgroups')
+def test_invalidation_of_gids_after_initgroups(ldap_conn, sanity_rfc2307):
+
+ # the sssd cache was empty and not all user's group were
+ # resolved with getgr{nm,gid}. Therefore there is a change in
+ # group membership => user groups should be invalidated
+ run_simple_test_with_initgroups()
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+ stop_sssd()
+
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ # unrelated group to user1 must be returned
+ ent.assert_group_by_name(
+ "group2",
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+ ent.assert_group_by_gid(
+ 2002,
+ dict(mem=ent.contains_only("user2", "user12", "user22")))
+
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+ # user groups must be invalidated
+ for group in ["group1", "group0x"]:
+ with pytest.raises(KeyError):
+ grp.getgrnam(group)
+
+ for gid in [2000, 2001]:
+ with pytest.raises(KeyError):
+ grp.getgrgid(gid)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__initgroups_without_change_in_membership')
+def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307):
+
+ # the sssd cache was empty and not all user's group were
+ # resolved with getgr{nm,gid}. Therefore there is a change in
+ # group membership => user groups should be invalidated
+ run_simple_test_with_initgroups()
+
+ # invalidate cache
+ subprocess.call(["sss_cache", "-E"])
+
+ # all users and groups will be just refreshed from LDAP
+ # but there will not be a change in group membership
+ # user groups should not be invlaidated
+ run_simple_test_with_initgroups()
+
+ stop_sssd()
+
+ # everything should be in memory cache
+ run_simple_test_with_initgroups()
+
+
+def assert_mc_records_for_user1():
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_gid(
+ 2001,
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_name(
+ "group0x",
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+ ent.assert_group_by_gid(
+ 2000,
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+
+def assert_missing_mc_records_for_user1():
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ for gid in [2000, 2001]:
+ with pytest.raises(KeyError):
+ grp.getgrgid(gid)
+ for group in ["group0x", "group1"]:
+ with pytest.raises(KeyError):
+ grp.getgrnam(group)
+
+ (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
+ assert res == sssd_id.NssReturnCode.UNAVAIL, \
+ "Initgroups should not find anything after invalidation of mc.\n" \
+ "User user1, errno:%d" % err
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_before_stop')
+def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-u", "user1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_after_stop')
+def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-u", "user1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_before_stop')
+def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-U"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_after_stop')
+def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-U"])
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_before_stop')
+def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-g", "group1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_after_stop')
+def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-g", "group1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_before_stop')
+def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-G"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_after_stop')
+def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-G"])
+
+ assert_missing_mc_records_for_user1()
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_before_stop')
+def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-E"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_after_stop')
+def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-E"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def get_random_string(length):
+ return ''.join([random.choice(string.ascii_letters + string.digits)
+ for n in range(length)])
+
+
+class MemoryCache(object):
+ SIZEOF_UINT32_T = 4
+
+ def __init__(self, path):
+ with open(path, "rb") as fin:
+ fin.seek(4 * self.SIZEOF_UINT32_T)
+ self.seed = struct.unpack('i', fin.read(4))[0]
+ self.data_size = struct.unpack('i', fin.read(4))[0]
+ self.ft_size = struct.unpack('i', fin.read(4))[0]
+ hash_len = struct.unpack('i', fin.read(4))[0]
+ self.hash_size = hash_len / self.SIZEOF_UINT32_T
+
+ def sss_nss_mc_hash(self, key):
+ input_key = key + '\0'
+ input_len = len(key) + 1
+
+ murmur_hash = pysss_murmur.murmurhash3(input_key, input_len, self.seed)
+ return murmur_hash % self.hash_size
+
+
+def test_colliding_hashes(ldap_conn, sanity_rfc2307):
+ """
+ Regression test for ticket:
+ https://github.com/SSSD/sssd/issues/4595
+ """
+
+ first_user = 'user1'
+
+ # initialize data in memcache
+ ent.assert_passwd_by_name(
+ first_user,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ mem_cache = MemoryCache(config.MCACHE_PATH + '/passwd')
+
+ colliding_hash = mem_cache.sss_nss_mc_hash(first_user)
+
+ while True:
+ # string for colliding hash need to be longer then data for user1
+ # stored in memory cache (almost equivalent to:
+ # `getent passwd user1 | wc -c` ==> 45
+ second_user = get_random_string(80)
+ val = mem_cache.sss_nss_mc_hash(second_user)
+ if val == colliding_hash:
+ break
+
+ # add new user to LDAP
+ ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn)
+ ent_list.add_user(second_user, 5001, 5001)
+ ldap_conn.add_s(ent_list[0][0], ent_list[0][1])
+
+ ent.assert_passwd_by_name(
+ second_user,
+ dict(name=second_user, passwd='*', uid=5001, gid=5001,
+ gecos='5001', shell='/bin/bash'))
+
+ stop_sssd()
+
+ # check that both users are stored in cache
+ ent.assert_passwd_by_name(
+ first_user,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_passwd_by_name(
+ second_user,
+ dict(name=second_user, passwd='*', uid=5001, gid=5001,
+ gecos='5001', shell='/bin/bash'))
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__removed_cache_without_invalidation')
+def test_removed_mc(ldap_conn, sanity_rfc2307):
+ """
+ Regression test for ticket:
+ https://fedorahosted.org/sssd/ticket/2726
+ """
+
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
+ ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
+ stop_sssd()
+
+ # remove cache without invalidation
+ for path in os.listdir(config.MCACHE_PATH):
+ os.unlink(config.MCACHE_PATH + "/" + path)
+
+ # sssd is stopped; so the memory cache should not be used
+ # in long living clients (py.test in this case)
+ with pytest.raises(KeyError):
+ pwd.getpwnam('user1')
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ with pytest.raises(KeyError):
+ grp.getgrnam('group1')
+ with pytest.raises(KeyError):
+ grp.getgrgid(2001)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__memcache_timeout_zero')
+def test_mc_zero_timeout(ldap_conn, zero_timeout_rfc2307):
+ """
+ Test that the memory cache is not created at all with memcache_timeout=0
+ """
+ # No memory cache files must be created
+ assert len(os.listdir(config.MCACHE_PATH)) == 0
+
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
+ ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
+ stop_sssd()
+
+ # sssd is stopped; so the memory cache should not be used
+ # in long living clients (py.test in this case)
+ with pytest.raises(KeyError):
+ pwd.getpwnam('user1')
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ with pytest.raises(KeyError):
+ grp.getgrnam('group1')
+ with pytest.raises(KeyError):
+ grp.getgrgid(2001)
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_cache')
+def test_disabled_mc(ldap_conn, disable_memcache_rfc2307):
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
+ ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
+
+ assert_user_gids_equal('user1', [2000, 2001])
+
+ stop_sssd()
+
+ # sssd is stopped and the memory cache is disabled;
+ # so pytest should not be able to find anything
+ with pytest.raises(KeyError):
+ pwd.getpwnam('user1')
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ with pytest.raises(KeyError):
+ grp.getgrnam('group1')
+ with pytest.raises(KeyError):
+ grp.getgrgid(2001)
+
+ with pytest.raises(KeyError):
+ (res, errno, gids) = sssd_id.get_user_gids('user1')
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getpwnam')
+def test_disabled_passwd_mc(ldap_conn, disable_pwd_mc_rfc2307):
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ assert_user_gids_equal('user1', [2000, 2001])
+
+ stop_sssd()
+
+ # passwd cache is disabled
+ with pytest.raises(KeyError):
+ pwd.getpwnam('user1')
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ # Initgroups looks up the user first, hence KeyError from the
+ # passwd database even if the initgroups cache is active.
+ with pytest.raises(KeyError):
+ (res, errno, gids) = sssd_id.get_user_gids('user1')
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_group')
+def test_disabled_group_mc(ldap_conn, disable_grp_mc_rfc2307):
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name("group1", dict(name="group1", gid=2001))
+ ent.assert_group_by_gid(2001, dict(name="group1", gid=2001))
+
+ assert_user_gids_equal('user1', [2000, 2001])
+
+ stop_sssd()
+
+ # group cache is disabled, other caches should work
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ with pytest.raises(KeyError):
+ grp.getgrnam('group1')
+ with pytest.raises(KeyError):
+ grp.getgrgid(2001)
+
+ assert_user_gids_equal('user1', [2000, 2001])
+
+
+@pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getpwnam')
+def test_disabled_initgr_mc(ldap_conn, disable_initgr_mc_rfc2307):
+ # Even if initgroups is disabled, passwd should work
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ stop_sssd()
+
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))