diff options
Diffstat (limited to 'src/tests/intg/test_ldap.py')
-rw-r--r-- | src/tests/intg/test_ldap.py | 2106 |
1 files changed, 2106 insertions, 0 deletions
diff --git a/src/tests/intg/test_ldap.py b/src/tests/intg/test_ldap.py new file mode 100644 index 0000000..81c4f31 --- /dev/null +++ b/src/tests/intg/test_ldap.py @@ -0,0 +1,2106 @@ +# +# LDAP integration test +# +# Copyright (c) 2015 Red Hat, Inc. +# Author: Nikolai Kondrashov <Nikolai.Kondrashov@redhat.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +import os +import stat +import pwd +import grp +import signal +import subprocess +import time +import ldap +import ldap.modlist +import pytest + +import config +import ds_openldap +import ent +import ldap_ent +import sssd_id +import sssd_ldb +from util import unindent +from sssd_nss import NssReturnCode +from sssd_passwd import call_sssd_getpwnam, call_sssd_getpwuid +from sssd_group import call_sssd_getgrnam, call_sssd_getgrgid + +LDAP_BASE_DN = "dc=example,dc=com" +INTERACTIVE_TIMEOUT = 4 + + +@pytest.fixture(scope="module") +def ds_inst(request): + """LDAP server instance fixture""" + ds_inst = ds_openldap.DSOpenLDAP( + config.PREFIX, 10389, LDAP_BASE_DN, + "cn=admin", "Secret123" + ) + + try: + ds_inst.setup() + except Exception: + ds_inst.teardown() + raise + request.addfinalizer(ds_inst.teardown) + return ds_inst + + +@pytest.fixture(scope="module") +def ldap_conn(request, ds_inst): + """LDAP server connection fixture""" + ldap_conn = ds_inst.bind() + ldap_conn.ds_inst = ds_inst + request.addfinalizer(ldap_conn.unbind_s) + return ldap_conn + + +def create_ldap_entries(ldap_conn, ent_list=None): + """Add LDAP entries from ent_list""" + if ent_list is not None: + for entry in ent_list: + ldap_conn.add_s(entry[0], entry[1]) + + +def cleanup_ldap_entries(ldap_conn, ent_list=None): + """Remove LDAP entries added by create_ldap_entries""" + if ent_list is None: + for ou in ("Users", "Groups", "Netgroups", "Services", "Policies"): + for entry in ldap_conn.search_s(f"ou={ou}," + f"{ldap_conn.ds_inst.base_dn}", + ldap.SCOPE_ONELEVEL, + attrlist=[]): + ldap_conn.delete_s(entry[0]) + else: + for entry in ent_list: + ldap_conn.delete_s(entry[0]) + + +def create_ldap_cleanup(request, ldap_conn, ent_list=None): + """Add teardown for removing all user/group LDAP entries""" + request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) + + +def create_ldap_fixture(request, ldap_conn, ent_list=None, cleanup=True): + """Add LDAP entries and add teardown for removing them""" + create_ldap_entries(ldap_conn, ent_list) + if cleanup: + create_ldap_cleanup(request, ldap_conn, ent_list) + + +SCHEMA_RFC2307 = "rfc2307" +SCHEMA_RFC2307_BIS = "rfc2307bis" + + +def format_basic_conf(ldap_conn, schema): + """Format a basic SSSD configuration""" + schema_conf = "ldap_schema = " + schema + "\n" + if schema == SCHEMA_RFC2307_BIS: + schema_conf += "ldap_group_object_class = groupOfNames\n" + return unindent("""\ + [sssd] + debug_level = 0xffff + domains = LDAP + services = nss, pam + enable_files_domain = false + + [nss] + debug_level = 0xffff + memcache_timeout = 0 + entry_negative_timeout = 1 + + [pam] + debug_level = 0xffff + + [domain/LDAP] + ldap_auth_disable_tls_never_use_in_production = true + debug_level = 0xffff + {schema_conf} + id_provider = ldap + auth_provider = ldap + ldap_uri = {ldap_conn.ds_inst.ldap_url} + ldap_search_base = {ldap_conn.ds_inst.base_dn} + """).format(**locals()) + + +def format_interactive_conf(ldap_conn, schema): + """Format an SSSD configuration with all caches refreshing in 4 seconds""" + return \ + format_basic_conf(ldap_conn, schema) + \ + unindent(""" + [nss] + memcache_timeout = 0 + entry_negative_timeout = 0 + + [domain/LDAP] + ldap_purge_cache_timeout = 1 + entry_cache_timeout = {0} + """).format(INTERACTIVE_TIMEOUT) + + +def format_rfc2307bis_deref_conf(ldap_conn, schema): + """Format an SSSD configuration with all caches refreshing in 4 seconds""" + return \ + format_basic_conf(ldap_conn, schema) + \ + unindent(""" + [nss] + memcache_timeout = 0 + entry_negative_timeout = 0 + + [domain/LDAP] + entry_cache_timeout = {0} + ldap_deref_threshold = 1 + """).format(INTERACTIVE_TIMEOUT) + + +def create_conf_file(contents): + """Create sssd.conf with specified contents""" + conf = open(config.CONF_PATH, "w") + conf.write(contents) + conf.close() + os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) + + +def cleanup_conf_file(): + """Remove sssd.conf, if it exists""" + if os.path.lexists(config.CONF_PATH): + os.unlink(config.CONF_PATH) + + +def create_conf_cleanup(request): + """Add teardown for removing sssd.conf""" + request.addfinalizer(cleanup_conf_file) + + +def create_conf_fixture(request, contents): + """ + Create sssd.conf with specified contents and add teardown for removing it + """ + create_conf_file(contents) + create_conf_cleanup(request) + + +def create_sssd_process(): + """Start the SSSD process""" + if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: + raise Exception("sssd start failed") + + +def cleanup_sssd_process(): + """Stop the SSSD process and remove its state""" + try: + pid_file = open(config.PIDFILE_PATH, "r") + pid = int(pid_file.read()) + os.kill(pid, signal.SIGTERM) + while True: + try: + os.kill(pid, signal.SIGCONT) + except OSError: + break + time.sleep(1) + except OSError: + pass + for path in os.listdir(config.DB_PATH): + os.unlink(config.DB_PATH + "/" + path) + for path in os.listdir(config.MCACHE_PATH): + os.unlink(config.MCACHE_PATH + "/" + path) + + +def create_sssd_cleanup(request): + """Add teardown for stopping SSSD and removing its state""" + request.addfinalizer(cleanup_sssd_process) + + +def create_sssd_fixture(request): + """Start SSSD and add teardown for stopping it and removing its state""" + create_sssd_process() + create_sssd_cleanup(request) + + +@pytest.fixture +def sanity_rfc2307(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group("group1", 2001) + ent_list.add_group("group2", 2002) + ent_list.add_group("group3", 2003) + + ent_list.add_group("empty_group", 2010) + + ent_list.add_group("two_user_group", 2012, ["user1", "user2"]) + + ent_list.add_user("t(u)ser", 5000, 5001) + ent_list.add_group("group(_u)ser1", 5001, ["t(u)ser"]) + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def simple_rfc2307(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user('usr\\\\001', 181818, 181818) + ent_list.add_group("group1", 181818) + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def sanity_rfc2307_bis(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + ent_list.add_group_bis("group3", 2003) + + ent_list.add_group_bis("empty_group1", 2010) + ent_list.add_group_bis("empty_group2", 2011) + + ent_list.add_group_bis("two_user_group", 2012, ["user1", "user2"]) + ent_list.add_group_bis("group_empty_group", 2013, [], ["empty_group1"]) + ent_list.add_group_bis("group_two_empty_groups", 2014, + [], ["empty_group1", "empty_group2"]) + ent_list.add_group_bis("one_user_group1", 2015, ["user1"]) + ent_list.add_group_bis("one_user_group2", 2016, ["user2"]) + ent_list.add_group_bis("group_one_user_group", 2017, + [], ["one_user_group1"]) + ent_list.add_group_bis("group_two_user_group", 2018, + [], ["two_user_group"]) + ent_list.add_group_bis("group_two_one_user_groups", 2019, + [], ["one_user_group1", "one_user_group2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def member_with_different_cases_rfc2307_bis(request, ldap_conn): + """ + Create a group where the user DN values of the RFC2307bis member attribute + differ in case from the original DN of the user object. + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + + ent_list.add_group_bis("two_user_group", 2012, ["USER1", "uSeR2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def expected_list_to_name_dict(entries): + return dict((u["name"], u) for u in entries) + + +def test_regression_ticket2163(ldap_conn, simple_rfc2307): + ent.assert_passwd_by_name( + 'usr\\001', + dict(name='usr\\001', passwd='*', uid=181818, gid=181818, + gecos='181818', shell='/bin/bash')) + + +def test_sanity_rfc2307(ldap_conn, sanity_rfc2307): + passwd_pattern = expected_list_to_name_dict([ + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', + dir='/home/user1', shell='/bin/bash'), + dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', + dir='/home/user2', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', + dir='/home/user3', shell='/bin/bash') + ]) + ent.assert_each_passwd_by_name(passwd_pattern) + + group_pattern = expected_list_to_name_dict([ + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group2', passwd='*', gid=2002, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + dict(name='empty_group', passwd='*', gid=2010, + mem=ent.contains_only()), + dict(name='two_user_group', passwd='*', gid=2012, + mem=ent.contains_only("user1", "user2")) + ]) + ent.assert_each_group_by_name(group_pattern) + + with pytest.raises(KeyError): + pwd.getpwnam("non_existent_user") + with pytest.raises(KeyError): + pwd.getpwuid(1) + with pytest.raises(KeyError): + grp.getgrnam("non_existent_group") + with pytest.raises(KeyError): + grp.getgrgid(1) + + +def test_sanity_rfc2307_bis(ldap_conn, sanity_rfc2307_bis): + passwd_pattern = expected_list_to_name_dict([ + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', + dir='/home/user1', shell='/bin/bash'), + dict(name='user2', passwd='*', uid=1002, gid=2002, gecos='1002', + dir='/home/user2', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', + dir='/home/user3', shell='/bin/bash') + ]) + ent.assert_each_passwd_by_name(passwd_pattern) + + group_pattern = expected_list_to_name_dict([ + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group2', passwd='*', gid=2002, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + dict(name='empty_group1', passwd='*', gid=2010, + mem=ent.contains_only()), + dict(name='empty_group2', passwd='*', gid=2011, + mem=ent.contains_only()), + dict(name='two_user_group', passwd='*', gid=2012, + mem=ent.contains_only("user1", "user2")), + dict(name='group_empty_group', passwd='*', gid=2013, + mem=ent.contains_only()), + dict(name='group_two_empty_groups', passwd='*', gid=2014, + mem=ent.contains_only()), + dict(name='one_user_group1', passwd='*', gid=2015, + mem=ent.contains_only("user1")), + dict(name='one_user_group2', passwd='*', gid=2016, + mem=ent.contains_only("user2")), + dict(name='group_one_user_group', passwd='*', gid=2017, + mem=ent.contains_only("user1")), + dict(name='group_two_user_group', passwd='*', gid=2018, + mem=ent.contains_only("user1", "user2")), + dict(name='group_two_one_user_groups', passwd='*', gid=2019, + mem=ent.contains_only("user1", "user2")) + ]) + ent.assert_each_group_by_name(group_pattern) + + with pytest.raises(KeyError): + pwd.getpwnam("non_existent_user") + with pytest.raises(KeyError): + pwd.getpwuid(1) + with pytest.raises(KeyError): + grp.getgrnam("non_existent_group") + with pytest.raises(KeyError): + grp.getgrgid(1) + + +def test_member_with_different_cases_rfc2307_bis( + ldap_conn, + member_with_different_cases_rfc2307_bis): + """ + Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=1817122 + Make sure that group members are added properly to the group even if the + user DN in the RFC2307bis member attribute differs in case from the + original DN of the user object. + """ + group_pattern = expected_list_to_name_dict([ + dict(name='two_user_group', passwd='*', gid=2012, + mem=ent.contains_only("user1", "user2")), + ]) + ent.assert_each_group_by_name(group_pattern) + + +@pytest.fixture +def refresh_after_cleanup_task(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + + ent_list.add_group_bis("group1", 2001, ["user1"]) + ent_list.add_group_bis("group2", 2002, [], ["group1"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + entry_cache_user_timeout = 1 + entry_cache_group_timeout = 5000 + ldap_purge_cache_timeout = 3 + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_refresh_after_cleanup_task(ldap_conn, refresh_after_cleanup_task): + """ + Regression test for ticket: + https://fedorahosted.org/sssd/ticket/2676 + """ + ent.assert_group_by_name( + "group2", + dict(mem=ent.contains_only("user1"))) + + ent.assert_passwd_by_name( + 'user1', + dict(name='user1', passwd='*', uid=1001, gid=2001, + gecos='1001', shell='/bin/bash')) + + time.sleep(15) + + ent.assert_group_by_name( + "group2", + dict(mem=ent.contains_only("user1"))) + + +@pytest.fixture +def update_ts_after_cleanup_task(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2001) + + ent_list.add_group_bis("group1", 2001, ["user1", "user2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + ldap_purge_cache_timeout = 3 + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_update_ts_cache_after_cleanup_task(ldap_conn, + update_ts_after_cleanup_task): + """ + Regression test for ticket: + https://fedorahosted.org/sssd/ticket/2676 + """ + ent.assert_group_by_name( + "group1", + dict(mem=ent.contains_only("user1", "user2"))) + + ent.assert_passwd_by_name( + 'user1', + dict(name='user1', passwd='*', uid=1001, gid=2001, + gecos='1001', shell='/bin/bash')) + + ent.assert_passwd_by_name( + 'user2', + dict(name='user2', passwd='*', uid=1002, gid=2001, + gecos='1002', shell='/bin/bash')) + + if subprocess.call(["sss_cache", "-u", "user1"]) != 0: + raise Exception("sssd_cache failed") + + # The cleanup task runs every 3 seconds, so sleep for 6 + # so that we know the cleanup task ran at least once + # even if we start sleeping during the first one + time.sleep(6) + + ent.assert_group_by_name( + "group1", + dict(mem=ent.contains_only("user1", "user2"))) + + +@pytest.fixture +def blank_rfc2307(request, ldap_conn): + """Create blank RFC2307 directory fixture with interactive SSSD conf""" + create_ldap_cleanup(request, ldap_conn) + create_conf_fixture(request, + format_interactive_conf(ldap_conn, SCHEMA_RFC2307)) + create_sssd_fixture(request) + + +@pytest.fixture +def blank_rfc2307_bis(request, ldap_conn): + """Create blank RFC2307bis directory fixture with interactive SSSD conf""" + create_ldap_cleanup(request, ldap_conn) + create_conf_fixture(request, + format_interactive_conf(ldap_conn, SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + + +@pytest.fixture +def user_and_group_rfc2307(request, ldap_conn): + """ + Create an RFC2307 directory fixture with interactive SSSD conf, + one user and one group + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user", 1001, 2000) + ent_list.add_group("group", 2001) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_interactive_conf(ldap_conn, SCHEMA_RFC2307)) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def user_and_groups_rfc2307_bis(request, ldap_conn): + """ + Create an RFC2307bis directory fixture with interactive SSSD conf, + one user and two groups + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user", 1001, 2000) + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_interactive_conf(ldap_conn, SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def rfc2307bis_deref_group_with_users(request, ldap_conn): + """ + Create an RFC2307bis directory fixture with interactive SSSD conf, + one user and two groups + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2000) + ent_list.add_user("user2", 1001, 2000) + ent_list.add_user("user3", 1001, 2000) + ent_list.add_group_bis("group1", 20000, member_uids=("user1", "user2")) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_rfc2307bis_deref_conf( + ldap_conn, + SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +def test_ldap_group_dereference(ldap_conn, rfc2307bis_deref_group_with_users): + ent.assert_group_by_name("group1", + dict(mem=ent.contains_only("user1", "user2"))) + + +@pytest.fixture +def override_homedir(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_homedir_A", 1001, 2001, + homeDirectory="/home/A") + ent_list.add_user("user_with_homedir_B", 1002, 2002, + homeDirectory="/home/B") + ent_list.add_user("user_with_empty_homedir", 1003, 2003, + homeDirectory="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + override_homedir = /home/B + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_override_homedir(override_homedir): + """Test the effect of the "override_homedir" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_homedir_A", uid=1001, dir="/home/B"), + dict(name="user_with_homedir_B", uid=1002, dir="/home/B"), + dict(name="user_with_empty_homedir", uid=1003, dir="/home/B") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +@pytest.fixture +def fallback_homedir(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_homedir_A", 1001, 2001, + homeDirectory="/home/A") + ent_list.add_user("user_with_homedir_B", 1002, 2002, + homeDirectory="/home/B") + ent_list.add_user("user_with_empty_homedir", 1003, 2003, + homeDirectory="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + fallback_homedir = /home/B + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_fallback_homedir(fallback_homedir): + """Test the effect of the "fallback_homedir" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_homedir_A", uid=1001, dir="/home/A"), + dict(name="user_with_homedir_B", uid=1002, dir="/home/B"), + dict(name="user_with_empty_homedir", uid=1003, dir="/home/B") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +@pytest.fixture +def override_shell(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_shell_A", 1001, 2001, + loginShell="/bin/A") + ent_list.add_user("user_with_shell_B", 1002, 2002, + loginShell="/bin/B") + ent_list.add_user("user_with_empty_shell", 1003, 2003, + loginShell="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + override_shell = /bin/B + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_override_shell(override_shell): + """Test the effect of the "override_shell" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_shell_A", uid=1001, shell="/bin/B"), + dict(name="user_with_shell_B", uid=1002, shell="/bin/B"), + dict(name="user_with_empty_shell", uid=1003, shell="/bin/B") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +@pytest.fixture +def shell_fallback(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_sh_shell", 1001, 2001, + loginShell="/bin/sh") + ent_list.add_user("user_with_not_installed_shell", 1002, 2002, + loginShell="/bin/not_installed") + ent_list.add_user("user_with_empty_shell", 1003, 2003, + loginShell="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + shell_fallback = /bin/fallback + allowed_shells = /bin/not_installed + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_shell_fallback(shell_fallback): + """Test the effect of the "shell_fallback" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_sh_shell", uid=1001, shell="/bin/sh"), + dict(name="user_with_not_installed_shell", uid=1002, + shell="/bin/fallback"), + dict(name="user_with_empty_shell", uid=1003, shell="") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +@pytest.fixture +def default_shell(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_sh_shell", 1001, 2001, + loginShell="/bin/sh") + ent_list.add_user("user_with_not_installed_shell", 1002, 2002, + loginShell="/bin/not_installed") + ent_list.add_user("user_with_empty_shell", 1003, 2003, + loginShell="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + default_shell = /bin/default + allowed_shells = /bin/default, /bin/not_installed + shell_fallback = /bin/fallback + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_default_shell(default_shell): + """Test the effect of the "default_shell" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_sh_shell", uid=1001, shell="/bin/sh"), + dict(name="user_with_not_installed_shell", uid=1002, + shell="/bin/fallback"), + dict(name="user_with_empty_shell", uid=1003, + shell="/bin/default") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +@pytest.fixture +def vetoed_shells(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user_with_sh_shell", 1001, 2001, + loginShell="/bin/sh") + ent_list.add_user("user_with_vetoed_shell", 1002, 2002, + loginShell="/bin/vetoed") + ent_list.add_user("user_with_empty_shell", 1003, 2003, + loginShell="") + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [nss] + default_shell = /bin/default + vetoed_shells = /bin/vetoed + shell_fallback = /bin/fallback + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_vetoed_shells(vetoed_shells): + """Test the effect of the "vetoed_shells" option""" + passwd_pattern = expected_list_to_name_dict([ + dict(name="user_with_sh_shell", uid=1001, shell="/bin/sh"), + dict(name="user_with_vetoed_shell", uid=1002, + shell="/bin/fallback"), + dict(name="user_with_empty_shell", uid=1003, + shell="/bin/default") + ]) + + ent.assert_each_passwd_by_name(passwd_pattern) + + +def test_user_2307bis_nested_groups(ldap_conn, + sanity_rfc2307_bis): + """ + Test nested groups. + + Regression test for ticket: + https://fedorahosted.org/sssd/ticket/3093 + """ + primary_gid = 2001 + # group1, two_user_group, one_user_group1, group_one_user_group, + # group_two_user_group, group_two_one_user_groups + expected_gids = [2001, 2012, 2015, 2017, 2018, 2019] + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, + gid=primary_gid)) + + (res, errno, gids) = sssd_id.call_sssd_initgroups("user1", primary_gid) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(expected_gids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(expected_gids)]) + ) + + +def test_special_characters_in_names(ldap_conn, sanity_rfc2307): + """ + Test special characters which could cause malformed filter + in ldb_seach. + + Regression test for ticket: + https://fedorahosted.org/sssd/ticket/3121 + """ + ent.assert_passwd_by_name( + "t(u)ser", + dict(name="t(u)ser", passwd="*", uid=5000, gid=5001, + gecos="5000", shell="/bin/bash")) + + ent.assert_group_by_name( + "group(_u)ser1", + dict(name="group(_u)ser1", passwd="*", gid=5001, + mem=ent.contains_only("t(u)ser"))) + + +@pytest.fixture +def extra_attributes(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user", 2001, 2000) + ent_list.add_group("group", 2000) + create_ldap_fixture(request, ldap_conn, ent_list) + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent("""\ + [domain/LDAP] + ldap_user_extra_attrs = mail, name:uid, givenName + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +def test_extra_attribute_already_exists(ldap_conn, extra_attributes): + """Test the effect of the "vetoed_shells" option""" + + user = 'user' + extra_attribute = 'givenName' + given_name = b'unix_user' + + user_dn = "uid=" + user + ",ou=Users," + ldap_conn.ds_inst.base_dn + + old = {'objectClass': [b'top', b'inetOrgPerson', b'posixAccount']} + new = {'objectClass': [b'top', b'inetOrgPerson', b'posixAccount', + b'extensibleObject']} + ldif = ldap.modlist.modifyModlist(old, new) + + ldap_conn.modify_s(user_dn, ldif) + ldap_conn.modify_s(user_dn, [(ldap.MOD_ADD, extra_attribute, given_name)]) + + ent.assert_passwd_by_name( + user, + dict(name="user", uid=2001, gid=2000, shell="/bin/bash"), + ) + + domain = 'LDAP' + ldb_conn = sssd_ldb.SssdLdb('LDAP') + val = ldb_conn.get_entry_attr(sssd_ldb.CacheType.sysdb, + sssd_ldb.TsCacheEntry.user, + user, domain, extra_attribute) + + assert val == given_name + + +@pytest.fixture +def add_user_to_group(request, ldap_conn): + """ + Adding user to group + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_group_bis("group1", 20001, member_uids=["user1"]) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_rfc2307bis_deref_conf( + ldap_conn, + SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +def test_add_user_to_group(ldap_conn, add_user_to_group): + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_group_by_name("group1", dict(mem=ent.contains_only("user1"))) + + +@pytest.fixture +def remove_user_from_group(request, ldap_conn): + """ + Adding user to group + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_group_bis("group1", 20001, member_uids=["user1", "user2"]) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_rfc2307bis_deref_conf( + ldap_conn, + SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +def test_remove_user_from_group(ldap_conn, remove_user_from_group): + """ + Removing two users from group, step by step + """ + group1_dn = 'cn=group1,ou=Groups,' + ldap_conn.ds_inst.base_dn + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + ent.assert_group_by_name("group1", + dict(mem=ent.contains_only("user1", "user2"))) + + # removing of user2 from group1 + old = {'member': [b"uid=user1,ou=Users,dc=example,dc=com", + b"uid=user2,ou=Users,dc=example,dc=com"]} + new = {'member': [b"uid=user1,ou=Users,dc=example,dc=com"]} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group1_dn, ldif) + + if subprocess.call(["sss_cache", "-GU"]) != 0: + raise Exception("sssd_cache failed") + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + ent.assert_group_by_name("group1", dict(mem=ent.contains_only("user1"))) + + # removing of user1 from group1 + old = {'member': [b"uid=user1,ou=Users,dc=example,dc=com"]} + new = {'member': []} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group1_dn, ldif) + + if subprocess.call(["sss_cache", "-GU"]) != 0: + raise Exception("sssd_cache failed") + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + ent.assert_group_by_name("group1", dict(mem=ent.contains_only())) + + +@pytest.fixture +def remove_user_from_nested_group(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_group_bis("group1", 20001, member_uids=["user1"]) + ent_list.add_group_bis("group2", 20002, member_uids=["user2"]) + ent_list.add_group_bis("group3", 20003, member_gids=["group1", "group2"]) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + format_rfc2307bis_deref_conf( + ldap_conn, + SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +def test_remove_user_from_nested_group(ldap_conn, + remove_user_from_nested_group): + + group3_dn = 'cn=group3,ou=Groups,' + ldap_conn.ds_inst.base_dn + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + + ent.assert_group_by_name("group1", + dict(mem=ent.contains_only("user1"))) + ent.assert_group_by_name("group2", + dict(mem=ent.contains_only("user2"))) + + ent.assert_group_by_name("group3", + dict(mem=ent.contains_only("user1", + "user2"))) + + # removing of group2 from group3 + old = {'member': [b"cn=group1,ou=Groups,dc=example,dc=com", + b"cn=group2,ou=Groups,dc=example,dc=com"]} + new = {'member': [b"cn=group1,ou=Groups,dc=example,dc=com"]} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group3_dn, ldif) + + if subprocess.call(["sss_cache", "-GU"]) != 0: + raise Exception("sssd_cache failed") + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + + ent.assert_group_by_name("group1", + dict(mem=ent.contains_only("user1"))) + ent.assert_group_by_name("group2", + dict(mem=ent.contains_only("user2"))) + ent.assert_group_by_name("group3", + dict(mem=ent.contains_only("user1"))) + + # removing of group1 from group3 + old = {'member': [b"cn=group1,ou=Groups,dc=example,dc=com"]} + new = {'member': []} + + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group3_dn, ldif) + + if subprocess.call(["sss_cache", "-GU"]) != 0: + raise Exception("sssd_cache failed") + + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=2001)) + ent.assert_passwd_by_name("user2", dict(name="user2", uid=1002, gid=2002)) + + ent.assert_group_by_name("group1", + dict(mem=ent.contains_only("user1"))) + ent.assert_group_by_name("group2", + dict(mem=ent.contains_only("user2"))) + ent.assert_group_by_name("group3", + dict(mem=ent.contains_only())) + + +def zero_nesting_sssd_conf(ldap_conn, schema): + """Format an SSSD configuration with group nesting disabled""" + return \ + format_basic_conf(ldap_conn, schema) + \ + unindent(""" + [domain/LDAP] + ldap_group_nesting_level = 0 + """).format(INTERACTIVE_TIMEOUT) + + +@pytest.fixture +def rfc2307bis_no_nesting(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_group_bis("primarygroup", 2001) + ent_list.add_group_bis("parentgroup", 2010, member_uids=["user1"]) + ent_list.add_group_bis("nestedgroup", 2011, member_gids=["parentgroup"]) + create_ldap_fixture(request, ldap_conn, ent_list) + create_conf_fixture(request, + zero_nesting_sssd_conf( + ldap_conn, + SCHEMA_RFC2307_BIS)) + create_sssd_fixture(request) + return None + + +def test_zero_nesting_level(ldap_conn, rfc2307bis_no_nesting): + """ + Test initgroups operation with rfc2307bis schema asserting + only primary group and parent groups are included in group + list. No parent groups of groups should be returned with zero + group nesting level. + """ + ent.assert_group_by_name("parentgroup", + dict(mem=ent.contains_only("user1"))) + ent.assert_group_by_name("nestedgroup", + dict(mem=ent.contains_only())) + + (res, errno, grp_list) = sssd_id.get_user_groups("user1") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user1, %d" % errno + + # test nestedgroup is not returned in group list + assert sorted(grp_list) == sorted(["primarygroup", "parentgroup"]) + + +@pytest.fixture +def sanity_nss_filter(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + ent_list.add_group_bis("group3", 2003) + + ent_list.add_group_bis("empty_group1", 2010) + ent_list.add_group_bis("empty_group2", 2011) + + ent_list.add_group_bis("two_user_group", 2012, ["user1", "user2"]) + ent_list.add_group_bis("group_empty_group", 2013, [], ["empty_group1"]) + ent_list.add_group_bis("group_two_empty_groups", 2014, + [], ["empty_group1", "empty_group2"]) + ent_list.add_group_bis("one_user_group1", 2015, ["user1"]) + ent_list.add_group_bis("one_user_group2", 2016, ["user2"]) + ent_list.add_group_bis("group_one_user_group", 2017, + [], ["one_user_group1"]) + ent_list.add_group_bis("group_two_user_group", 2018, + [], ["two_user_group"]) + ent_list.add_group_bis("group_two_one_user_groups", 2019, + [], ["one_user_group1", "one_user_group2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [nss] + filter_users = user2 + filter_groups = group_two_one_user_groups + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_nss_filters(ldap_conn, sanity_nss_filter): + passwd_pattern = expected_list_to_name_dict([ + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', + dir='/home/user1', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', + dir='/home/user3', shell='/bin/bash') + ]) + + # test filtered user + ent.assert_each_passwd_by_name(passwd_pattern) + with pytest.raises(KeyError): + pwd.getpwnam("user2") + with pytest.raises(KeyError): + pwd.getpwuid(1002) + + group_pattern = expected_list_to_name_dict([ + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group2', passwd='*', gid=2002, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + dict(name='empty_group1', passwd='*', gid=2010, + mem=ent.contains_only()), + dict(name='empty_group2', passwd='*', gid=2011, + mem=ent.contains_only()), + dict(name='two_user_group', passwd='*', gid=2012, + mem=ent.contains_only("user1")), + dict(name='group_empty_group', passwd='*', gid=2013, + mem=ent.contains_only()), + dict(name='group_two_empty_groups', passwd='*', gid=2014, + mem=ent.contains_only()), + dict(name='one_user_group1', passwd='*', gid=2015, + mem=ent.contains_only("user1")), + dict(name='one_user_group2', passwd='*', gid=2016, + mem=ent.contains_only()), + dict(name='group_one_user_group', passwd='*', gid=2017, + mem=ent.contains_only("user1")), + dict(name='group_two_user_group', passwd='*', gid=2018, + mem=ent.contains_only("user1")), + ]) + + # test filtered group + ent.assert_each_group_by_name(group_pattern) + with pytest.raises(KeyError): + grp.getgrnam("group_two_one_user_groups") + with pytest.raises(KeyError): + grp.getgrgid(2019) + + # test non-existing user/group + with pytest.raises(KeyError): + pwd.getpwnam("non_existent_user") + with pytest.raises(KeyError): + pwd.getpwuid(9) + with pytest.raises(KeyError): + grp.getgrnam("non_existent_group") + with pytest.raises(KeyError): + grp.getgrgid(14) + + # test initgroups - user1 is member of group_two_one_user_groups (2019) + # which is filtered out + (res, errno, gids) = sssd_id.call_sssd_initgroups("user1", 2001) + assert res == sssd_id.NssReturnCode.SUCCESS + + user_with_group_ids = [2001, 2012, 2015, 2017, 2018] + assert sorted(gids) == sorted(user_with_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user_with_group_ids)]) + ) + + +@pytest.fixture +def sanity_nss_filter_cached(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + ent_list.add_user("root", 1004, 2004) + ent_list.add_user("zerouid", 0, 0) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + ent_list.add_group_bis("group3", 2003) + ent_list.add_group_bis("root", 2004) + ent_list.add_group_bis("zerogid", 0) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [nss] + filter_users = user2 + filter_groups = group2 + entry_negative_timeout = 1 + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_nss_filters_cached(ldap_conn, sanity_nss_filter_cached): + passwd_pattern = expected_list_to_name_dict([ + dict(name='user1', passwd='*', uid=1001, gid=2001, gecos='1001', + dir='/home/user1', shell='/bin/bash'), + dict(name='user3', passwd='*', uid=1003, gid=2003, gecos='1003', + dir='/home/user3', shell='/bin/bash') + ]) + ent.assert_each_passwd_by_name(passwd_pattern) + + # test filtered user + with pytest.raises(KeyError): + pwd.getpwuid(1002) + time.sleep(2) + with pytest.raises(KeyError): + pwd.getpwuid(1002) + + group_pattern = expected_list_to_name_dict([ + dict(name='group1', passwd='*', gid=2001, mem=ent.contains_only()), + dict(name='group3', passwd='*', gid=2003, mem=ent.contains_only()), + ]) + ent.assert_each_group_by_name(group_pattern) + + # test filtered group + with pytest.raises(KeyError): + grp.getgrgid(2002) + time.sleep(2) + with pytest.raises(KeyError): + grp.getgrgid(2002) + + # test that root is always filtered even if filter_users contains other + # entries. This is a regression test for upstream ticket #3460 + res, _ = call_sssd_getpwnam("root") + assert res == NssReturnCode.NOTFOUND + + res, _ = call_sssd_getgrnam("root") + assert res == NssReturnCode.NOTFOUND + + res, _ = call_sssd_getpwuid(0) + assert res == NssReturnCode.NOTFOUND + + res, _ = call_sssd_getgrgid(0) + assert res == NssReturnCode.NOTFOUND + + +@pytest.fixture +def mpg_setup(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 2003) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("group2", 2002) + ent_list.add_group_bis("group3", 2003) + + ent_list.add_group_bis("two_user_group", 2012, ["user1", "user2"]) + ent_list.add_group_bis("one_user_group1", 2015, ["user1"]) + ent_list.add_group_bis("one_user_group2", 2016, ["user2"]) + + create_ldap_entries(ldap_conn, ent_list) + create_ldap_cleanup(request, ldap_conn, None) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + auto_private_groups = True + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_ldap_auto_private_groups_direct(ldap_conn, mpg_setup): + """ + Integration test for auto_private_groups + + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + # Make sure the user's GID is taken from their uidNumber + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=1001)) + # Make sure the private group is resolvable by name and by GID + ent.assert_group_by_name("user1", dict(gid=1001, mem=ent.contains_only())) + ent.assert_group_by_gid(1001, dict(name="user1", mem=ent.contains_only())) + + # The group referenced in user's gidNumber attribute should be still + # visible, but it's fine that it doesn't contain the user as a member + # as the group is currently added during the initgroups operation only + ent.assert_group_by_name("group1", dict(gid=2001, mem=ent.contains_only())) + ent.assert_group_by_gid(2001, dict(name="group1", mem=ent.contains_only())) + + # The user's secondary groups list must be correct as well + # Note that the original GID is listed as well -- this is correct and + # expected because we save the original GID in the + # SYSDB_PRIMARY_GROUP_GIDNUM attribute + user1_expected_gids = [1001, 2001, 2012, 2015] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user1", 1001) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user1_expected_gids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user1_expected_gids)]) + ) + + # Request user2's private group by GID without resolving the user first. + # This must trigger user resolution through by-GID resolution, since the + # GID doesn't exist on its own in LDAP + ent.assert_group_by_gid(1002, dict(name="user2", mem=ent.contains_only())) + + # Test supplementary groups for user2 as well + user1_expected_gids = [1002, 2002, 2012, 2016] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user2", 1002) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user1_expected_gids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user1_expected_gids)]) + ) + + # Request user3's private group by name without resolving the user first + # This must trigger user resolution through by-name resolution, since the + # name doesn't exist on its own in LDAP + ent.assert_group_by_name("user3", dict(gid=1003, mem=ent.contains_only())) + + # Remove entries and request them again to make sure they are not + # resolvable anymore + cleanup_ldap_entries(ldap_conn, None) + + if subprocess.call(["sss_cache", "-GU"]) != 0: + raise Exception("sssd_cache failed") + + with pytest.raises(KeyError): + pwd.getpwnam("user1") + with pytest.raises(KeyError): + grp.getgrnam("user1") + with pytest.raises(KeyError): + grp.getgrgid(1002) + with pytest.raises(KeyError): + grp.getgrnam("user3") + + +@pytest.fixture +def mpg_setup_conflict(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_user("user2", 1002, 2002) + ent_list.add_user("user3", 1003, 1003) + ent_list.add_group_bis("group1", 1001) + ent_list.add_group_bis("group2", 1002) + ent_list.add_group_bis("group3", 1003) + ent_list.add_group_bis("supp_group", 2015, ["user3"]) + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + auto_private_groups = True + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_ldap_auto_private_groups_conflict(ldap_conn, mpg_setup_conflict): + """ + Make sure that conflicts between groups that are auto-created with the + help of the auto_private_groups option and between 'real' LDAP groups + are handled in a predictable manner. + """ + # Make sure the user's GID is taken from their uidNumber + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=1001)) + # Make sure the private group is resolvable by name and by GID + ent.assert_group_by_name("user1", dict(gid=1001, mem=ent.contains_only())) + ent.assert_group_by_gid(1001, dict(name="user1", mem=ent.contains_only())) + + # Let's request the group with the same ID as user2's private group + # The request should match the 'real' group + ent.assert_group_by_gid(1002, dict(name="group2", mem=ent.contains_only())) + # But because of the GID conflict, the user cannot be resolved + with pytest.raises(KeyError): + pwd.getpwnam("user2") + + # This user's GID is the same as the UID in this entry. The most important + # thing here is that the supplementary groups are correct and the GID + # resolves to the private group (as long as the user was requested first) + user3_expected_gids = [1003, 2015] + ent.assert_passwd_by_name("user3", dict(name="user3", uid=1003, gid=1003)) + (res, errno, gids) = sssd_id.call_sssd_initgroups("user3", 1003) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user3_expected_gids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user3_expected_gids)]) + ) + # Make sure the private group is resolvable by name and by GID + ent.assert_group_by_gid(1003, dict(name="user3", mem=ent.contains_only())) + ent.assert_group_by_name("user3", dict(gid=1003, mem=ent.contains_only())) + + +@pytest.fixture +def mpg_setup_no_gid(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + + ent_list.add_group_bis("group1", 2001) + ent_list.add_group_bis("one_user_group1", 2015, ["user1"]) + + create_ldap_entries(ldap_conn, ent_list) + create_ldap_cleanup(request, ldap_conn, None) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + auto_private_groups = True + ldap_user_gid_number = no_such_attribute + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_ldap_auto_private_groups_direct_no_gid(ldap_conn, mpg_setup_no_gid): + """ + Integration test for auto_private_groups - test that even a user with + no GID assigned at all can be resolved including their autogenerated + primary group. + + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + # Make sure the user's GID is taken from their uidNumber + ent.assert_passwd_by_name("user1", dict(name="user1", uid=1001, gid=1001)) + # Make sure the private group is resolvable by name and by GID + ent.assert_group_by_name("user1", dict(gid=1001, mem=ent.contains_only())) + ent.assert_group_by_gid(1001, dict(name="user1", mem=ent.contains_only())) + + # The group referenced in user's gidNumber attribute should be still + # visible, but shouldn't have any relation to the user + ent.assert_group_by_name("group1", dict(gid=2001, mem=ent.contains_only())) + ent.assert_group_by_gid(2001, dict(name="group1", mem=ent.contains_only())) + + # The user's secondary groups list must be correct as well. This time only + # the generated group and the explicit secondary group are added, since + # there is no original GID + user1_expected_gids = [1001, 2015] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user1", 1001) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user1_expected_gids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user1_expected_gids)]) + ) + + +@pytest.fixture +def mpg_setup_hybrid(request, ldap_conn): + """ + This setup creates two users - one with a GID that corresponds to + a group and another with GID that does not. + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_user("user_with_group", 1001, 2001) + ent_list.add_group_bis("user_with_group_pvt", 2001) + ent_list.add_group_bis("with_group_group1", 10010, ["user_with_group"]) + ent_list.add_group_bis("with_group_group2", 10011, ["user_with_group"]) + + ent_list.add_user("user_with_no_group", 1002, 1002) + # Note - there is no group for the GID 1002 + ent_list.add_group_bis("no_group_group1", 10020, ["user_with_no_group"]) + ent_list.add_group_bis("no_group_group2", 10021, ["user_with_no_group"]) + + # This user has their gid different from the UID, but there is + # no group with gid 2003 + ent_list.add_user("user_with_unresolvable_gid", 1003, 2003) + ent_list.add_group_bis("unresolvable_group1", + 10030, + ["user_with_unresolvable_gid"]) + ent_list.add_group_bis("unresolvable_group2", + 10031, + ["user_with_unresolvable_gid"]) + + # This user's autogenerated private group should be shadowed + # by the real one + ent_list.add_user("user_with_real_group", 1004, 1004) + ent_list.add_user("user_in_pvt_group", 1005, 1005) + ent_list.add_group_bis("user_with_real_group_pvt", + 1004, + ['user_in_pvt_group']) + ent_list.add_group_bis("with_real_group_group1", + 10040, + ["user_with_real_group"]) + ent_list.add_group_bis("with_real_group_group2", + 10041, + ["user_with_real_group"]) + + # Test shadowing again, but this time with the same name + ent_list.add_user("u_g_same_name", 1006, 1006) + ent_list.add_group_bis("u_g_same_name", + 1006, + ['user_in_pvt_group']) + ent_list.add_group_bis("u_g_same_g1", + 10060, + ["u_g_same_name"]) + ent_list.add_group_bis("u_g_same_g2", + 10061, + ["u_g_same_name"]) + + create_ldap_entries(ldap_conn, ent_list) + create_ldap_cleanup(request, ldap_conn, None) + + conf = \ + format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [domain/LDAP] + auto_private_groups = hybrid + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_ldap_auto_private_groups_hybrid_direct(ldap_conn, mpg_setup_hybrid): + """ + Integration test for auto_private_groups=hybrid. This test checks the + resolution of the users and their groups. + + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + # Make sure the user's GID is taken from their gidNumber, if available + ent.assert_passwd_by_name("user_with_group", + dict(name="user_with_group", uid=1001, gid=2001)) + + # The user's secondary groups list must be correct as well and include + # the primary gid, too + user_with_group_ids = [2001, 10010, 10011] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user_with_group", 2001) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user_with_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user_with_group_ids)]) + ) + + # On the other hand, if the gidNumber is the same as UID, SSSD should + # just autogenerate the private group on its own + ent.assert_passwd_by_name("user_with_no_group", + dict(name="user_with_no_group", + uid=1002, gid=1002)) + + # The user's secondary groups list must be correct as well. Since there was + # no original GID, it is not added to the list + user_without_group_ids = [1002, 10020, 10021] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user_with_no_group", + 1002) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user_without_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user_without_group_ids)]) + ) + + ent.assert_passwd_by_name("user_with_unresolvable_gid", + dict(name="user_with_unresolvable_gid", + uid=1003, gid=2003)) + unresolvable_group_ids = [2003, 10030, 10031] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user_with_unresolvable_gid", 2003) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(unresolvable_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(unresolvable_group_ids)]) + ) + + ent.assert_passwd_by_name("user_with_real_group", + dict(name="user_with_real_group", + uid=1004, gid=1004)) + with_real_group_ids = [1004, 10040, 10041] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user_with_real_group", 1004) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(with_real_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(with_real_group_ids)]) + ) + + +def test_ldap_auto_private_groups_hybrid_priv_group_byname(ldap_conn, + mpg_setup_hybrid): + """ + Integration test for auto_private_groups=hybrid. This test checks the + resolution of user private groups by name. + + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + # gidNumber is resolvable by name.. + ent.assert_group_by_name("user_with_group_pvt", + dict(gid=2001, + mem=ent.contains_only())) + + # ..but since this user /has/ a gidNumber set, their autogenerated group + # should not be resolvable + with pytest.raises(KeyError): + grp.getgrnam("user_with_group") + + # Finally, the autogenerated group for the user with + # uidNumber==gidNumber must be resolvable + ent.assert_group_by_name("user_with_no_group", + dict(gid=1002, + mem=ent.contains_only())) + + # A gid that is different from an UID must not resolve to a private + # group even if the private group does not exist + with pytest.raises(KeyError): + grp.getgrnam("user_with_unresolvable_gid") + + # If there is a user with the same UID and GID but there is a real + # group corresponding to the primary GID, the real group should take + # precedence and the automatic group should not be resolvable + ent.assert_group_by_name("user_with_real_group_pvt", + dict(gid=1004, + mem=ent.contains_only('user_in_pvt_group',))) + + # getgrnam should not return + with pytest.raises(KeyError): + grp.getgrnam("user_with_real_group") + + +def test_ldap_auto_private_groups_hybrid_priv_group_byid(ldap_conn, + mpg_setup_hybrid): + """ + Integration test for auto_private_groups=hybrid. This test checks the + resolution of user private groups by name. + + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + # Make sure the private group of user who has this group set in their + # gidNumber is resolvable by ID + ent.assert_group_by_gid(2001, + dict(name="user_with_group_pvt", + mem=ent.contains_only())) + + # ..but since this user /has/ a gidNumber set different from the uidNumber, + # their autogenerated group + # should not be resolvable + with pytest.raises(KeyError): + grp.getgrgid(1001) + + # Finally, the autogenerated group for the user with + # uidNumber==gidNumber must be resolvable + ent.assert_group_by_gid(1002, + dict(name="user_with_no_group", + mem=ent.contains_only())) + + # A gid that is different from an UID must not resolve to a private + # group even if the private group does not exist + with pytest.raises(KeyError): + grp.getgrgid(2003) + + # Conversely, a GID that corresponds to a group must not resolve to + # the autogenerated group (IOW, the autogenerated group should not + # shadow the real one + ent.assert_group_by_gid(1004, + dict(name="user_with_real_group_pvt", + mem=ent.contains_only('user_in_pvt_group'))) + + +def test_ldap_auto_private_groups_hybrid_name_gid_identical(ldap_conn, + mpg_setup_hybrid): + """ + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + ent.assert_passwd_by_name("u_g_same_name", + dict(name="u_g_same_name", + uid=1006, gid=1006)) + user_without_group_ids = [1006, 10060, 10061] + (res, errno, gids) = sssd_id.call_sssd_initgroups("u_g_same_name", + 1006) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user_without_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user_without_group_ids)]) + ) + ent.assert_group_by_gid(1006, + dict(name="u_g_same_name", + mem=ent.contains_only('user_in_pvt_group'))) + + +def test_ldap_auto_private_groups_hybrid_initgr(ldap_conn, mpg_setup_hybrid): + """ + See also ticket https://github.com/SSSD/sssd/issues/2914 + """ + user_without_group_ids = [1004, 10040, 10041] + (res, errno, gids) = sssd_id.call_sssd_initgroups("user_with_real_group", + 1004) + assert res == sssd_id.NssReturnCode.SUCCESS + + assert sorted(gids) == sorted(user_without_group_ids), \ + "result: %s\n expected %s" % ( + ", ".join(["%s" % s for s in sorted(gids)]), + ", ".join(["%s" % s for s in sorted(user_without_group_ids)]) + ) + + ent.assert_group_by_gid(1004, + dict(name="user_with_real_group_pvt", + mem=ent.contains_only('user_in_pvt_group'))) + + +def rename_setup_no_cleanup(request, ldap_conn, cleanup_ent=None): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001) + ent_list.add_group_bis("user1_private", 2001) + + ent_list.add_user("user2", 1002, 2002) + ent_list.add_group_bis("user2_private", 2002) + + ent_list.add_group_bis("group1", 2015, ["user1", "user2"]) + + if cleanup_ent is None: + create_ldap_fixture(request, ldap_conn, ent_list) + else: + # Since the entries were renamed, we need to clean up + # the renamed entries.. + create_ldap_fixture(request, ldap_conn, ent_list, cleanup=False) + create_ldap_cleanup(request, ldap_conn, None) + + +@pytest.fixture +def rename_setup_cleanup(request, ldap_conn): + cleanup_ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + cleanup_ent_list.add_user("user1", 1001, 2001) + cleanup_ent_list.add_group_bis("new_user1_private", 2001) + + cleanup_ent_list.add_user("user2", 1002, 2002) + cleanup_ent_list.add_group_bis("new_user2_private", 2002) + + cleanup_ent_list.add_group_bis("new_group1", 2015, ["user1", "user2"]) + + rename_setup_no_cleanup(request, ldap_conn, cleanup_ent_list) + + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def rename_setup_with_name(request, ldap_conn): + rename_setup_no_cleanup(request, ldap_conn) + + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + \ + unindent(""" + [nss] + [domain/LDAP] + ldap_group_name = name + timeout = 3000 + """).format(**locals()) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_rename_incomplete_group_same_dn(ldap_conn, rename_setup_with_name): + """ + Test that if a group's name attribute changes, but the DN stays the same, + the incomplete group object will be renamed. + + Because the RDN attribute must be present in the entry, we add another + attribute "name" that is purposefully different from the CN and make + sure the group names are reflected in name + + Regression test for https://github.com/SSSD/sssd/issues/4315 + """ + pvt_dn1 = 'cn=user1_private,ou=Groups,' + ldap_conn.ds_inst.base_dn + pvt_dn2 = 'cn=user2_private,ou=Groups,' + ldap_conn.ds_inst.base_dn + group1_dn = 'cn=group1,ou=Groups,' + ldap_conn.ds_inst.base_dn + + # Add the name we want for both private and secondary group + old = {'name': []} + new = {'name': [b"user1_group1"]} + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group1_dn, ldif) + + new = {'name': [b"pvt_user1"]} + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(pvt_dn1, ldif) + + new = {'name': [b"pvt_user2"]} + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(pvt_dn2, ldif) + + # Make sure the old name shows up in the id output + (res, errno, grp_list) = sssd_id.get_user_groups("user1") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user1, %d" % errno + + assert sorted(grp_list) == sorted(["pvt_user1", "user1_group1"]) + + # Rename the group by changing the cn attribute, but keep the DN the same + old = {'name': [b"user1_group1"]} + new = {'name': [b"new_user1_group1"]} + ldif = ldap.modlist.modifyModlist(old, new) + ldap_conn.modify_s(group1_dn, ldif) + + (res, errno, grp_list) = sssd_id.get_user_groups("user2") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user2, %d" % errno + + assert sorted(grp_list) == sorted(["pvt_user2", "new_user1_group1"]) + + (res, errno, grp_list) = sssd_id.get_user_groups("user1") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user1, %d" % errno + + assert sorted(grp_list) == sorted(["pvt_user1", "new_user1_group1"]) + + +def test_rename_incomplete_group_rdn_changed(ldap_conn, rename_setup_cleanup): + """ + If a group is renamed and also some attributes changed and gid remains the + same then existing group in the cache is overridden with the new attributes + and name. + """ + pvt_dn = 'cn=user1_private,ou=Groups,' + ldap_conn.ds_inst.base_dn + group1_dn = 'cn=group1,ou=Groups,' + ldap_conn.ds_inst.base_dn + + # Make sure the old name shows up in the id output + (res, errno, grp_list) = sssd_id.get_user_groups("user1") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user1, %d" % errno + + assert sorted(grp_list) == sorted(["user1_private", "group1"]) + + # Rename the groups, changing the RDN + ldap_conn.rename_s(group1_dn, "cn=new_group1") + ldap_conn.rename_s(pvt_dn, "cn=new_user1_private") + + (res, errno, grp_list) = sssd_id.get_user_groups("user2") + assert res == sssd_id.NssReturnCode.SUCCESS, \ + "Could not find groups for user2, %d" % errno + + # The initgroups succeeds, but because saving the new group fails, + # SSSD will revert to the cache contents and return what's in the cache + assert sorted(grp_list) == sorted(["user2_private", "new_group1"]) + + +@pytest.fixture +def find_local_user_and_group(): + f = open("/etc/passwd") + for line in f: + passwd_user = line.split(':') + passwd_user[2] = int(passwd_user[2]) + if passwd_user[2] != 0: + break + f.close() + assert passwd_user[2] != 0 + + f = open("/etc/group") + for line in f: + passwd_group = line.split(':') + passwd_group[2] = int(passwd_group[2]) + if passwd_group[2] != 0: + break + f.close() + assert passwd_group[2] != 0 + + return (passwd_user, passwd_group) + + +@pytest.fixture +def user_and_group_rfc2307_lcl(find_local_user_and_group, + user_and_group_rfc2307): + return find_local_user_and_group + + +def test_local_negative_timeout_enabled_by_default(ldap_conn, + user_and_group_rfc2307_lcl): + """ + Test that with the default local_negative_timeout value, a user who can't + be resolved through SSSD but can be resolved in LDAP is negatively cached + """ + # sanity check - try resolving an LDAP user + ent.assert_passwd_by_name("user", dict(name="user", uid=1001, gid=2000)) + + passwd_user, passwd_group = user_and_group_rfc2307_lcl + + # resolve a user who is not in LDAP, but exists locally + res, _ = call_sssd_getpwnam(passwd_user[0]) + assert res == NssReturnCode.NOTFOUND + # Do the same by UID + res, _ = call_sssd_getpwuid(passwd_user[2]) + assert res == NssReturnCode.NOTFOUND + + # Do the same for a group both by name and by ID + res, _ = call_sssd_getgrnam(passwd_group[0]) + assert res == NssReturnCode.NOTFOUND + res, _ = call_sssd_getgrgid(passwd_group[2]) + assert res == NssReturnCode.NOTFOUND + + # add the user and the group to LDAP + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user(passwd_user[0], passwd_user[2], 2000) + ent_list.add_group(passwd_group[0], passwd_group[2]) + create_ldap_entries(ldap_conn, ent_list) + + # Make sure the negative cache would expire if global timeout was used + time.sleep(2) + + # The user is now negatively cached and can't be resolved by either + # name or UID + res, _ = call_sssd_getpwnam(passwd_group[0]) + assert res == NssReturnCode.NOTFOUND + res, _ = call_sssd_getpwuid(passwd_group[2]) + assert res == NssReturnCode.NOTFOUND + + res, _ = call_sssd_getgrnam(passwd_group[0]) + assert res == NssReturnCode.NOTFOUND + res, _ = call_sssd_getgrgid(passwd_group[2]) + assert res == NssReturnCode.NOTFOUND + + cleanup_ldap_entries(ldap_conn, ent_list) + + +@pytest.fixture +def usr_and_grp_rfc2307_no_local_ncache(request, find_local_user_and_group, + ldap_conn): + """ + Create an RFC2307 directory fixture with interactive SSSD conf, + one user and one group but with the local negative timeout + disabled + """ + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user", 1001, 2000) + ent_list.add_group("group", 2001) + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_interactive_conf(ldap_conn, SCHEMA_RFC2307) + \ + unindent(""" + [nss] + local_negative_timeout = 0 + """) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return find_local_user_and_group + + +def test_local_negative_timeout_disabled(ldap_conn, + usr_and_grp_rfc2307_no_local_ncache): + """ + Test that with the local negative cache disabled, a user who is in both + LDAP and files can be resolved once the negative cache expires + """ + # sanity check - try resolving an LDAP user + ent.assert_passwd_by_name("user", dict(name="user", uid=1001, gid=2000)) + + passwd_user, passwd_group = usr_and_grp_rfc2307_no_local_ncache + + # resolve a user who is not in LDAP, but exists locally + res, _ = call_sssd_getpwnam(passwd_user[0]) + assert res == NssReturnCode.NOTFOUND + # Do the same by UID + res, _ = call_sssd_getpwuid(passwd_user[2]) + assert res == NssReturnCode.NOTFOUND + + # Do the same for a group both by name and by ID + res, _ = call_sssd_getgrnam(passwd_group[0]) + assert res == NssReturnCode.NOTFOUND + res, _ = call_sssd_getgrgid(passwd_group[2]) + assert res == NssReturnCode.NOTFOUND + + # add the user and the group to LDAP + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user(passwd_user[0], passwd_user[2], 2000) + ent_list.add_group(passwd_group[0], passwd_group[2]) + create_ldap_entries(ldap_conn, ent_list) + + # Make sure the negative cache expired + time.sleep(2) + + # The user can now be resolved + res, _ = call_sssd_getpwnam(passwd_user[0]) + assert res == NssReturnCode.SUCCESS + # Do the same by UID + res, _ = call_sssd_getpwuid(passwd_user[2]) + assert res == NssReturnCode.SUCCESS + + res, _ = call_sssd_getgrnam(passwd_group[0]) + assert res == NssReturnCode.SUCCESS + res, _ = call_sssd_getgrgid(passwd_group[2]) + assert res == NssReturnCode.SUCCESS + + cleanup_ldap_entries(ldap_conn, ent_list) + + +def users_with_email_setup(request, ldap_conn, cache_first): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + ent_list.add_user("user1", 1001, 2001, mail="user1.email@LDAP") + + ent_list.add_user("emailuser", 1002, 2002) + ent_list.add_user("emailuser2", 1003, 2003, mail="emailuser@LDAP") + + ent_list.add_user("userx", 1004, 2004, mail="userxy@LDAP") + ent_list.add_user("usery", 1005, 2005, mail="userxy@LDAP") + + create_ldap_fixture(request, ldap_conn, ent_list) + + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307_BIS) + + conf += unindent(""" + [nss] + cache_first = {0} + """).format(str(cache_first)) + + create_conf_fixture(request, conf) + create_sssd_fixture(request) + + +@pytest.mark.parametrize('cache_first', [True, False]) +def test_lookup_by_email(request, ldap_conn, cache_first): + """ + Test the simple case of looking up a user by e-mail + """ + users_with_email_setup(request, ldap_conn, cache_first) + ent.assert_passwd_by_name("user1.email@LDAP", + dict(name="user1", uid=1001, gid=2001)) + + +@pytest.mark.parametrize('cache_first', [True, False]) +def test_conflicting_mail_addresses_and_fqdn(request, ldap_conn, cache_first): + """ + Test that we handle the case where one user's mail address is the + same as another user's FQDN + + This is a regression test for https://github.com/SSSD/sssd/issues/4630 + """ + users_with_email_setup(request, ldap_conn, cache_first) + # With #3607 unfixed, these two lookups would prime the cache with + # nameAlias: emailuser@LDAP for both entries.. + ent.assert_passwd_by_name("emailuser@LDAP", + dict(name="emailuser", uid=1002, gid=2002)) + ent.assert_passwd_by_name("emailuser2@LDAP", + dict(name="emailuser2", uid=1003, gid=2003)) + + # ..and subsequently, emailuser would not be returned because the cache + # lookup would have had returned two entries which is an error + ent.assert_passwd_by_name("emailuser@LDAP", + dict(name="emailuser", uid=1002, gid=2002)) + ent.assert_passwd_by_name("emailuser2@LDAP", + dict(name="emailuser2", uid=1003, gid=2003)) + + +@pytest.mark.parametrize('cache_first', [True, False]) +def test_conflicting_mail_addresses(request, ldap_conn, cache_first): + """ + Negative test: looking up a user by e-mail which belongs to more than + one account fails in the back end. + """ + users_with_email_setup(request, ldap_conn, cache_first) + with pytest.raises(KeyError): + pwd.getpwnam("userxy@LDAP") + + # However resolving the users on their own must work + ent.assert_passwd_by_name("userx", dict(name="userx", uid=1004, gid=2004)) + ent.assert_passwd_by_name("usery", dict(name="usery", uid=1005, gid=2005)) |