diff options
Diffstat (limited to 'python/samba/tests/kcc/ldif_import_export.py')
-rw-r--r-- | python/samba/tests/kcc/ldif_import_export.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/python/samba/tests/kcc/ldif_import_export.py b/python/samba/tests/kcc/ldif_import_export.py new file mode 100644 index 0000000..9e573bf --- /dev/null +++ b/python/samba/tests/kcc/ldif_import_export.py @@ -0,0 +1,240 @@ +# Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export. +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.kcc.ldif_import_export""" + +import samba +import os +import time +import subprocess +import logging +import samba.tests +from samba.kcc import ldif_import_export, KCC +from samba import ldb +from samba.dcerpc import misc + + +from samba.param import LoadParm +from samba.credentials import Credentials +from samba.samdb import SamDB + +unix_now = int(time.time()) + +MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'], + "testdata/ldif-utils-test-multisite.ldif") + + +# UNCONNECTED_LDIF is a single site, unconnected 5DC database that was +# created using samba-tool domain join in testenv. +UNCONNECTED_LDIF = os.path.join(os.environ['SRCDIR_ABS'], + "testdata/unconnected-intrasite.ldif") + +MULTISITE_LDIF_DSAS = ( + ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-4"), + ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-4"), + ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-3"), + ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-5"), + ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-5"), + ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Default-First-Site-Name"), +) + + +class LdifImportExportTests(samba.tests.TestCaseInTempDir): + def setUp(self): + super().setUp() + self.lp = LoadParm() + self.creds = Credentials() + self.creds.guess(self.lp) + + def remove_files(self, *files): + for f in files: + assert(f.startswith(self.tempdir)) + os.unlink(f) + + def test_write_search_url(self): + pass + + def test_ldif_to_samdb(self): + dburl = os.path.join(self.tempdir, "ldap") + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF) + self.assertIsInstance(samdb, SamDB) + + dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites," + "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com") + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + + ntds_guid = misc.GUID(samdb.get_ntds_GUID()) + self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid) + + service_name_res = samdb.search(base="", + scope=ldb.SCOPE_BASE, + attrs=["dsServiceName"]) + dn = ldb.Dn(samdb, + service_name_res[0]["dsServiceName"][0].decode('utf8')) + self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa)) + self.remove_files(dburl) + + def test_ldif_to_samdb_forced_local_dsa(self): + for dsa, site in MULTISITE_LDIF_DSAS: + dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa" + "-%s" % dsa) + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF, + forced_local_dsa=dsa) + self.assertIsInstance(samdb, SamDB) + self.assertEqual(samdb.server_site_name(), site) + + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + + ntds_guid = misc.GUID(samdb.get_ntds_GUID()) + self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid) + + service_name_res = samdb.search(base="", + scope=ldb.SCOPE_BASE, + attrs=["dsServiceName"]) + dn = ldb.Dn(samdb, + service_name_res[0]["dsServiceName"][0].decode('utf8')) + self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa)) + self.remove_files(dburl) + + def test_samdb_to_ldif_file(self): + dburl = os.path.join(self.tempdir, "ldap") + dburl2 = os.path.join(self.tempdir, "ldap_roundtrip") + ldif_file = os.path.join(self.tempdir, "ldif") + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF) + self.assertIsInstance(samdb, SamDB) + ldif_import_export.samdb_to_ldif_file(samdb, dburl, + lp=self.lp, creds=None, + ldif_file=ldif_file) + self.assertGreater(os.path.getsize(ldif_file), 1000, + "LDIF should be larger than 1000 bytes") + samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp, + ldif_file) + self.assertIsInstance(samdb, SamDB) + dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites," + "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com") + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + self.remove_files(dburl) + self.remove_files(dburl2) + self.remove_files(ldif_file) + + +class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir): + def setUp(self): + super().setUp() + self.lp = LoadParm() + self.creds = Credentials() + self.creds.guess(self.lp) + + def remove_files(self, *files): + for f in files: + assert(f.startswith(self.tempdir)) + os.unlink(f) + + def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None): + # Note that setting read-only to False won't affect the ldif, + # only the temporary database that is created from it. + my_kcc = KCC(unix_now, readonly=readonly, verify=verify, + dot_file_dir=dot_file_dir) + tmpdb = os.path.join(self.tempdir, 'tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + self.remove_files(tmpdb) + return my_kcc + + def test_list_dsas(self): + my_kcc = self._get_kcc('test-list') + dsas = set(my_kcc.list_dsas()) + expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS) + self.assertEqual(dsas, expected_dsas) + + def test_verify(self): + """Check that the KCC generates graphs that pass its own verify + option. + """ + my_kcc = self._get_kcc('test-verify', verify=True) + tmpdb = os.path.join(self.tempdir, 'verify-tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + self.remove_files(tmpdb) + + def test_unconnected_db(self): + """Check that the KCC generates errors on a unconnected db + """ + my_kcc = self._get_kcc('test-verify', verify=True) + tmpdb = os.path.join(self.tempdir, 'verify-tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF) + + try: + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + except samba.kcc.graph_utils.GraphError: + pass + except Exception: + self.fail("Did not expect this error.") + finally: + self.remove_files(tmpdb) + + def test_dotfiles(self): + """Check that KCC writes dot_files when asked. + """ + my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir) + tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb') + files = [tmpdb] + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + + dot = '/usr/bin/dot' + for fn in os.listdir(self.tempdir): + if fn.endswith('.dot'): + ffn = os.path.join(self.tempdir, fn) + if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0: + r = subprocess.call([dot, '-Tcanon', ffn]) + self.assertEqual(r, 0) + + # even if dot is not there, at least check the file is non-empty + size = os.stat(ffn).st_size + self.assertNotEqual(size, 0) + files.append(ffn) + + self.remove_files(*files) |