From 8daa83a594a2e98f39d764422bfbdbc62c9efd44 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 19:20:00 +0200 Subject: Adding upstream version 2:4.20.0+dfsg. Signed-off-by: Daniel Baumann --- python/samba/tests/kcc/__init__.py | 90 ++++++ python/samba/tests/kcc/graph.py | 67 +++++ python/samba/tests/kcc/graph_utils.py | 165 +++++++++++ python/samba/tests/kcc/kcc_utils.py | 393 +++++++++++++++++++++++++++ python/samba/tests/kcc/ldif_import_export.py | 240 ++++++++++++++++ 5 files changed, 955 insertions(+) create mode 100644 python/samba/tests/kcc/__init__.py create mode 100644 python/samba/tests/kcc/graph.py create mode 100644 python/samba/tests/kcc/graph_utils.py create mode 100644 python/samba/tests/kcc/kcc_utils.py create mode 100644 python/samba/tests/kcc/ldif_import_export.py (limited to 'python/samba/tests/kcc') diff --git a/python/samba/tests/kcc/__init__.py b/python/samba/tests/kcc/__init__.py new file mode 100644 index 0000000..31354f0 --- /dev/null +++ b/python/samba/tests/kcc/__init__.py @@ -0,0 +1,90 @@ +# Unix SMB/CIFS implementation. Tests for samba.kcc core. +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.kcc""" + +import samba +import os +import time +from tempfile import mkdtemp + +import samba.tests +from samba import kcc +from samba import ldb +from samba.dcerpc import misc + + +from samba.param import LoadParm +from samba.credentials import Credentials +from samba.samdb import SamDB + +unix_now = int(time.time()) +unix_once_upon_a_time = 1000000000 # 2001-09-09 + +ENV_DSAS = { + 'ad_dc_ntvfs': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'], + 'fl2000dc': ['CN=DC5,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2000,DC=example,DC=com'], + 'fl2003dc': ['CN=DC6,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2003,DC=example,DC=com'], + 'fl2008r2dc': ['CN=DC7,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba2008r2,DC=example,DC=com'], + 'promoted_dc': ['CN=PROMOTEDVDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com', + 'CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'], + 'vampire_dc': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com', + 'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'], +} + + +class KCCTests(samba.tests.TestCase): + def setUp(self): + super().setUp() + self.lp = LoadParm() + self.creds = Credentials() + self.creds.guess(self.lp) + self.creds.set_username(os.environ["USERNAME"]) + self.creds.set_password(os.environ["PASSWORD"]) + + def test_list_dsas(self): + my_kcc = kcc.KCC(unix_now, False, False, False, False) + my_kcc.load_samdb("ldap://%s" % os.environ["SERVER"], + self.lp, self.creds) + try: + dsas = my_kcc.list_dsas() + except kcc.KCCError as e: + self.fail("kcc.list_dsas failed with %s" % e) + env = os.environ['TEST_ENV'] + for expected_dsa in ENV_DSAS[env]: + self.assertIn(expected_dsa, dsas) + + def test_verify(self): + """check that the KCC generates graphs that pass its own verify + option. This is not a spectacular achievement when there are + only a couple of nodes to connect, but it shows something. + """ + my_kcc = kcc.KCC(unix_now, readonly=True, verify=True, + debug=False, dot_file_dir=None) + + # As this is flapping with errors under python3, we catch + # exceptions and turn them into failures.. + try: + my_kcc.run("ldap://%s" % os.environ["SERVER"], + self.lp, self.creds, + attempt_live_connections=False) + except (samba.kcc.graph_utils.GraphError, kcc.KCCError): + import traceback + traceback.print_exc() + self.fail() diff --git a/python/samba/tests/kcc/graph.py b/python/samba/tests/kcc/graph.py new file mode 100644 index 0000000..b581158 --- /dev/null +++ b/python/samba/tests/kcc/graph.py @@ -0,0 +1,67 @@ +# Unix SMB/CIFS implementation. Tests for kcc.graph routines +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.kcc.graph""" + +import samba +import samba.tests +from samba.kcc.graph import total_schedule, convert_schedule_to_repltimes + +def ntdsconn_schedule(times): + if times is None: + return None + from samba.dcerpc import drsblobs + schedule = drsblobs.schedule() + schedule.size = 188 + schedule.bandwidth = 0 + schedule.numberOfSchedules = 1 + header = drsblobs.scheduleHeader() + header.type = 0 + header.offset = 20 + schedule.headerArray = [header] + data = drsblobs.scheduleSlots() + data.slots = times + schedule.dataArray = [data] + return schedule + + +class GraphFunctionTests(samba.tests.TestCase): + + def test_total_schedule(self): + schedule = [0x81] * 84 + for schedule, total in ( + ([0x81] * 84, 168), + ([0xff] * 84, 84 * 8), + ([0xaa] * 84, 84 * 4), + ([0x03, 0x33] * 42, 42 * 6), + (list(range(7)) * 12, 12 * 9), + (list(range(4)) * 21, 21 * 4)): + self.assertEqual(total_schedule(schedule), total) + + def test_convert_schedule_to_repltimes(self): + for ntdsconn_times, repltimes in ( + ([0x01] * 168, [0x11] * 84), + (None, [0x11] * 84), + ([0x06] * 168, [0x66] * 84), + ([0x03, 0xa] * 84, [0x3a] * 84), + (list(range(7)) * 24, + [0x01, 0x23, 0x45, 0x60, 0x12, 0x34, 0x56] * 12)): + schedule = ntdsconn_schedule(ntdsconn_times) + self.assertEqual(convert_schedule_to_repltimes(schedule), + repltimes) diff --git a/python/samba/tests/kcc/graph_utils.py b/python/samba/tests/kcc/graph_utils.py new file mode 100644 index 0000000..3eaa1c7 --- /dev/null +++ b/python/samba/tests/kcc/graph_utils.py @@ -0,0 +1,165 @@ +# Unix SMB/CIFS implementation. Tests for kcc.graph_utils routines +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.kcc.graph_utils""" + +import samba +import samba.tests +from samba.kcc.graph_utils import GraphError +from samba.kcc.graph_utils import (verify_graph_complete, + verify_graph_connected, + verify_graph_connected_under_edge_failures, + verify_graph_forest, + verify_graph_connected_under_vertex_failures, + verify_graph_no_lonely_vertices) + +import itertools + + +def make_tree(vertices): + if len(vertices) < 2: + return () + remaining = set(vertices) + used = set() + edges = set() + used.add(remaining.pop()) + used.add(remaining.pop()) + edges.add(tuple(used)) + while remaining: + v = remaining.pop() + w = used.pop() + e = (w, v) + edges.add(e) + used.update(e) + return tuple(edges) + +# TODO: test directed graphs + + +class UndirectedGraphTests(samba.tests.TestCase): + + def setUp(self): + super().setUp() + vertices = tuple('abcdefgh') + vertices2 = tuple('ijk') + edges = tuple(itertools.combinations(vertices, 2)) + edges2 = tuple(itertools.combinations(vertices2, 2)) + line_edges = list(zip(vertices[1:], vertices[:-1])) + ring_edges = line_edges + [(vertices[0], vertices[-1])] + + tree = make_tree(vertices) + tree2 = make_tree(vertices2) + + self.complete_graph = [edges, vertices, vertices] + + self.disconnected_clusters = [edges + edges2, + vertices + vertices2, + vertices + vertices2] + + self.graph_with_unreachables = [edges, + vertices + vertices2, + vertices] + + self.ring = [ring_edges, vertices, vertices] + self.line = [line_edges, vertices, vertices] + + self.tree = [tree, vertices, vertices] + self.forest = [tree + tree2, + vertices + vertices2, + vertices + vertices2] + + self.unconnected_graph = ((), vertices, ()) + + def assertGraphError(self, fn, *args): + return self.assertRaises(GraphError, fn, *args) + + def test_graph_complete(self): + fn = verify_graph_complete + + self.assertGraphError(fn, *self.disconnected_clusters) + self.assertGraphError(fn, *self.graph_with_unreachables) + self.assertGraphError(fn, *self.ring) + self.assertGraphError(fn, *self.tree) + + self.assertIsNone(fn(*self.complete_graph)) + + def test_graph_connected(self): + fn = verify_graph_connected + + self.assertGraphError(fn, *self.disconnected_clusters) + self.assertGraphError(fn, *self.graph_with_unreachables) + self.assertGraphError(fn, *self.forest) + self.assertGraphError(fn, *self.unconnected_graph) + + self.assertIsNone(fn(*self.line)) + self.assertIsNone(fn(*self.ring)) + self.assertIsNone(fn(*self.complete_graph)) + self.assertIsNone(fn(*self.tree)) + + def test_graph_forest(self): + fn = verify_graph_forest + + self.assertGraphError(fn, *self.disconnected_clusters) + self.assertGraphError(fn, *self.graph_with_unreachables) + self.assertGraphError(fn, *self.ring) + + self.assertIsNone(fn(*self.line)) + self.assertIsNone(fn(*self.tree)) + self.assertIsNone(fn(*self.forest)) + self.assertIsNone(fn(*self.unconnected_graph)) + + def test_graph_connected_under_edge_failures(self): + fn = verify_graph_connected_under_edge_failures + + self.assertGraphError(fn, *self.line) + self.assertGraphError(fn, *self.tree) + self.assertGraphError(fn, *self.forest) + self.assertGraphError(fn, *self.disconnected_clusters) + + self.assertIsNone(fn(*self.ring)) + self.assertIsNone(fn(*self.complete_graph)) + + def test_graph_connected_under_vertex_failures(self): + # XXX no tests to distinguish this from the edge_failures case + fn = verify_graph_connected_under_vertex_failures + + self.assertGraphError(fn, *self.line) + self.assertGraphError(fn, *self.tree) + self.assertGraphError(fn, *self.forest) + self.assertGraphError(fn, *self.disconnected_clusters) + + self.assertIsNone(fn(*self.ring)) + self.assertIsNone(fn(*self.complete_graph)) + + def test_graph_multi_edge_forest(self): + pass + + def test_graph_no_lonely_vertices(self): + fn = verify_graph_no_lonely_vertices + self.assertGraphError(fn, *self.unconnected_graph) + self.assertGraphError(fn, *self.graph_with_unreachables) + + self.assertIsNone(fn(*self.ring)) + self.assertIsNone(fn(*self.complete_graph)) + self.assertIsNone(fn(*self.line)) + self.assertIsNone(fn(*self.tree)) + self.assertIsNone(fn(*self.forest)) + + def test_graph_no_unknown_vertices(self): + pass diff --git a/python/samba/tests/kcc/kcc_utils.py b/python/samba/tests/kcc/kcc_utils.py new file mode 100644 index 0000000..c1af998 --- /dev/null +++ b/python/samba/tests/kcc/kcc_utils.py @@ -0,0 +1,393 @@ +# Unix SMB/CIFS implementation. Tests for samba.kcc.kcc_utils. +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.kcc.kcc_utils""" +import samba +import samba.tests +from samba.kcc.kcc_utils import new_connection_schedule, drsblobs +from samba.kcc.kcc_utils import uncovered_sites_to_cover +from samba.credentials import Credentials +from samba.auth import system_session +from samba.samdb import SamDB +from samba.tests import delete_force + + +class ScheduleTests(samba.tests.TestCase): + + def test_new_connection_schedule(self): + schedule = new_connection_schedule() + self.assertIsInstance(schedule, drsblobs.schedule) + self.assertEqual(schedule.size, 188) + self.assertEqual(len(schedule.dataArray[0].slots), 168) + + +# OK, this is pathetic, but the rest of it looks really hard, with the +# classes all intertwingled with each other and the samdb. That is to say: +# XXX later. + +class SiteCoverageTests(samba.tests.TestCase): + + def setUp(self): + self.prefix = "kcc_" + self.lp = samba.tests.env_loadparm() + + self.sites = {} + self.site_links = {} + + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + + self.samdb = SamDB(session_info=self.session, + credentials=self.creds, + lp=self.lp) + + def tearDown(self): + self.samdb.transaction_start() + + for site in self.sites: + delete_force(self.samdb, site, controls=['tree_delete:1']) + + for site_link in self.site_links: + delete_force(self.samdb, site_link) + + self.samdb.transaction_commit() + + def _add_server(self, name, site): + dn = "CN={0},CN=Servers,{1}".format(name, site) + self.samdb.add({ + "dn": dn, + "objectClass": "server", + "serverReference": self.samdb.domain_dn() + }) + return dn + + def _add_site(self, name): + dn = "CN={0},CN=Sites,{1}".format( + name, self.samdb.get_config_basedn() + ) + self.samdb.add({ + "dn": dn, + "objectClass": "site" + }) + self.samdb.add({ + "dn": "CN=Servers," + dn, + "objectClass": ["serversContainer"] + }) + + self.sites[dn] = name + return dn, name.lower() + + def _add_site_link(self, name, links=None, cost=100): + if links is None: + links = [] + dn = "CN={0},CN=IP,CN=Inter-Site Transports,CN=Sites,{1}".format( + name, self.samdb.get_config_basedn() + ) + self.samdb.add({ + "dn": dn, + "objectClass": "siteLink", + "cost": str(cost), + "siteList": links + }) + self.site_links[dn] = name + return dn + + def test_single_site_link_same_dc_count(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + + self._add_site_link(self.prefix + "link", + [site1, site2, uncovered_dn]) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([], to_cover) + + def test_single_site_link_different_dc_count(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "ABCD" + '2', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "BCDE" + '2', site2) + self._add_server(self.prefix + "BCDE" + '3', site2) + + self._add_site_link(self.prefix + "link", + [site1, site2, uncovered_dn]) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + def test_two_site_links_same_cost(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "ABCD" + '2', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "BCDE" + '2', site2) + self._add_server(self.prefix + "BCDE" + '3', site2) + + self._add_site_link(self.prefix + "link1", + [site1, uncovered_dn]) + self._add_site_link(self.prefix + "link2", + [site2, uncovered_dn]) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + def test_two_site_links_different_costs(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "BCDE" + '2', site2) + + self._add_site_link(self.prefix + "link1", + [site1, uncovered_dn], + cost=50) + self._add_site_link(self.prefix + "link2", + [site2, uncovered_dn], + cost=75) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([], to_cover) + + def test_three_site_links_different_costs(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + site3, name3 = self._add_site(self.prefix + "CDEF") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "CDEF" + '1', site3) + self._add_server(self.prefix + "CDEF" + '2', site3) + + self._add_site_link(self.prefix + "link1", + [site1, uncovered_dn], + cost=50) + self._add_site_link(self.prefix + "link2", + [site2, uncovered_dn], + cost=75) + self._add_site_link(self.prefix + "link3", + [site3, uncovered_dn], + cost=60) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name3) + to_cover.sort() + + self.assertEqual([], to_cover) + + def test_three_site_links_duplicate_costs(self): + # two of the links have the same cost; the other is higher + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + site3, name3 = self._add_site(self.prefix + "CDEF") + + uncovered_dn, uncovered = self._add_site(self.prefix + "uncovered") + + self._add_server(self.prefix + "ABCD" + '1', site1) + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "CDEF" + '1', site3) + self._add_server(self.prefix + "CDEF" + '2', site3) + + self._add_site_link(self.prefix + "link1", + [site1, uncovered_dn], + cost=50) + self._add_site_link(self.prefix + "link2", + [site2, uncovered_dn], + cost=75) + self._add_site_link(self.prefix + "link3", + [site3, uncovered_dn], + cost=50) + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name3) + to_cover.sort() + + self.assertEqual([uncovered], to_cover) + + def test_complex_setup_with_multiple_uncovered_sites(self): + self.samdb.transaction_start() + site1, name1 = self._add_site(self.prefix + "ABCD") + site2, name2 = self._add_site(self.prefix + "BCDE") + site3, name3 = self._add_site(self.prefix + "CDEF") + + site4, name4 = self._add_site(self.prefix + "1234") + site5, name5 = self._add_site(self.prefix + "2345") + site6, name6 = self._add_site(self.prefix + "3456") + + uncovered_dn1, uncovered1 = self._add_site(self.prefix + "uncovered1") + uncovered_dn2, uncovered2 = self._add_site(self.prefix + "uncovered2") + uncovered_dn3, uncovered3 = self._add_site(self.prefix + "uncovered3") + + # Site Link Cluster 1 - Server List + self._add_server(self.prefix + "ABCD" + '1', site1) + + self._add_server(self.prefix + "BCDE" + '1', site2) + self._add_server(self.prefix + "BCDE" + '2', site2) + + self._add_server(self.prefix + "CDEF" + '1', site3) + self._add_server(self.prefix + "CDEF" + '2', site3) + self._add_server(self.prefix + "CDEF" + '3', site3) + + # Site Link Cluster 2 - Server List + self._add_server(self.prefix + "1234" + '1', site4) + self._add_server(self.prefix + "1234" + '2', site4) + + self._add_server(self.prefix + "2345" + '1', site5) + self._add_server(self.prefix + "2345" + '2', site5) + + self._add_server(self.prefix + "3456" + '1', site6) + + # Join to Uncovered1 (preference to site link cluster 1) + self._add_site_link(self.prefix + "link1A", + [site1, site2, site3, uncovered_dn1], + cost=49) + self._add_site_link(self.prefix + "link2A", + [site4, site5, site6, uncovered_dn1], + cost=50) + + # Join to Uncovered2 (no preferene on site links) + self._add_site_link(self.prefix + "link1B", + [site1, site2, site3, uncovered_dn2], + cost=50) + self._add_site_link(self.prefix + "link2B", + [site4, site5, site6, uncovered_dn2], + cost=50) + + # Join to Uncovered3 (preference to site link cluster 2) + self._add_site_link(self.prefix + "link1C", + [site1, site2, site3, uncovered_dn3], + cost=50) + self._add_site_link(self.prefix + "link2C", + [site4, site5, site6, uncovered_dn3], + cost=49) + + self.samdb.transaction_commit() + + to_cover = uncovered_sites_to_cover(self.samdb, name1) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name2) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name3) + to_cover.sort() + + self.assertEqual([uncovered1, uncovered2], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name4) + to_cover.sort() + + self.assertEqual([uncovered2, uncovered3], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name5) + to_cover.sort() + + self.assertEqual([], to_cover) + + to_cover = uncovered_sites_to_cover(self.samdb, name6) + to_cover.sort() + + self.assertEqual([], to_cover) + + for to_check in [uncovered1, uncovered2, uncovered3]: + to_cover = uncovered_sites_to_cover(self.samdb, to_check) + to_cover.sort() + + self.assertEqual([], to_cover) diff --git a/python/samba/tests/kcc/ldif_import_export.py b/python/samba/tests/kcc/ldif_import_export.py new file mode 100644 index 0000000..9e573bf --- /dev/null +++ b/python/samba/tests/kcc/ldif_import_export.py @@ -0,0 +1,240 @@ +# Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export. +# Copyright (C) Andrew Bartlett 2015 +# +# Written by Douglas Bagnall +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""Tests for samba.kcc.ldif_import_export""" + +import samba +import os +import time +import subprocess +import logging +import samba.tests +from samba.kcc import ldif_import_export, KCC +from samba import ldb +from samba.dcerpc import misc + + +from samba.param import LoadParm +from samba.credentials import Credentials +from samba.samdb import SamDB + +unix_now = int(time.time()) + +MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'], + "testdata/ldif-utils-test-multisite.ldif") + + +# UNCONNECTED_LDIF is a single site, unconnected 5DC database that was +# created using samba-tool domain join in testenv. +UNCONNECTED_LDIF = os.path.join(os.environ['SRCDIR_ABS'], + "testdata/unconnected-intrasite.ldif") + +MULTISITE_LDIF_DSAS = ( + ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-4"), + ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-4"), + ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-3"), + ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-5"), + ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-5"), + ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Site-2"), + ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com", + "Default-First-Site-Name"), +) + + +class LdifImportExportTests(samba.tests.TestCaseInTempDir): + def setUp(self): + super().setUp() + self.lp = LoadParm() + self.creds = Credentials() + self.creds.guess(self.lp) + + def remove_files(self, *files): + for f in files: + assert(f.startswith(self.tempdir)) + os.unlink(f) + + def test_write_search_url(self): + pass + + def test_ldif_to_samdb(self): + dburl = os.path.join(self.tempdir, "ldap") + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF) + self.assertIsInstance(samdb, SamDB) + + dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites," + "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com") + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + + ntds_guid = misc.GUID(samdb.get_ntds_GUID()) + self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid) + + service_name_res = samdb.search(base="", + scope=ldb.SCOPE_BASE, + attrs=["dsServiceName"]) + dn = ldb.Dn(samdb, + service_name_res[0]["dsServiceName"][0].decode('utf8')) + self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa)) + self.remove_files(dburl) + + def test_ldif_to_samdb_forced_local_dsa(self): + for dsa, site in MULTISITE_LDIF_DSAS: + dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa" + "-%s" % dsa) + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF, + forced_local_dsa=dsa) + self.assertIsInstance(samdb, SamDB) + self.assertEqual(samdb.server_site_name(), site) + + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + + ntds_guid = misc.GUID(samdb.get_ntds_GUID()) + self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid) + + service_name_res = samdb.search(base="", + scope=ldb.SCOPE_BASE, + attrs=["dsServiceName"]) + dn = ldb.Dn(samdb, + service_name_res[0]["dsServiceName"][0].decode('utf8')) + self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa)) + self.remove_files(dburl) + + def test_samdb_to_ldif_file(self): + dburl = os.path.join(self.tempdir, "ldap") + dburl2 = os.path.join(self.tempdir, "ldap_roundtrip") + ldif_file = os.path.join(self.tempdir, "ldif") + samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp, + MULTISITE_LDIF) + self.assertIsInstance(samdb, SamDB) + ldif_import_export.samdb_to_ldif_file(samdb, dburl, + lp=self.lp, creds=None, + ldif_file=ldif_file) + self.assertGreater(os.path.getsize(ldif_file), 1000, + "LDIF should be larger than 1000 bytes") + samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp, + ldif_file) + self.assertIsInstance(samdb, SamDB) + dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites," + "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com") + res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa), + scope=ldb.SCOPE_BASE, attrs=["objectGUID"]) + self.remove_files(dburl) + self.remove_files(dburl2) + self.remove_files(ldif_file) + + +class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir): + def setUp(self): + super().setUp() + self.lp = LoadParm() + self.creds = Credentials() + self.creds.guess(self.lp) + + def remove_files(self, *files): + for f in files: + assert(f.startswith(self.tempdir)) + os.unlink(f) + + def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None): + # Note that setting read-only to False won't affect the ldif, + # only the temporary database that is created from it. + my_kcc = KCC(unix_now, readonly=readonly, verify=verify, + dot_file_dir=dot_file_dir) + tmpdb = os.path.join(self.tempdir, 'tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + self.remove_files(tmpdb) + return my_kcc + + def test_list_dsas(self): + my_kcc = self._get_kcc('test-list') + dsas = set(my_kcc.list_dsas()) + expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS) + self.assertEqual(dsas, expected_dsas) + + def test_verify(self): + """Check that the KCC generates graphs that pass its own verify + option. + """ + my_kcc = self._get_kcc('test-verify', verify=True) + tmpdb = os.path.join(self.tempdir, 'verify-tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + self.remove_files(tmpdb) + + def test_unconnected_db(self): + """Check that the KCC generates errors on a unconnected db + """ + my_kcc = self._get_kcc('test-verify', verify=True) + tmpdb = os.path.join(self.tempdir, 'verify-tmpdb') + my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF) + + try: + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + except samba.kcc.graph_utils.GraphError: + pass + except Exception: + self.fail("Did not expect this error.") + finally: + self.remove_files(tmpdb) + + def test_dotfiles(self): + """Check that KCC writes dot_files when asked. + """ + my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir) + tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb') + files = [tmpdb] + my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF) + my_kcc.run(None, + self.lp, self.creds, + attempt_live_connections=False) + + dot = '/usr/bin/dot' + for fn in os.listdir(self.tempdir): + if fn.endswith('.dot'): + ffn = os.path.join(self.tempdir, fn) + if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0: + r = subprocess.call([dot, '-Tcanon', ffn]) + self.assertEqual(r, 0) + + # even if dot is not there, at least check the file is non-empty + size = os.stat(ffn).st_size + self.assertNotEqual(size, 0) + files.append(ffn) + + self.remove_files(*files) -- cgit v1.2.3