summaryrefslogtreecommitdiffstats
path: root/python/samba/tests/samba_tool/visualize_drs.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 17:20:00 +0000
commit8daa83a594a2e98f39d764422bfbdbc62c9efd44 (patch)
tree4099e8021376c7d8c05bdf8503093d80e9c7bad0 /python/samba/tests/samba_tool/visualize_drs.py
parentInitial commit. (diff)
downloadsamba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.tar.xz
samba-8daa83a594a2e98f39d764422bfbdbc62c9efd44.zip
Adding upstream version 2:4.20.0+dfsg.upstream/2%4.20.0+dfsg
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'python/samba/tests/samba_tool/visualize_drs.py')
-rw-r--r--python/samba/tests/samba_tool/visualize_drs.py636
1 files changed, 636 insertions, 0 deletions
diff --git a/python/samba/tests/samba_tool/visualize_drs.py b/python/samba/tests/samba_tool/visualize_drs.py
new file mode 100644
index 0000000..64b2cdb
--- /dev/null
+++ b/python/samba/tests/samba_tool/visualize_drs.py
@@ -0,0 +1,636 @@
+# -*- coding: utf-8 -*-
+# Originally based on tests for samba.kcc.ldif_import_export.
+# Copyright (C) Andrew Bartlett 2015, 2018
+#
+# by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""Tests for samba-tool visualize using the vampire DC and promoted DC
+environments. For most tests we assume we can't assert much about what
+state they are in, so we mainly check for command failure, but for
+others we try to grasp control of replication and make more specific
+assertions.
+"""
+
+import os
+import re
+import json
+import random
+import subprocess
+from samba.tests.samba_tool.base import SambaToolCmdTest
+
+VERBOSE = False
+
+ENV_DSAS = {
+ 'promoted_dc': ['CN=PROMOTEDVDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
+ 'CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
+ 'vampire_dc': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
+ 'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
+}
+
+PARTITION_NAMES = [
+ "DOMAIN",
+ "CONFIGURATION",
+ "SCHEMA",
+ "DNSDOMAIN",
+ "DNSFOREST",
+]
+
+def adjust_cmd_for_py_version(parts):
+ if os.getenv("PYTHON", None):
+ parts.insert(0, os.environ["PYTHON"])
+ return parts
+
+def set_auto_replication(dc, allow):
+ credstring = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ on_or_off = '-' if allow else '+'
+
+ for opt in ['DISABLE_INBOUND_REPL',
+ 'DISABLE_OUTBOUND_REPL']:
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'drs', 'options',
+ credstring, dc,
+ "--dsa-option=%s%s" % (on_or_off, opt)])
+
+ subprocess.check_call(cmd)
+
+
+def force_replication(src, dest, base):
+ credstring = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ cmd = adjust_cmd_for_py_version(['bin/samba-tool',
+ 'drs', 'replicate',
+ dest, src, base,
+ credstring,
+ '--sync-forced'])
+
+ subprocess.check_call(cmd)
+
+
+def get_utf8_matrix(s):
+ # parse the graphical table *just* well enough for our tests
+ # decolourise first
+ s = re.sub("\033" r"\[[^m]+m", '', s)
+ lines = s.split('\n')
+ # matrix rows have '·' on the diagonal
+ rows = [x.strip().replace('·', '0') for x in lines if '·' in x]
+ names = []
+ values = []
+ for r in rows:
+ parts = r.rsplit(None, len(rows))
+ k, v = parts[0], parts[1:]
+ # we want the FOO in 'CN=FOO+' or 'CN=FOO,CN=x,DC=...'
+ k = re.match(r'cn=([^+,]+)', k.lower()).group(1)
+ names.append(k)
+ if len(v) == 1: # this is a single-digit matrix, no spaces
+ v = list(v[0])
+ values.append([int(x) if x.isdigit() else 1e999 for x in v])
+
+ d = {}
+ for n1, row in zip(names, values):
+ d[n1] = {}
+ for n2, v in zip(names, row):
+ d[n1][n2] = v
+
+ return d
+
+
+class SambaToolVisualizeDrsTest(SambaToolCmdTest):
+
+ def test_ntdsconn(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
+ '-H', server,
+ '-U', creds,
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_ntdsconn_remote(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
+ '-H', server,
+ '-U', creds,
+ '--color=no', '-S', '-r')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_reps(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "reps",
+ '-H', server,
+ '-U', creds,
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_uptodateness_all_partitions(self):
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ # We will check that the visualisation works for the two
+ # stopped DCs, but we can't make assertions that the output
+ # will be the same because there may be replication between
+ # the two calls. Stopping the replication on these ones is not
+ # enough because there are other DCs about.
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc2,
+ '-U', creds,
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_uptodateness_partitions(self):
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ for part in PARTITION_NAMES:
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=no', '-S',
+ '--partition', part)
+ self.assertCmdSuccess(result, out, err)
+
+ def test_drs_uptodateness(self):
+ """
+ Test cmd `drs uptodateness`
+
+ It should print info like this:
+
+ DNSDOMAIN failure: 4 median: 1.5 maximum: 2
+ SCHEMA failure: 4 median: 220.0 maximum: 439
+ DOMAIN failure: 1 median: 25 maximum: 25
+ CONFIGURATION failure: 1 median: 25 maximum: 25
+ DNSFOREST failure: 4 median: 1.5 maximum: 2
+
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds)
+ self.assertCmdSuccess(result, out, err)
+ # each partition name should be in output
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, out, msg=out)
+
+ for line in out.splitlines():
+ # check keyword in output
+ for attr in ['maximum', 'median', 'failure']:
+ self.assertIn(attr, line)
+
+ def test_drs_uptodateness_partition(self):
+ """
+ Test cmd `drs uptodateness --partition DOMAIN`
+
+ It should print info like this:
+
+ DOMAIN failure: 1 median: 25 maximum: 25
+
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--partition', 'DOMAIN')
+ self.assertCmdSuccess(result, out, err)
+
+ lines = out.splitlines()
+ self.assertEqual(len(lines), 1)
+
+ line = lines[0]
+ self.assertTrue(line.startswith('DOMAIN'))
+
+ def test_drs_uptodateness_json(self):
+ """
+ Test cmd `drs uptodateness --json`
+
+ Example output:
+
+ {
+ "DNSDOMAIN": {
+ "failure": 0,
+ "median": 0.0,
+ "maximum": 0
+ },
+ ...
+ "SCHEMA": {
+ "failure": 0,
+ "median": 0.0,
+ "maximum": 0
+ }
+ }
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--json')
+ self.assertCmdSuccess(result, out, err)
+ # should be json format
+ obj = json.loads(out)
+ # each partition name should be in json obj
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, obj)
+ summary_obj = obj[part_name]
+ for attr in ['maximum', 'median', 'failure']:
+ self.assertIn(attr, summary_obj)
+
+ def test_drs_uptodateness_json_median(self):
+ """
+ Test cmd `drs uptodateness --json --median`
+
+ drs uptodateness --json --median
+
+ {
+ "DNSDOMAIN": {
+ "median": 0.0
+ },
+ ...
+ "SCHEMA": {
+ "median": 0.0
+ }
+ }
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--json', '--median')
+ self.assertCmdSuccess(result, out, err)
+ # should be json format
+ obj = json.loads(out)
+ # each partition name should be in json obj
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, obj)
+ summary_obj = obj[part_name]
+ self.assertIn('median', summary_obj)
+ self.assertNotIn('maximum', summary_obj)
+ self.assertNotIn('failure', summary_obj)
+
+ def assert_matrix_validity(self, matrix, dcs=()):
+ for dc in dcs:
+ self.assertIn(dc, matrix)
+ for k, row in matrix.items():
+ self.assertEqual(row[k], 0)
+
+ def test_uptodateness_stop_replication_domain(self):
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ self.addCleanup(set_auto_replication, dc1, True)
+ self.addCleanup(set_auto_replication, dc2, True)
+
+ def display(heading, out):
+ if VERBOSE:
+ print("========", heading, "=========")
+ print(out)
+
+ samdb1 = self.getSamDB("-H", "ldap://%s" % dc1, "-U", creds)
+ samdb2 = self.getSamDB("-H", "ldap://%s" % dc2, "-U", creds)
+
+ domain_dn = samdb1.domain_dn()
+ self.assertTrue(domain_dn == samdb2.domain_dn(),
+ "We expected the same domain_dn across DCs")
+
+ ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
+ ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
+ samdb1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ samdb2.add({
+ "dn": ou2,
+ "objectclass": "organizationalUnit"
+ })
+
+ set_auto_replication(dc1, False)
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("dc1 replication is now off", out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+
+ force_replication(dc2, dc1, domain_dn)
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("forced replication %s -> %s" % (dc2, dc1), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ self.assertEqual(matrix[dc1][dc2], 0)
+
+ force_replication(dc1, dc2, domain_dn)
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("forced replication %s -> %s" % (dc2, dc1), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ self.assertEqual(matrix[dc2][dc1], 0)
+
+ dn1 = 'cn=u1.%%d,%s' % (ou1)
+ dn2 = 'cn=u2.%%d,%s' % (ou2)
+
+ for i in range(10):
+ samdb1.add({
+ "dn": dn1 % i,
+ "objectclass": "user"
+ })
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("added 10 users on %s" % dc1, out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # dc2's view of dc1 should now be 10 changes out of date
+ self.assertEqual(matrix[dc2][dc1], 10)
+
+ for i in range(10):
+ samdb2.add({
+ "dn": dn2 % i,
+ "objectclass": "user"
+ })
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("added 10 users on %s" % dc2, out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # dc1's view of dc2 is probably 11 changes out of date
+ self.assertGreaterEqual(matrix[dc1][dc2], 10)
+
+ for i in range(10, 101):
+ samdb1.add({
+ "dn": dn1 % i,
+ "objectclass": "user"
+ })
+ samdb2.add({
+ "dn": dn2 % i,
+ "objectclass": "user"
+ })
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("added 91 users on both", out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # the difference here should be ~101.
+ self.assertGreaterEqual(matrix[dc1][dc2], 100)
+ self.assertGreaterEqual(matrix[dc2][dc1], 100)
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN',
+ '--max-digits', '2')
+ display("with --max-digits 2", out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # visualising with 2 digits mean these overflow into infinity
+ self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
+ self.assertGreaterEqual(matrix[dc2][dc1], 1e99)
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN',
+ '--max-digits', '1')
+ display("with --max-digits 1", out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # visualising with 1 digit means these overflow into infinity
+ self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
+ self.assertGreaterEqual(matrix[dc2][dc1], 1e99)
+
+ force_replication(dc2, dc1, samdb1.domain_dn())
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+
+ display("forced replication %s -> %s" % (dc2, dc1), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ self.assertEqual(matrix[dc1][dc2], 0)
+
+ force_replication(dc1, dc2, samdb2.domain_dn())
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+
+ display("forced replication %s -> %s" % (dc1, dc2), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ self.assertEqual(matrix[dc2][dc1], 0)
+
+ samdb1.delete(ou1, ['tree_delete:1'])
+ samdb2.delete(ou2, ['tree_delete:1'])
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("tree delete both ous on %s" % (dc1,), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ self.assertGreaterEqual(matrix[dc1][dc2], 100)
+ self.assertGreaterEqual(matrix[dc2][dc1], 100)
+
+ set_auto_replication(dc1, True)
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("replication is now on", out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+ # We can't assert actual values after this because
+ # auto-replication is on and things will change underneath us.
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc2,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+
+ display("%s's view" % dc2, out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+
+ force_replication(dc1, dc2, samdb2.domain_dn())
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+
+ display("forced replication %s -> %s" % (dc1, dc2), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+
+ force_replication(dc2, dc1, samdb2.domain_dn())
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc1,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("forced replication %s -> %s" % (dc2, dc1), out)
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+
+ (result, out, err) = self.runsubcmd("visualize", "uptodateness",
+ "-r",
+ '-H', "ldap://%s" % dc2,
+ '-U', creds,
+ '--color=yes',
+ '--utf8', '-S',
+ '--partition', 'DOMAIN')
+ display("%s's view" % dc2, out)
+
+ self.assertCmdSuccess(result, out, err)
+ matrix = get_utf8_matrix(out)
+ self.assert_matrix_validity(matrix, [dc1, dc2])
+
+ def test_reps_remote(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "reps",
+ '-H', server,
+ '-U', creds,
+ '--color=no', '-S', '-r')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_ntdsconn_dot(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
+ '-H', server,
+ '-U', creds, '--dot',
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_ntdsconn_remote_dot(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
+ '-H', server,
+ '-U', creds, '--dot',
+ '--color=no', '-S', '-r')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_reps_dot(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "reps",
+ '-H', server,
+ '-U', creds, '--dot',
+ '--color=no', '-S')
+ self.assertCmdSuccess(result, out, err)
+
+ def test_reps_remote_dot(self):
+ server = "ldap://%s" % os.environ["SERVER"]
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ (result, out, err) = self.runsubcmd("visualize", "reps",
+ '-H', server,
+ '-U', creds, '--dot',
+ '--color=no', '-S', '-r')
+ self.assertCmdSuccess(result, out, err)