summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/kcc
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/tests/kcc')
-rw-r--r--python/samba/tests/kcc/__init__.py90
-rw-r--r--python/samba/tests/kcc/graph.py67
-rw-r--r--python/samba/tests/kcc/graph_utils.py165
-rw-r--r--python/samba/tests/kcc/kcc_utils.py393
-rw-r--r--python/samba/tests/kcc/ldif_import_export.py240
5 files changed, 955 insertions, 0 deletions
diff --git a/python/samba/tests/kcc/__init__.py b/python/samba/tests/kcc/__init__.py
new file mode 100644
index 0000000..31354f0
--- /dev/null
+++ b/python/samba/tests/kcc/__init__.py
@@ -0,0 +1,90 @@
+# Unix SMB/CIFS implementation. Tests for samba.kcc core.
+# Copyright (C) Andrew Bartlett 2015
+#
+# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.kcc"""
+
+import samba
+import os
+import time
+from tempfile import mkdtemp
+
+import samba.tests
+from samba import kcc
+from samba import ldb
+from samba.dcerpc import misc
+
+
+from samba.param import LoadParm
+from samba.credentials import Credentials
+from samba.samdb import SamDB
+
+unix_now = int(time.time())
+unix_once_upon_a_time = 1000000000 # 2001-09-09
+
+ENV_DSAS = {
+ 'ad_dc_ntvfs': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
+ 'fl2000dc': ['CN=DC5,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2000,DC=example,DC=com'],
+ 'fl2003dc': ['CN=DC6,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2003,DC=example,DC=com'],
+ 'fl2008r2dc': ['CN=DC7,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2008r2,DC=example,DC=com'],
+ 'promoted_dc': ['CN=PROMOTEDVDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
+ 'CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
+ 'vampire_dc': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
+ 'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
+}
+
+
+class KCCTests(samba.tests.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.lp = LoadParm()
+ self.creds = Credentials()
+ self.creds.guess(self.lp)
+ self.creds.set_username(os.environ["USERNAME"])
+ self.creds.set_password(os.environ["PASSWORD"])
+
+ def test_list_dsas(self):
+ my_kcc = kcc.KCC(unix_now, False, False, False, False)
+ my_kcc.load_samdb("ldap://%s" % os.environ["SERVER"],
+ self.lp, self.creds)
+ try:
+ dsas = my_kcc.list_dsas()
+ except kcc.KCCError as e:
+ self.fail("kcc.list_dsas failed with %s" % e)
+ env = os.environ['TEST_ENV']
+ for expected_dsa in ENV_DSAS[env]:
+ self.assertIn(expected_dsa, dsas)
+
+ def test_verify(self):
+ """check that the KCC generates graphs that pass its own verify
+ option. This is not a spectacular achievement when there are
+ only a couple of nodes to connect, but it shows something.
+ """
+ my_kcc = kcc.KCC(unix_now, readonly=True, verify=True,
+ debug=False, dot_file_dir=None)
+
+ # As this is flapping with errors under python3, we catch
+ # exceptions and turn them into failures..
+ try:
+ my_kcc.run("ldap://%s" % os.environ["SERVER"],
+ self.lp, self.creds,
+ attempt_live_connections=False)
+ except (samba.kcc.graph_utils.GraphError, kcc.KCCError):
+ import traceback
+ traceback.print_exc()
+ self.fail()
diff --git a/python/samba/tests/kcc/graph.py b/python/samba/tests/kcc/graph.py
new file mode 100644
index 0000000..b581158
--- /dev/null
+++ b/python/samba/tests/kcc/graph.py
@@ -0,0 +1,67 @@
+# Unix SMB/CIFS implementation. Tests for kcc.graph routines
+# Copyright (C) Andrew Bartlett 2015
+#
+# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.kcc.graph"""
+
+import samba
+import samba.tests
+from samba.kcc.graph import total_schedule, convert_schedule_to_repltimes
+
+def ntdsconn_schedule(times):
+ if times is None:
+ return None
+ from samba.dcerpc import drsblobs
+ schedule = drsblobs.schedule()
+ schedule.size = 188
+ schedule.bandwidth = 0
+ schedule.numberOfSchedules = 1
+ header = drsblobs.scheduleHeader()
+ header.type = 0
+ header.offset = 20
+ schedule.headerArray = [header]
+ data = drsblobs.scheduleSlots()
+ data.slots = times
+ schedule.dataArray = [data]
+ return schedule
+
+
+class GraphFunctionTests(samba.tests.TestCase):
+
+ def test_total_schedule(self):
+ schedule = [0x81] * 84
+ for schedule, total in (
+ ([0x81] * 84, 168),
+ ([0xff] * 84, 84 * 8),
+ ([0xaa] * 84, 84 * 4),
+ ([0x03, 0x33] * 42, 42 * 6),
+ (list(range(7)) * 12, 12 * 9),
+ (list(range(4)) * 21, 21 * 4)):
+ self.assertEqual(total_schedule(schedule), total)
+
+ def test_convert_schedule_to_repltimes(self):
+ for ntdsconn_times, repltimes in (
+ ([0x01] * 168, [0x11] * 84),
+ (None, [0x11] * 84),
+ ([0x06] * 168, [0x66] * 84),
+ ([0x03, 0xa] * 84, [0x3a] * 84),
+ (list(range(7)) * 24,
+ [0x01, 0x23, 0x45, 0x60, 0x12, 0x34, 0x56] * 12)):
+ schedule = ntdsconn_schedule(ntdsconn_times)
+ self.assertEqual(convert_schedule_to_repltimes(schedule),
+ repltimes)
diff --git a/python/samba/tests/kcc/graph_utils.py b/python/samba/tests/kcc/graph_utils.py
new file mode 100644
index 0000000..3eaa1c7
--- /dev/null
+++ b/python/samba/tests/kcc/graph_utils.py
@@ -0,0 +1,165 @@
+# Unix SMB/CIFS implementation. Tests for kcc.graph_utils routines
+# Copyright (C) Andrew Bartlett 2015
+#
+# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.kcc.graph_utils"""
+
+import samba
+import samba.tests
+from samba.kcc.graph_utils import GraphError
+from samba.kcc.graph_utils import (verify_graph_complete,
+ verify_graph_connected,
+ verify_graph_connected_under_edge_failures,
+ verify_graph_forest,
+ verify_graph_connected_under_vertex_failures,
+ verify_graph_no_lonely_vertices)
+
+import itertools
+
+
+def make_tree(vertices):
+ if len(vertices) < 2:
+ return ()
+ remaining = set(vertices)
+ used = set()
+ edges = set()
+ used.add(remaining.pop())
+ used.add(remaining.pop())
+ edges.add(tuple(used))
+ while remaining:
+ v = remaining.pop()
+ w = used.pop()
+ e = (w, v)
+ edges.add(e)
+ used.update(e)
+ return tuple(edges)
+
+# TODO: test directed graphs
+
+
+class UndirectedGraphTests(samba.tests.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ vertices = tuple('abcdefgh')
+ vertices2 = tuple('ijk')
+ edges = tuple(itertools.combinations(vertices, 2))
+ edges2 = tuple(itertools.combinations(vertices2, 2))
+ line_edges = list(zip(vertices[1:], vertices[:-1]))
+ ring_edges = line_edges + [(vertices[0], vertices[-1])]
+
+ tree = make_tree(vertices)
+ tree2 = make_tree(vertices2)
+
+ self.complete_graph = [edges, vertices, vertices]
+
+ self.disconnected_clusters = [edges + edges2,
+ vertices + vertices2,
+ vertices + vertices2]
+
+ self.graph_with_unreachables = [edges,
+ vertices + vertices2,
+ vertices]
+
+ self.ring = [ring_edges, vertices, vertices]
+ self.line = [line_edges, vertices, vertices]
+
+ self.tree = [tree, vertices, vertices]
+ self.forest = [tree + tree2,
+ vertices + vertices2,
+ vertices + vertices2]
+
+ self.unconnected_graph = ((), vertices, ())
+
+ def assertGraphError(self, fn, *args):
+ return self.assertRaises(GraphError, fn, *args)
+
+ def test_graph_complete(self):
+ fn = verify_graph_complete
+
+ self.assertGraphError(fn, *self.disconnected_clusters)
+ self.assertGraphError(fn, *self.graph_with_unreachables)
+ self.assertGraphError(fn, *self.ring)
+ self.assertGraphError(fn, *self.tree)
+
+ self.assertIsNone(fn(*self.complete_graph))
+
+ def test_graph_connected(self):
+ fn = verify_graph_connected
+
+ self.assertGraphError(fn, *self.disconnected_clusters)
+ self.assertGraphError(fn, *self.graph_with_unreachables)
+ self.assertGraphError(fn, *self.forest)
+ self.assertGraphError(fn, *self.unconnected_graph)
+
+ self.assertIsNone(fn(*self.line))
+ self.assertIsNone(fn(*self.ring))
+ self.assertIsNone(fn(*self.complete_graph))
+ self.assertIsNone(fn(*self.tree))
+
+ def test_graph_forest(self):
+ fn = verify_graph_forest
+
+ self.assertGraphError(fn, *self.disconnected_clusters)
+ self.assertGraphError(fn, *self.graph_with_unreachables)
+ self.assertGraphError(fn, *self.ring)
+
+ self.assertIsNone(fn(*self.line))
+ self.assertIsNone(fn(*self.tree))
+ self.assertIsNone(fn(*self.forest))
+ self.assertIsNone(fn(*self.unconnected_graph))
+
+ def test_graph_connected_under_edge_failures(self):
+ fn = verify_graph_connected_under_edge_failures
+
+ self.assertGraphError(fn, *self.line)
+ self.assertGraphError(fn, *self.tree)
+ self.assertGraphError(fn, *self.forest)
+ self.assertGraphError(fn, *self.disconnected_clusters)
+
+ self.assertIsNone(fn(*self.ring))
+ self.assertIsNone(fn(*self.complete_graph))
+
+ def test_graph_connected_under_vertex_failures(self):
+ # XXX no tests to distinguish this from the edge_failures case
+ fn = verify_graph_connected_under_vertex_failures
+
+ self.assertGraphError(fn, *self.line)
+ self.assertGraphError(fn, *self.tree)
+ self.assertGraphError(fn, *self.forest)
+ self.assertGraphError(fn, *self.disconnected_clusters)
+
+ self.assertIsNone(fn(*self.ring))
+ self.assertIsNone(fn(*self.complete_graph))
+
+ def test_graph_multi_edge_forest(self):
+ pass
+
+ def test_graph_no_lonely_vertices(self):
+ fn = verify_graph_no_lonely_vertices
+ self.assertGraphError(fn, *self.unconnected_graph)
+ self.assertGraphError(fn, *self.graph_with_unreachables)
+
+ self.assertIsNone(fn(*self.ring))
+ self.assertIsNone(fn(*self.complete_graph))
+ self.assertIsNone(fn(*self.line))
+ self.assertIsNone(fn(*self.tree))
+ self.assertIsNone(fn(*self.forest))
+
+ def test_graph_no_unknown_vertices(self):
+ pass
diff --git a/python/samba/tests/kcc/kcc_utils.py b/python/samba/tests/kcc/kcc_utils.py
new file mode 100644
index 0000000..c1af998
--- /dev/null
+++ b/python/samba/tests/kcc/kcc_utils.py
@@ -0,0 +1,393 @@
+# Unix SMB/CIFS implementation. Tests for samba.kcc.kcc_utils.
+# Copyright (C) Andrew Bartlett 2015
+#
+# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.kcc.kcc_utils"""
+import samba
+import samba.tests
+from samba.kcc.kcc_utils import new_connection_schedule, drsblobs
+from samba.kcc.kcc_utils import uncovered_sites_to_cover
+from samba.credentials import Credentials
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.tests import delete_force
+
+
+class ScheduleTests(samba.tests.TestCase):
+
+ def test_new_connection_schedule(self):
+ schedule = new_connection_schedule()
+ self.assertIsInstance(schedule, drsblobs.schedule)
+ self.assertEqual(schedule.size, 188)
+ self.assertEqual(len(schedule.dataArray[0].slots), 168)
+
+
+# OK, this is pathetic, but the rest of it looks really hard, with the
+# classes all intertwingled with each other and the samdb. That is to say:
+# XXX later.
+
+class SiteCoverageTests(samba.tests.TestCase):
+
+ def setUp(self):
+ self.prefix = "kcc_"
+ self.lp = samba.tests.env_loadparm()
+
+ self.sites = {}
+ self.site_links = {}
+
+ self.creds = Credentials()
+ self.creds.guess(self.lp)
+ self.session = system_session()
+
+ self.samdb = SamDB(session_info=self.session,
+ credentials=self.creds,
+ lp=self.lp)
+
+ def tearDown(self):
+ self.samdb.transaction_start()
+
+ for site in self.sites:
+ delete_force(self.samdb, site, controls=['tree_delete:1'])
+
+ for site_link in self.site_links:
+ delete_force(self.samdb, site_link)
+
+ self.samdb.transaction_commit()
+
+ def _add_server(self, name, site):
+ dn = "CN={0},CN=Servers,{1}".format(name, site)
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "server",
+ "serverReference": self.samdb.domain_dn()
+ })
+ return dn
+
+ def _add_site(self, name):
+ dn = "CN={0},CN=Sites,{1}".format(
+ name, self.samdb.get_config_basedn()
+ )
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "site"
+ })
+ self.samdb.add({
+ "dn": "CN=Servers," + dn,
+ "objectClass": ["serversContainer"]
+ })
+
+ self.sites[dn] = name
+ return dn, name.lower()
+
+ def _add_site_link(self, name, links=None, cost=100):
+ if links is None:
+ links = []
+ dn = "CN={0},CN=IP,CN=Inter-Site Transports,CN=Sites,{1}".format(
+ name, self.samdb.get_config_basedn()
+ )
+ self.samdb.add({
+ "dn": dn,
+ "objectClass": "siteLink",
+ "cost": str(cost),
+ "siteList": links
+ })
+ self.site_links[dn] = name
+ return dn
+
+ def test_single_site_link_same_dc_count(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+
+ self._add_site_link(self.prefix + "link",
+ [site1, site2, uncovered_dn])
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ def test_single_site_link_different_dc_count(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "ABCD" + '2', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "BCDE" + '2', site2)
+ self._add_server(self.prefix + "BCDE" + '3', site2)
+
+ self._add_site_link(self.prefix + "link",
+ [site1, site2, uncovered_dn])
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ def test_two_site_links_same_cost(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "ABCD" + '2', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "BCDE" + '2', site2)
+ self._add_server(self.prefix + "BCDE" + '3', site2)
+
+ self._add_site_link(self.prefix + "link1",
+ [site1, uncovered_dn])
+ self._add_site_link(self.prefix + "link2",
+ [site2, uncovered_dn])
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ def test_two_site_links_different_costs(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "BCDE" + '2', site2)
+
+ self._add_site_link(self.prefix + "link1",
+ [site1, uncovered_dn],
+ cost=50)
+ self._add_site_link(self.prefix + "link2",
+ [site2, uncovered_dn],
+ cost=75)
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ def test_three_site_links_different_costs(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+ site3, name3 = self._add_site(self.prefix + "CDEF")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "CDEF" + '1', site3)
+ self._add_server(self.prefix + "CDEF" + '2', site3)
+
+ self._add_site_link(self.prefix + "link1",
+ [site1, uncovered_dn],
+ cost=50)
+ self._add_site_link(self.prefix + "link2",
+ [site2, uncovered_dn],
+ cost=75)
+ self._add_site_link(self.prefix + "link3",
+ [site3, uncovered_dn],
+ cost=60)
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name3)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ def test_three_site_links_duplicate_costs(self):
+ # two of the links have the same cost; the other is higher
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+ site3, name3 = self._add_site(self.prefix + "CDEF")
+
+ uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered")
+
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "CDEF" + '1', site3)
+ self._add_server(self.prefix + "CDEF" + '2', site3)
+
+ self._add_site_link(self.prefix + "link1",
+ [site1, uncovered_dn],
+ cost=50)
+ self._add_site_link(self.prefix + "link2",
+ [site2, uncovered_dn],
+ cost=75)
+ self._add_site_link(self.prefix + "link3",
+ [site3, uncovered_dn],
+ cost=50)
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name3)
+ to_cover.sort()
+
+ self.assertEqual([uncovered], to_cover)
+
+ def test_complex_setup_with_multiple_uncovered_sites(self):
+ self.samdb.transaction_start()
+ site1, name1 = self._add_site(self.prefix + "ABCD")
+ site2, name2 = self._add_site(self.prefix + "BCDE")
+ site3, name3 = self._add_site(self.prefix + "CDEF")
+
+ site4, name4 = self._add_site(self.prefix + "1234")
+ site5, name5 = self._add_site(self.prefix + "2345")
+ site6, name6 = self._add_site(self.prefix + "3456")
+
+ uncovered_dn1, uncovered1 = self._add_site(self.prefix + "uncovered1")
+ uncovered_dn2, uncovered2 = self._add_site(self.prefix + "uncovered2")
+ uncovered_dn3, uncovered3 = self._add_site(self.prefix + "uncovered3")
+
+ # Site Link Cluster 1 - Server List
+ self._add_server(self.prefix + "ABCD" + '1', site1)
+
+ self._add_server(self.prefix + "BCDE" + '1', site2)
+ self._add_server(self.prefix + "BCDE" + '2', site2)
+
+ self._add_server(self.prefix + "CDEF" + '1', site3)
+ self._add_server(self.prefix + "CDEF" + '2', site3)
+ self._add_server(self.prefix + "CDEF" + '3', site3)
+
+ # Site Link Cluster 2 - Server List
+ self._add_server(self.prefix + "1234" + '1', site4)
+ self._add_server(self.prefix + "1234" + '2', site4)
+
+ self._add_server(self.prefix + "2345" + '1', site5)
+ self._add_server(self.prefix + "2345" + '2', site5)
+
+ self._add_server(self.prefix + "3456" + '1', site6)
+
+ # Join to Uncovered1 (preference to site link cluster 1)
+ self._add_site_link(self.prefix + "link1A",
+ [site1, site2, site3, uncovered_dn1],
+ cost=49)
+ self._add_site_link(self.prefix + "link2A",
+ [site4, site5, site6, uncovered_dn1],
+ cost=50)
+
+ # Join to Uncovered2 (no preferene on site links)
+ self._add_site_link(self.prefix + "link1B",
+ [site1, site2, site3, uncovered_dn2],
+ cost=50)
+ self._add_site_link(self.prefix + "link2B",
+ [site4, site5, site6, uncovered_dn2],
+ cost=50)
+
+ # Join to Uncovered3 (preference to site link cluster 2)
+ self._add_site_link(self.prefix + "link1C",
+ [site1, site2, site3, uncovered_dn3],
+ cost=50)
+ self._add_site_link(self.prefix + "link2C",
+ [site4, site5, site6, uncovered_dn3],
+ cost=49)
+
+ self.samdb.transaction_commit()
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name1)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name2)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name3)
+ to_cover.sort()
+
+ self.assertEqual([uncovered1, uncovered2], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name4)
+ to_cover.sort()
+
+ self.assertEqual([uncovered2, uncovered3], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name5)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ to_cover = uncovered_sites_to_cover(self.samdb, name6)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
+
+ for to_check in [uncovered1, uncovered2, uncovered3]:
+ to_cover = uncovered_sites_to_cover(self.samdb, to_check)
+ to_cover.sort()
+
+ self.assertEqual([], to_cover)
diff --git a/python/samba/tests/kcc/ldif_import_export.py b/python/samba/tests/kcc/ldif_import_export.py
new file mode 100644
index 0000000..9e573bf
--- /dev/null
+++ b/python/samba/tests/kcc/ldif_import_export.py
@@ -0,0 +1,240 @@
+# Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
+# Copyright (C) Andrew Bartlett 2015
+#
+# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for samba.kcc.ldif_import_export"""
+
+import samba
+import os
+import time
+import subprocess
+import logging
+import samba.tests
+from samba.kcc import ldif_import_export, KCC
+from samba import ldb
+from samba.dcerpc import misc
+
+
+from samba.param import LoadParm
+from samba.credentials import Credentials
+from samba.samdb import SamDB
+
+unix_now = int(time.time())
+
+MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
+ "testdata/ldif-utils-test-multisite.ldif")
+
+
+# UNCONNECTED_LDIF is a single site, unconnected 5DC database that was
+# created using samba-tool domain join in testenv.
+UNCONNECTED_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
+ "testdata/unconnected-intrasite.ldif")
+
+MULTISITE_LDIF_DSAS = (
+ ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-4"),
+ ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-4"),
+ ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-3"),
+ ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-5"),
+ ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-5"),
+ ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-2"),
+ ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-2"),
+ ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-2"),
+ ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Site-2"),
+ ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
+ "Default-First-Site-Name"),
+)
+
+
+class LdifImportExportTests(samba.tests.TestCaseInTempDir):
+ def setUp(self):
+ super().setUp()
+ self.lp = LoadParm()
+ self.creds = Credentials()
+ self.creds.guess(self.lp)
+
+ def remove_files(self, *files):
+ for f in files:
+ assert(f.startswith(self.tempdir))
+ os.unlink(f)
+
+ def test_write_search_url(self):
+ pass
+
+ def test_ldif_to_samdb(self):
+ dburl = os.path.join(self.tempdir, "ldap")
+ samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
+ MULTISITE_LDIF)
+ self.assertIsInstance(samdb, SamDB)
+
+ dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
+ "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
+ res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
+ scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+
+ ntds_guid = misc.GUID(samdb.get_ntds_GUID())
+ self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
+
+ service_name_res = samdb.search(base="",
+ scope=ldb.SCOPE_BASE,
+ attrs=["dsServiceName"])
+ dn = ldb.Dn(samdb,
+ service_name_res[0]["dsServiceName"][0].decode('utf8'))
+ self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+ self.remove_files(dburl)
+
+ def test_ldif_to_samdb_forced_local_dsa(self):
+ for dsa, site in MULTISITE_LDIF_DSAS:
+ dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
+ "-%s" % dsa)
+ samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
+ MULTISITE_LDIF,
+ forced_local_dsa=dsa)
+ self.assertIsInstance(samdb, SamDB)
+ self.assertEqual(samdb.server_site_name(), site)
+
+ res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
+ scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+
+ ntds_guid = misc.GUID(samdb.get_ntds_GUID())
+ self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
+
+ service_name_res = samdb.search(base="",
+ scope=ldb.SCOPE_BASE,
+ attrs=["dsServiceName"])
+ dn = ldb.Dn(samdb,
+ service_name_res[0]["dsServiceName"][0].decode('utf8'))
+ self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+ self.remove_files(dburl)
+
+ def test_samdb_to_ldif_file(self):
+ dburl = os.path.join(self.tempdir, "ldap")
+ dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
+ ldif_file = os.path.join(self.tempdir, "ldif")
+ samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
+ MULTISITE_LDIF)
+ self.assertIsInstance(samdb, SamDB)
+ ldif_import_export.samdb_to_ldif_file(samdb, dburl,
+ lp=self.lp, creds=None,
+ ldif_file=ldif_file)
+ self.assertGreater(os.path.getsize(ldif_file), 1000,
+ "LDIF should be larger than 1000 bytes")
+ samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
+ ldif_file)
+ self.assertIsInstance(samdb, SamDB)
+ dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
+ "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
+ res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
+ scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+ self.remove_files(dburl)
+ self.remove_files(dburl2)
+ self.remove_files(ldif_file)
+
+
+class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
+ def setUp(self):
+ super().setUp()
+ self.lp = LoadParm()
+ self.creds = Credentials()
+ self.creds.guess(self.lp)
+
+ def remove_files(self, *files):
+ for f in files:
+ assert(f.startswith(self.tempdir))
+ os.unlink(f)
+
+ def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
+ # Note that setting read-only to False won't affect the ldif,
+ # only the temporary database that is created from it.
+ my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
+ dot_file_dir=dot_file_dir)
+ tmpdb = os.path.join(self.tempdir, 'tmpdb')
+ my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
+ self.remove_files(tmpdb)
+ return my_kcc
+
+ def test_list_dsas(self):
+ my_kcc = self._get_kcc('test-list')
+ dsas = set(my_kcc.list_dsas())
+ expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS)
+ self.assertEqual(dsas, expected_dsas)
+
+ def test_verify(self):
+ """Check that the KCC generates graphs that pass its own verify
+ option.
+ """
+ my_kcc = self._get_kcc('test-verify', verify=True)
+ tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
+ my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
+
+ my_kcc.run(None,
+ self.lp, self.creds,
+ attempt_live_connections=False)
+ self.remove_files(tmpdb)
+
+ def test_unconnected_db(self):
+ """Check that the KCC generates errors on a unconnected db
+ """
+ my_kcc = self._get_kcc('test-verify', verify=True)
+ tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
+ my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF)
+
+ try:
+ my_kcc.run(None,
+ self.lp, self.creds,
+ attempt_live_connections=False)
+ except samba.kcc.graph_utils.GraphError:
+ pass
+ except Exception:
+ self.fail("Did not expect this error.")
+ finally:
+ self.remove_files(tmpdb)
+
+ def test_dotfiles(self):
+ """Check that KCC writes dot_files when asked.
+ """
+ my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
+ tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
+ files = [tmpdb]
+ my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
+ my_kcc.run(None,
+ self.lp, self.creds,
+ attempt_live_connections=False)
+
+ dot = '/usr/bin/dot'
+ for fn in os.listdir(self.tempdir):
+ if fn.endswith('.dot'):
+ ffn = os.path.join(self.tempdir, fn)
+ if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
+ r = subprocess.call([dot, '-Tcanon', ffn])
+ self.assertEqual(r, 0)
+
+ # even if dot is not there, at least check the file is non-empty
+ size = os.stat(ffn).st_size
+ self.assertNotEqual(size, 0)
+ files.append(ffn)
+
+ self.remove_files(*files)