diff options
Diffstat (limited to 'src/tests/intg/test_resolver.py')
-rw-r--r-- | src/tests/intg/test_resolver.py | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/src/tests/intg/test_resolver.py b/src/tests/intg/test_resolver.py new file mode 100644 index 0000000..1b1a494 --- /dev/null +++ b/src/tests/intg/test_resolver.py @@ -0,0 +1,326 @@ +# +# Resolver integration test +# +# Authors: +# Samuel Cabrero <scabrero@suse.com> +# +# Copyright (C) 2019 SUSE LINUX GmbH, Nuernberg, Germany. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import stat +import signal +import subprocess +import time +import ldap +import pytest +import socket +import config +import ds_openldap +import ldap_ent +from util import unindent +from sssd_nss import NssReturnCode, HostError +from sssd_hosts import call_sssd_gethostbyname +from sssd_nets import call_sssd_getnetbyname, call_sssd_getnetbyaddr + +LDAP_BASE_DN = "dc=example,dc=com" + + +@pytest.fixture(scope="module") +def ds_inst(request): + """LDAP server instance fixture""" + ds_inst = ds_openldap.DSOpenLDAP( + config.PREFIX, 10389, LDAP_BASE_DN, + "cn=admin", "Secret123" + ) + + try: + ds_inst.setup() + except Exception: + ds_inst.teardown() + raise + request.addfinalizer(ds_inst.teardown) + return ds_inst + + +@pytest.fixture(scope="module") +def ldap_conn(request, ds_inst): + """LDAP server connection fixture""" + ldap_conn = ds_inst.bind() + ldap_conn.ds_inst = ds_inst + request.addfinalizer(ldap_conn.unbind_s) + return ldap_conn + + +def create_ldap_entries(ldap_conn, ent_list=None): + """Add LDAP entries from ent_list""" + if ent_list is not None: + for entry in ent_list: + ldap_conn.add_s(entry[0], entry[1]) + + +def cleanup_ldap_entries(ldap_conn, ent_list=None): + """Remove LDAP entries added by create_ldap_entries""" + if ent_list is None: + for ou in ("Users", "Groups", "Netgroups", "Services", "Policies", + "Hosts", "Networks"): + for entry in ldap_conn.search_s(f"ou={ou}," + f"{ldap_conn.ds_inst.base_dn}", + ldap.SCOPE_ONELEVEL, + attrlist=[]): + ldap_conn.delete_s(entry[0]) + else: + for entry in ent_list: + ldap_conn.delete_s(entry[0]) + + +def create_ldap_cleanup(request, ldap_conn, ent_list=None): + """Add teardown for removing all user/group LDAP entries""" + request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) + + +def create_ldap_fixture(request, ldap_conn, ent_list=None): + """Add LDAP entries and add teardown for removing them""" + create_ldap_entries(ldap_conn, ent_list) + create_ldap_cleanup(request, ldap_conn, ent_list) + + +SCHEMA_RFC2307 = "rfc2307" +SCHEMA_RFC2307_BIS = "rfc2307bis" + + +def format_basic_conf(ldap_conn, schema): + """Format a basic SSSD configuration""" + schema_conf = "ldap_schema = " + schema + "\n" + if schema == SCHEMA_RFC2307_BIS: + schema_conf += "ldap_group_object_class = groupOfNames\n" + iphost_search_base = "ou=Hosts," + ldap_conn.ds_inst.base_dn + ipnetwork_search_base = "ou=Networks," + ldap_conn.ds_inst.base_dn + return unindent("""\ + [sssd] + debug_level = 0xffff + domains = LDAP + services = nss + + [nss] + debug_level = 0xffff + memcache_timeout = 0 + entry_negative_timeout = 1 + + [domain/LDAP] + ldap_auth_disable_tls_never_use_in_production = true + debug_level = 0xffff + {schema_conf} + id_provider = ldap + auth_provider = ldap + resolver_provider = ldap + ldap_uri = {ldap_conn.ds_inst.ldap_url} + ldap_search_base = {ldap_conn.ds_inst.base_dn} + ldap_iphost_search_base = {iphost_search_base} + ldap_ipnetwork_search_base = {ipnetwork_search_base} + """).format(**locals()) + + +def create_conf_file(contents): + """Create sssd.conf with specified contents""" + conf = open(config.CONF_PATH, "w") + conf.write(contents) + conf.close() + os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) + + +def cleanup_conf_file(): + """Remove sssd.conf, if it exists""" + if os.path.lexists(config.CONF_PATH): + os.unlink(config.CONF_PATH) + + +def create_conf_cleanup(request): + """Add teardown for removing sssd.conf""" + request.addfinalizer(cleanup_conf_file) + + +def create_conf_fixture(request, contents): + """ + Create sssd.conf with specified contents and add teardown for removing it + """ + create_conf_file(contents) + create_conf_cleanup(request) + + +def create_sssd_process(): + """Start the SSSD process""" + if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: + raise Exception("sssd start failed") + + +def get_sssd_pid(): + pid_file = open(config.PIDFILE_PATH, "r") + pid = int(pid_file.read()) + return pid + + +def cleanup_sssd_process(): + """Stop the SSSD process and remove its state""" + try: + pid = get_sssd_pid() + os.kill(pid, signal.SIGTERM) + while True: + try: + os.kill(pid, signal.SIGCONT) + except OSError: + break + time.sleep(1) + except OSError: + pass + for path in os.listdir(config.DB_PATH): + os.unlink(config.DB_PATH + "/" + path) + for path in os.listdir(config.MCACHE_PATH): + os.unlink(config.MCACHE_PATH + "/" + path) + + +def create_sssd_fixture(request): + """Start SSSD and add teardown for stopping it and removing its state""" + create_sssd_process() + create_sssd_cleanup(request) + + +def create_sssd_cleanup(request): + """Add teardown for stopping SSSD and removing its state""" + request.addfinalizer(cleanup_sssd_process) + + +@pytest.fixture +def add_hosts(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_host("host1", + aliases=["host1_alias1", "host1_alias2"], + addresses=["192.168.1.1", "192.168.1.2", + "2001:db8:1::1", "2001:db8:1::2"]) + ent_list.add_host("host2.example.com", + aliases=["host2_alias1.example.com", + "host2_alias2.example.com"], + addresses=["192.168.2.1", "192.168.2.2", + "2001:db8:2::1", "2001:db8:2::2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +@pytest.fixture +def add_nets(request, ldap_conn): + ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) + + ent_list.add_ipnet("net1", "192.168.1.1", + aliases=["net1_alias1", "net1_alias2"]) + ent_list.add_ipnet("net2", "10.2.2.2", + aliases=["net2_alias1", "net2_alias2"]) + + create_ldap_fixture(request, ldap_conn, ent_list) + conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) + create_conf_fixture(request, conf) + create_sssd_fixture(request) + return None + + +def test_hostbyname(add_hosts): + (res, hres, _) = call_sssd_gethostbyname("invalid") + + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_gethostbyname("example.com") + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_gethostbyname("invalid.example.com") + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_gethostbyname("host1") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_gethostbyname("host1_alias1") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_gethostbyname("host1_alias2") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_gethostbyname("host2.example.com") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_gethostbyname("host2_alias1.example.com") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_gethostbyname("host2_alias2.example.com") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + +def test_netbyname(add_nets): + (res, hres, _) = call_sssd_getnetbyname("invalid") + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_getnetbyname("net1") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyname("net1_alias1") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyname("net1_alias2") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyname("net2") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyname("net2_alias1") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyname("net2_alias2") + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + +def test_netbyaddr(add_nets): + (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.1", socket.AF_INET) + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.1", socket.AF_UNSPEC) + assert res == NssReturnCode.NOTFOUND + assert hres == HostError.HOST_NOT_FOUND + + (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_INET) + assert res == NssReturnCode.SUCCESS + assert hres == 0 + + (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_UNSPEC) + assert res == NssReturnCode.SUCCESS + assert hres == 0 |