# # LDAP integration test # # Copyright (c) 2015 Red Hat, Inc. # Author: Lukas Slebodnik # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import os import stat import ent import grp import pwd import config import random import signal import string import struct import subprocess import time import pytest import pysss_murmur import ds_openldap import ldap_ent import sssd_id from util import unindent LDAP_BASE_DN = "dc=example,dc=com" @pytest.fixture(scope="module") def ds_inst(request): """LDAP server instance fixture""" ds_inst = ds_openldap.DSOpenLDAP( config.PREFIX, 10389, LDAP_BASE_DN, "cn=admin", "Secret123") try: ds_inst.setup() except Exception: ds_inst.teardown() raise request.addfinalizer(lambda: ds_inst.teardown()) return ds_inst @pytest.fixture(scope="module") def ldap_conn(request, ds_inst): """LDAP server connection fixture""" ldap_conn = ds_inst.bind() ldap_conn.ds_inst = ds_inst request.addfinalizer(lambda: ldap_conn.unbind_s()) return ldap_conn def create_ldap_fixture(request, ldap_conn, ent_list): """Add LDAP entries and add teardown for removing them""" for entry in ent_list: ldap_conn.add_s(entry[0], entry[1]) def teardown(): for entry in ent_list: ldap_conn.delete_s(entry[0]) request.addfinalizer(teardown) def create_conf_fixture(request, contents): """Generate sssd.conf and add teardown for removing it""" conf = open(config.CONF_PATH, "w") conf.write(contents) conf.close() os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) request.addfinalizer(lambda: os.unlink(config.CONF_PATH)) def stop_sssd(): pid_file = open(config.PIDFILE_PATH, "r") pid = int(pid_file.read()) os.kill(pid, signal.SIGTERM) while True: try: os.kill(pid, signal.SIGCONT) except OSError: break time.sleep(1) def create_sssd_fixture(request): """Start sssd and add teardown for stopping it and removing state""" if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: raise Exception("sssd start failed") def teardown(): try: stop_sssd() except Exception: pass for path in os.listdir(config.DB_PATH): os.unlink(config.DB_PATH + "/" + path) for path in os.listdir(config.MCACHE_PATH): os.unlink(config.MCACHE_PATH + "/" + path) # force sss_client libs to realize mem-cache files were deleted try: sssd_id.call_sssd_initgroups("user1", 2001) except Exception: pass try: grp.getgrnam("group1") except Exception: pass try: pwd.getpwnam("user1") except Exception: pass request.addfinalizer(teardown) def load_data_to_ldap(request, ldap_conn): ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) ent_list.add_user("user1", 1001, 2001) ent_list.add_user("user2", 1002, 2002) ent_list.add_user("user3", 1003, 2003) ent_list.add_user("user11", 1011, 2001) ent_list.add_user("user12", 1012, 2002) ent_list.add_user("user13", 1013, 2003) ent_list.add_user("user21", 1021, 2001) ent_list.add_user("user22", 1022, 2002) ent_list.add_user("user23", 1023, 2003) ent_list.add_group("group1", 2001, ["user1", "user11", "user21"]) ent_list.add_group("group2", 2002, ["user2", "user12", "user22"]) ent_list.add_group("group3", 2003, ["user3", "user13", "user23"]) ent_list.add_group("group0x", 2000, ["user1", "user2", "user3"]) ent_list.add_group("group1x", 2010, ["user11", "user12", "user13"]) ent_list.add_group("group2x", 2020, ["user21", "user22", "user23"]) create_ldap_fixture(request, ldap_conn, ent_list) @pytest.fixture def disable_memcache_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] memcache_size_group = 0 memcache_size_passwd = 0 memcache_size_initgroups = 0 [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def disable_pwd_mc_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] memcache_size_passwd = 0 [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def disable_grp_mc_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] memcache_size_group = 0 [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def disable_initgr_mc_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] memcache_size_initgroups = 0 [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def sanity_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def fqname_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} use_fully_qualified_names = true """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def fqname_case_insensitive_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} use_fully_qualified_names = true case_sensitive = false """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.fixture def zero_timeout_rfc2307(request, ldap_conn): load_data_to_ldap(request, ldap_conn) conf = unindent("""\ [sssd] domains = LDAP services = nss [nss] memcache_timeout = 0 [domain/LDAP] ldap_auth_disable_tls_never_use_in_production = true ldap_schema = rfc2307 id_provider = ldap auth_provider = ldap sudo_provider = ldap ldap_uri = {ldap_conn.ds_inst.ldap_url} ldap_search_base = {ldap_conn.ds_inst.base_dn} """).format(**locals()) create_conf_fixture(request, conf) create_sssd_fixture(request) return None @pytest.mark.converted('test_id.py', 'test_id__getpwuid') @pytest.mark.converted('test_id.py', 'test_id__getpwnam') def test_getpwnam(ldap_conn, sanity_rfc2307): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_name( 'user2', dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', shell='/bin/bash')) ent.assert_passwd_by_uid( 1002, dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', shell='/bin/bash')) ent.assert_passwd_by_name( 'user3', dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', shell='/bin/bash')) ent.assert_passwd_by_uid( 1003, dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', shell='/bin/bash')) ent.assert_passwd_by_name( 'user11', dict(name='user11', passwd='*', uid=1011, gid=2001, gecos='1011', shell='/bin/bash')) ent.assert_passwd_by_uid( 1011, dict(name='user11', passwd='*', uid=1011, gid=2001, gecos='1011', shell='/bin/bash')) ent.assert_passwd_by_name( 'user12', dict(name='user12', passwd='*', uid=1012, gid=2002, gecos='1012', shell='/bin/bash')) ent.assert_passwd_by_uid( 1012, dict(name='user12', passwd='*', uid=1012, gid=2002, gecos='1012', shell='/bin/bash')) ent.assert_passwd_by_name( 'user13', dict(name='user13', passwd='*', uid=1013, gid=2003, gecos='1013', shell='/bin/bash')) ent.assert_passwd_by_uid( 1013, dict(name='user13', passwd='*', uid=1013, gid=2003, gecos='1013', shell='/bin/bash')) ent.assert_passwd_by_name( 'user21', dict(name='user21', passwd='*', uid=1021, gid=2001, gecos='1021', shell='/bin/bash')) ent.assert_passwd_by_uid( 1021, dict(name='user21', passwd='*', uid=1021, gid=2001, gecos='1021', shell='/bin/bash')) ent.assert_passwd_by_name( 'user22', dict(name='user22', passwd='*', uid=1022, gid=2002, gecos='1022', shell='/bin/bash')) ent.assert_passwd_by_uid( 1022, dict(name='user22', passwd='*', uid=1022, gid=2002, gecos='1022', shell='/bin/bash')) ent.assert_passwd_by_name( 'user23', dict(name='user23', passwd='*', uid=1023, gid=2003, gecos='1023', shell='/bin/bash')) ent.assert_passwd_by_uid( 1023, dict(name='user23', passwd='*', uid=1023, gid=2003, gecos='1023', shell='/bin/bash')) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam') def test_getpwnam_with_mc(ldap_conn, sanity_rfc2307): test_getpwnam(ldap_conn, sanity_rfc2307) stop_sssd() test_getpwnam(ldap_conn, sanity_rfc2307) @pytest.mark.converted('test_id.py', 'test_id__getgrgid') @pytest.mark.converted('test_id.py', 'test_id__getgrnam') def test_getgrnam_simple(ldap_conn, sanity_rfc2307): ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) ent.assert_group_by_name("group2", dict(name="group2", gid=2002)) ent.assert_group_by_gid(2002, dict(name="group2", gid=2002)) ent.assert_group_by_name("group3", dict(name="group3", gid=2003)) ent.assert_group_by_gid(2003, dict(name="group3", gid=2003)) ent.assert_group_by_name("group0x", dict(name="group0x", gid=2000)) ent.assert_group_by_gid(2000, dict(name="group0x", gid=2000)) ent.assert_group_by_name("group1x", dict(name="group1x", gid=2010)) ent.assert_group_by_gid(2010, dict(name="group1x", gid=2010)) ent.assert_group_by_name("group2x", dict(name="group2x", gid=2020)) ent.assert_group_by_gid(2020, dict(name="group2x", gid=2020)) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getgrnam') def test_getgrnam_simple_with_mc(ldap_conn, sanity_rfc2307): test_getgrnam_simple(ldap_conn, sanity_rfc2307) stop_sssd() test_getgrnam_simple(ldap_conn, sanity_rfc2307) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getgrnam') def test_getgrnam_simple_disabled_pwd_mc(ldap_conn, disable_pwd_mc_rfc2307): test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307) stop_sssd() test_getgrnam_simple(ldap_conn, disable_pwd_mc_rfc2307) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getgrnam') def test_getgrnam_simple_disabled_intitgr_mc(ldap_conn, disable_initgr_mc_rfc2307): test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307) stop_sssd() test_getgrnam_simple(ldap_conn, disable_initgr_mc_rfc2307) @pytest.mark.converted('test_id.py', 'test_id__membership_by_group_id') @pytest.mark.converted('test_id.py', 'test_id__membership_by_group_name') def test_getgrnam_membership(ldap_conn, sanity_rfc2307): ent.assert_group_by_name( "group1", dict(mem=ent.contains_only("user1", "user11", "user21"))) ent.assert_group_by_gid( 2001, dict(mem=ent.contains_only("user1", "user11", "user21"))) ent.assert_group_by_name( "group2", dict(mem=ent.contains_only("user2", "user12", "user22"))) ent.assert_group_by_gid( 2002, dict(mem=ent.contains_only("user2", "user12", "user22"))) ent.assert_group_by_name( "group3", dict(mem=ent.contains_only("user3", "user13", "user23"))) ent.assert_group_by_gid( 2003, dict(mem=ent.contains_only("user3", "user13", "user23"))) ent.assert_group_by_name( "group0x", dict(mem=ent.contains_only("user1", "user2", "user3"))) ent.assert_group_by_gid( 2000, dict(mem=ent.contains_only("user1", "user2", "user3"))) ent.assert_group_by_name( "group1x", dict(mem=ent.contains_only("user11", "user12", "user13"))) ent.assert_group_by_gid( 2010, dict(mem=ent.contains_only("user11", "user12", "user13"))) ent.assert_group_by_name( "group2x", dict(mem=ent.contains_only("user21", "user22", "user23"))) ent.assert_group_by_gid( 2020, dict(mem=ent.contains_only("user21", "user22", "user23"))) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_id') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__membership_by_group_name') def test_getgrnam_membership_with_mc(ldap_conn, sanity_rfc2307): test_getgrnam_membership(ldap_conn, sanity_rfc2307) stop_sssd() test_getgrnam_membership(ldap_conn, sanity_rfc2307) def assert_user_gids_equal(user, expected_gids): (res, errno, gids) = sssd_id.get_user_gids(user) assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user %s, %d" % (user, errno) assert sorted(gids) == sorted(expected_gids), \ "result: %s\n expected %s" % ( ", ".join(["%s" % s for s in sorted(gids)]), ", ".join(["%s" % s for s in sorted(expected_gids)]) ) @pytest.mark.converted('test_id.py', 'test_id__initgroups') def test_initgroups(ldap_conn, sanity_rfc2307): assert_user_gids_equal('user1', [2000, 2001]) assert_user_gids_equal('user2', [2000, 2002]) assert_user_gids_equal('user3', [2000, 2003]) assert_user_gids_equal('user11', [2010, 2001]) assert_user_gids_equal('user12', [2010, 2002]) assert_user_gids_equal('user13', [2010, 2003]) assert_user_gids_equal('user21', [2020, 2001]) assert_user_gids_equal('user22', [2020, 2002]) assert_user_gids_equal('user23', [2020, 2003]) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__user_gids') def test_initgroups_with_mc(ldap_conn, sanity_rfc2307): test_initgroups(ldap_conn, sanity_rfc2307) stop_sssd() test_initgroups(ldap_conn, sanity_rfc2307) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__getpwnam_fully_qualified_names') @pytest.mark.converted('test_id.py', 'test_id__getpwnam_fully_qualified_names') def test_initgroups_fqname_with_mc(ldap_conn, fqname_rfc2307): assert_user_gids_equal('user1@LDAP', [2000, 2001]) stop_sssd() assert_user_gids_equal('user1@LDAP', [2000, 2001]) def assert_initgroups_equal(user, primary_gid, expected_gids): (res, errno, gids) = sssd_id.call_sssd_initgroups(user, primary_gid) assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user %s, %d" % (user, errno) assert sorted(gids) == sorted(expected_gids), \ "result: %s\n expected %s" % ( ", ".join(["%s" % s for s in sorted(gids)]), ", ".join(["%s" % s for s in sorted(expected_gids)]) ) def assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, primary_gid, expected_gids): assert_initgroups_equal(user1_case1, primary_gid, expected_gids) assert_initgroups_equal(user1_case2, primary_gid, expected_gids) assert_initgroups_equal(user1_case_last, primary_gid, expected_gids) stop_sssd() user = user1_case1 (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid) assert res == sssd_id.NssReturnCode.UNAVAIL, \ "Initgroups for user should fail user %s, %d, %d" % (user, res, errno) user = user1_case2 (res, errno, _) = sssd_id.call_sssd_initgroups(user, primary_gid) assert res == sssd_id.NssReturnCode.UNAVAIL, \ "Initgroups for user should fail user %s, %d, %d" % (user, res, errno) # Just last invocation of initgroups should PASS # Otherwise, we would not be able to invalidate it assert_initgroups_equal(user1_case_last, primary_gid, expected_gids) @pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') @pytest.mark.converted('test_id.py', 'test_id__case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') def test_initgroups_case_insensitive_with_mc1(ldap_conn, fqname_case_insensitive_rfc2307): user1_case1 = 'User1@LDAP' user1_case2 = 'uSer1@LDAP' user1_case_last = 'usEr1@LDAP' primary_gid = 2001 expected_gids = [2000, 2001] assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, primary_gid, expected_gids) @pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') @pytest.mark.converted('test_id.py', 'test_id__case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') def test_initgroups_case_insensitive_with_mc2(ldap_conn, fqname_case_insensitive_rfc2307): user1_case1 = 'usEr1@LDAP' user1_case2 = 'User1@LDAP' user1_case_last = 'uSer1@LDAP' primary_gid = 2001 expected_gids = [2000, 2001] assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, primary_gid, expected_gids) @pytest.mark.converted('test_id.py', 'test_id__fq_names_case_insensitive') @pytest.mark.converted('test_id.py', 'test_id__case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__fq_names_case_insensitive') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__case_insensitive') def test_initgroups_case_insensitive_with_mc3(ldap_conn, fqname_case_insensitive_rfc2307): user1_case1 = 'uSer1@LDAP' user1_case2 = 'usEr1@LDAP' user1_case_last = 'User1@LDAP' primary_gid = 2001 expected_gids = [2000, 2001] assert_stored_last_initgroups(user1_case1, user1_case2, user1_case_last, primary_gid, expected_gids) def run_simple_test_with_initgroups(): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name( "group1", dict(mem=ent.contains_only("user1", "user11", "user21"))) ent.assert_group_by_gid( 2001, dict(mem=ent.contains_only("user1", "user11", "user21"))) # unrelated group to user1 ent.assert_group_by_name( "group2", dict(mem=ent.contains_only("user2", "user12", "user22"))) ent.assert_group_by_gid( 2002, dict(mem=ent.contains_only("user2", "user12", "user22"))) assert_initgroups_equal("user1", 2001, [2000, 2001]) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidatation_of_gids_after_initgroups') def test_invalidation_of_gids_after_initgroups(ldap_conn, sanity_rfc2307): # the sssd cache was empty and not all user's group were # resolved with getgr{nm,gid}. Therefore there is a change in # group membership => user groups should be invalidated run_simple_test_with_initgroups() assert_initgroups_equal("user1", 2001, [2000, 2001]) stop_sssd() ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) # unrelated group to user1 must be returned ent.assert_group_by_name( "group2", dict(mem=ent.contains_only("user2", "user12", "user22"))) ent.assert_group_by_gid( 2002, dict(mem=ent.contains_only("user2", "user12", "user22"))) assert_initgroups_equal("user1", 2001, [2000, 2001]) # user groups must be invalidated for group in ["group1", "group0x"]: with pytest.raises(KeyError): grp.getgrnam(group) for gid in [2000, 2001]: with pytest.raises(KeyError): grp.getgrgid(gid) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__initgroups_without_change_in_membership') def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307): # the sssd cache was empty and not all user's group were # resolved with getgr{nm,gid}. Therefore there is a change in # group membership => user groups should be invalidated run_simple_test_with_initgroups() # invalidate cache subprocess.call(["sss_cache", "-E"]) # all users and groups will be just refreshed from LDAP # but there will not be a change in group membership # user groups should not be invlaidated run_simple_test_with_initgroups() stop_sssd() # everything should be in memory cache run_simple_test_with_initgroups() def assert_mc_records_for_user1(): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name( "group1", dict(mem=ent.contains_only("user1", "user11", "user21"))) ent.assert_group_by_gid( 2001, dict(mem=ent.contains_only("user1", "user11", "user21"))) ent.assert_group_by_name( "group0x", dict(mem=ent.contains_only("user1", "user2", "user3"))) ent.assert_group_by_gid( 2000, dict(mem=ent.contains_only("user1", "user2", "user3"))) assert_initgroups_equal("user1", 2001, [2000, 2001]) def assert_missing_mc_records_for_user1(): with pytest.raises(KeyError): pwd.getpwnam("user1") with pytest.raises(KeyError): pwd.getpwuid(1001) for gid in [2000, 2001]: with pytest.raises(KeyError): grp.getgrgid(gid) for group in ["group0x", "group1"]: with pytest.raises(KeyError): grp.getgrnam(group) (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001) assert res == sssd_id.NssReturnCode.UNAVAIL, \ "Initgroups should not find anything after invalidation of mc.\n" \ "User user1, errno:%d" % err @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_before_stop') def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() subprocess.call(["sss_cache", "-u", "user1"]) stop_sssd() assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_user_after_stop') def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() stop_sssd() subprocess.call(["sss_cache", "-u", "user1"]) assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_before_stop') def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() subprocess.call(["sss_cache", "-U"]) stop_sssd() assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_users_after_stop') def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() stop_sssd() subprocess.call(["sss_cache", "-U"]) assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_before_stop') def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() subprocess.call(["sss_cache", "-g", "group1"]) stop_sssd() assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_group_after_stop') def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() stop_sssd() subprocess.call(["sss_cache", "-g", "group1"]) assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_before_stop') def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() subprocess.call(["sss_cache", "-G"]) stop_sssd() assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_groups_after_stop') def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() stop_sssd() subprocess.call(["sss_cache", "-G"]) assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_before_stop') def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() subprocess.call(["sss_cache", "-E"]) stop_sssd() assert_missing_mc_records_for_user1() @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__invalidate_everything_after_stop') def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307): # initialize cache with full ID (res, errno, _) = sssd_id.get_user_groups("user1") assert res == sssd_id.NssReturnCode.SUCCESS, \ "Could not find groups for user1, %d" % errno assert_mc_records_for_user1() stop_sssd() subprocess.call(["sss_cache", "-E"]) assert_missing_mc_records_for_user1() def get_random_string(length): return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(length)]) class MemoryCache(object): SIZEOF_UINT32_T = 4 def __init__(self, path): with open(path, "rb") as fin: fin.seek(4 * self.SIZEOF_UINT32_T) self.seed = struct.unpack('i', fin.read(4))[0] self.data_size = struct.unpack('i', fin.read(4))[0] self.ft_size = struct.unpack('i', fin.read(4))[0] hash_len = struct.unpack('i', fin.read(4))[0] self.hash_size = hash_len / self.SIZEOF_UINT32_T def sss_nss_mc_hash(self, key): input_key = key + '\0' input_len = len(key) + 1 murmur_hash = pysss_murmur.murmurhash3(input_key, input_len, self.seed) return murmur_hash % self.hash_size def test_colliding_hashes(ldap_conn, sanity_rfc2307): """ Regression test for ticket: https://github.com/SSSD/sssd/issues/4595 """ first_user = 'user1' # initialize data in memcache ent.assert_passwd_by_name( first_user, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) mem_cache = MemoryCache(config.MCACHE_PATH + '/passwd') colliding_hash = mem_cache.sss_nss_mc_hash(first_user) while True: # string for colliding hash need to be longer then data for user1 # stored in memory cache (almost equivalent to: # `getent passwd user1 | wc -c` ==> 45 second_user = get_random_string(80) val = mem_cache.sss_nss_mc_hash(second_user) if val == colliding_hash: break # add new user to LDAP ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) ent_list.add_user(second_user, 5001, 5001) ldap_conn.add_s(ent_list[0][0], ent_list[0][1]) ent.assert_passwd_by_name( second_user, dict(name=second_user, passwd='*', uid=5001, gid=5001, gecos='5001', shell='/bin/bash')) stop_sssd() # check that both users are stored in cache ent.assert_passwd_by_name( first_user, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_name( second_user, dict(name=second_user, passwd='*', uid=5001, gid=5001, gecos='5001', shell='/bin/bash')) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__removed_cache_without_invalidation') def test_removed_mc(ldap_conn, sanity_rfc2307): """ Regression test for ticket: https://fedorahosted.org/sssd/ticket/2726 """ ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) stop_sssd() # remove cache without invalidation for path in os.listdir(config.MCACHE_PATH): os.unlink(config.MCACHE_PATH + "/" + path) # sssd is stopped; so the memory cache should not be used # in long living clients (py.test in this case) with pytest.raises(KeyError): pwd.getpwnam('user1') with pytest.raises(KeyError): pwd.getpwuid(1001) with pytest.raises(KeyError): grp.getgrnam('group1') with pytest.raises(KeyError): grp.getgrgid(2001) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__memcache_timeout_zero') def test_mc_zero_timeout(ldap_conn, zero_timeout_rfc2307): """ Test that the memory cache is not created at all with memcache_timeout=0 """ # No memory cache files must be created assert len(os.listdir(config.MCACHE_PATH)) == 0 ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) stop_sssd() # sssd is stopped; so the memory cache should not be used # in long living clients (py.test in this case) with pytest.raises(KeyError): pwd.getpwnam('user1') with pytest.raises(KeyError): pwd.getpwuid(1001) with pytest.raises(KeyError): grp.getgrnam('group1') with pytest.raises(KeyError): grp.getgrgid(2001) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_cache') def test_disabled_mc(ldap_conn, disable_memcache_rfc2307): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) assert_user_gids_equal('user1', [2000, 2001]) stop_sssd() # sssd is stopped and the memory cache is disabled; # so pytest should not be able to find anything with pytest.raises(KeyError): pwd.getpwnam('user1') with pytest.raises(KeyError): pwd.getpwuid(1001) with pytest.raises(KeyError): grp.getgrnam('group1') with pytest.raises(KeyError): grp.getgrgid(2001) with pytest.raises(KeyError): (res, errno, gids) = sssd_id.get_user_gids('user1') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_passwd_getpwnam') def test_disabled_passwd_mc(ldap_conn, disable_pwd_mc_rfc2307): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) assert_user_gids_equal('user1', [2000, 2001]) stop_sssd() # passwd cache is disabled with pytest.raises(KeyError): pwd.getpwnam('user1') with pytest.raises(KeyError): pwd.getpwuid(1001) # Initgroups looks up the user first, hence KeyError from the # passwd database even if the initgroups cache is active. with pytest.raises(KeyError): (res, errno, gids) = sssd_id.get_user_gids('user1') @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_group') def test_disabled_group_mc(ldap_conn, disable_grp_mc_rfc2307): ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_group_by_name("group1", dict(name="group1", gid=2001)) ent.assert_group_by_gid(2001, dict(name="group1", gid=2001)) assert_user_gids_equal('user1', [2000, 2001]) stop_sssd() # group cache is disabled, other caches should work ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) with pytest.raises(KeyError): grp.getgrnam('group1') with pytest.raises(KeyError): grp.getgrgid(2001) assert_user_gids_equal('user1', [2000, 2001]) @pytest.mark.converted('test_memory_cache.py', 'test_memory_cache__disabled_intitgroups_getpwnam') def test_disabled_initgr_mc(ldap_conn, disable_initgr_mc_rfc2307): # Even if initgroups is disabled, passwd should work ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) stop_sssd() ent.assert_passwd_by_name( 'user1', dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash')) ent.assert_passwd_by_uid( 1001, dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', shell='/bin/bash'))