summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/getnc_schema.py
blob: 60062f9077b27bcdcdf4adc81d5c407ff7c04ac1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import drs_base
import ldb
import time
import random
import os

break_me = os.getenv("PLEASE_BREAK_MY_WINDOWS") == "1"
assert break_me, ("This test breaks Windows active directory after "
                  "a few runs.  Set PLEASE_BREAK_MY_WINDOWS=1 to run.")

# This test runs against Windows.  To run, set up two Windows AD DCs, join one
# to the other, and make sure the passwords are the same.  SMB_CONF_PATH must
# also be set to any smb.conf file. Set DC1 to the PDC's hostname, and DC2 to
# the join'd DC's hostname. Example:
# PLEASE_BREAK_MY_WINDOWS=1
# DC1=pdc DC2=joindc
# SMB_CONF_PATH=st/ad_dc/etc/smb.conf
# PYTHONPATH=$PYTHONPATH:./source4/torture/drs/python
# python3 ./source4/scripting/bin/subunitrun getnc_schema
# -UAdministrator%Password

class SchemaReplicationTests(drs_base.DrsBaseTestCase):

    def setUp(self):
        super(SchemaReplicationTests, self).setUp()
        self.creds = self.get_credentials()
        self.cmdline_auth = "-U{}%{}".format(self.creds.get_username(),
                                             self.creds.get_password())

        self.from_ldb = self.ldb_dc1
        self.dest_ldb = self.ldb_dc2
        self._disable_inbound_repl(self.url_dc1)
        self._disable_all_repl(self.url_dc1)
        self.free_offset = 0

    def tearDown(self):
        self._enable_inbound_repl(self.url_dc1)
        self._enable_all_repl(self.url_dc1)

    def do_repl(self, partition_dn):
        self._enable_inbound_repl(self.url_dc1)
        self._enable_all_repl(self.url_dc1)

        samba_tool_cmd = ["drs", "replicate", self.url_dc2, self.url_dc1]
        samba_tool_cmd += [partition_dn]
        username = self.creds.get_username()
        password = self.creds.get_password()
        samba_tool_cmd += ["-U{0}%{1}".format(username, password)]

        (result, out, err) = self.runsubcmd(*samba_tool_cmd)

        try:
            self.assertCmdSuccess(result, out, err)
        except AssertionError:
            print("Failed repl, retrying in 10s")
            time.sleep(10)
            (result, out, err) = self.runsubcmd(*samba_tool_cmd)

        self._disable_inbound_repl(self.url_dc1)
        self._disable_all_repl(self.url_dc1)

        self.assertCmdSuccess(result, out, err)

    # Get a unique prefix for some search expression like "([att]=[pref]{i}*)"
    def get_unique(self, expr_templ):
        found = True
        while found:
            i = random.randint(0, 65535)
            res = self.from_ldb.search(base=self.schema_dn,
                                       scope=ldb.SCOPE_SUBTREE,
                                       expression=expr_templ.format(i=i))
            found = len(res) > 0

        return str(i)

    def unique_gov_id_prefix(self):
        prefix = "1.3.6.1.4.1.7165.4.6.2.8."
        return prefix + self.get_unique("(governsId=" + prefix + "{i}.*)")

    def unique_cn_prefix(self, prefix="testobj"):
        return prefix + self.get_unique("(cn=" + prefix + "{i}x*)") + "x"

    # Make test schema classes linked to each other in a line, then modify
    # them in reverse order so when we repl, a link crosses the chunk
    # boundary.  Chunk size is 133 by default so we do 150.
    def test_poss_superiors_across_chunk(self):
        num_schema_objects_to_add = 150
        class_name = self.unique_cn_prefix()

        ldif_template = """
dn: CN={class_name}{i},{schema_dn}
objectClass: top
objectClass: classSchema
adminDescription: {class_name}{i}
adminDisplayName: {class_name}{i}
cn: {class_name}{i}
governsId: {gov_id}.{i}
instanceType: 4
objectClassCategory: 1
systemFlags: 16
systemOnly: FALSE
"""

        ldif_kwargs = {'class_name': class_name,
                       'schema_dn': self.schema_dn}
        gov_id = self.unique_gov_id_prefix()
        ldif = ldif_template.format(i=0, gov_id=gov_id, **ldif_kwargs)
        self.from_ldb.add_ldif(ldif)

        ldif_template += "systemPossSuperiors: {possSup}\n"

        ids = list(range(num_schema_objects_to_add))
        got_no_such_attrib = False
        for i in ids[1:]:
            last_class_name = class_name + str(i-1)
            ldif = ldif_template.format(i=i, gov_id=gov_id,
                                        possSup=last_class_name,
                                        **ldif_kwargs)

            try:
                self.from_ldb.add_ldif(ldif)
                if got_no_such_attrib:
                    self.from_ldb.set_schema_update_now()
            except ldb.LdbError as e:
                if e.args[0] != ldb.ERR_NO_SUCH_ATTRIBUTE:
                    self.fail(e)
                if got_no_such_attrib:
                    self.fail(("got NO_SUCH_ATTRIB even after "
                               "setting schemaUpdateNow", str(e)))
                print("got NO_SUCH_ATTRIB, trying schemaUpdateNow")
                got_no_such_attrib = True
                self.from_ldb.set_schema_update_now()
                self.from_ldb.add_ldif(ldif)
                self.from_ldb.set_schema_update_now()

        ldif_template = """
dn: CN={class_name}{i},{schema_dn}
changetype: modify
replace: adminDescription
adminDescription: new_description
"""

        for i in reversed(ids):
            ldif = ldif_template.format(i=i, **ldif_kwargs)
            self.from_ldb.modify_ldif(ldif)

        self.do_repl(self.schema_dn)

        dn_templ = "CN={class_name}{i},{schema_dn}"
        for i in ids:
            dn = dn_templ.format(i=i, **ldif_kwargs)
            res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
            self.assertEqual(len(res), 1)

    # Test for method of adding linked attributes in schema partition
    # required by other tests.
    def test_create_linked_attribute_in_schema(self):
        # Make an object outside of the schema partition that we can link to
        user_name = self.unique_cn_prefix("user")
        user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)

        ldif_template = """
dn: {user_dn}
objectClass: person
objectClass: user"""
        ldif = ldif_template.format(user_dn=user_dn)
        self.from_ldb.add_ldif(ldif)

        # Make test class name unique so test can run multiple times
        class_name = self.unique_cn_prefix("class")

        kwargs = {'class_name': class_name,
                  'schema_dn': self.schema_dn,
                  'user_dn': user_dn}

        # Add an auxiliary schemaClass (cat 3) class and give it managedBy
        # so we can create schema objects with linked attributes.
        ldif_template = """
dn: CN={class_name},{schema_dn}
objectClass: classSchema
governsId: {gov_id}.0
instanceType: 4
systemFlags: 16
systemOnly: FALSE
objectClassCategory: 3
mayContain: managedBy
"""

        gov_id = self.unique_gov_id_prefix()
        ldif = ldif_template.format(gov_id=gov_id, **kwargs)
        self.from_ldb.add_ldif(ldif)

        # Now make an instance that points back to the user with managedBy,
        # thus creating an object in the schema with a linked attribute
        ldif_template = """
dn: CN=link{class_name},{schema_dn}
objectClass: classSchema
objectClass: {class_name}
instanceType: 4
governsId: {gov_id}.0
systemFlags: 16
managedBy: {user_dn}
"""

        gov_id = self.unique_gov_id_prefix()
        ldif = ldif_template.format(gov_id=gov_id, **kwargs)
        self.from_ldb.add_ldif(ldif)

        # Check link exists on test schema object
        dn_templ = "CN=link{class_name},{schema_dn}"
        dn = dn_templ.format(**kwargs)
        res = self.from_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res), 1)
        self.assertIsNotNone(res[0].get("managedBy"))
        self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)

        # Check backlink on user object
        res = self.from_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res), 1)
        managed_objs = res[0].get("managedObjects")
        self.assertEqual(len(managed_objs), 1)
        managed_objs = [str(o) for o in managed_objs]
        self.assertEqual(managed_objs, [dn_templ.format(**kwargs)])

    def test_schema_linked_attributes(self):
        num_test_objects = 9

        # Make an object outside of the schema partition that we can link to
        user_name = self.unique_cn_prefix("user")
        user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)

        ldif_template = """
dn: {user_dn}
objectClass: person
objectClass: user"""
        ldif = ldif_template.format(user_dn=user_dn)
        self.from_ldb.add_ldif(ldif)

        self.do_repl(self.domain_dn)

        # Make test object name prefixes unique so test can run multiple times
        # in a single testenv (can't delete schema objects)
        class_name = self.unique_cn_prefix("class")
        link_class_name = self.unique_cn_prefix("linkClass")

        kwargs = {'class_name': class_name,
                  'schema_dn': self.schema_dn,
                  'link_class_name': link_class_name,
                  'user_dn': user_dn}

        # Add an auxiliary schemaClass (cat 3) class and give it managedBy
        # so we can create schema objects with linked attributes.
        ldif_template = """
dn: CN={class_name},{schema_dn}
objectClass: classSchema
governsId: {gov_id}.0
instanceType: 4
systemFlags: 16
systemOnly: FALSE
objectClassCategory: 3
mayContain: managedBy
"""

        gov_id = self.unique_gov_id_prefix()
        ldif = ldif_template.format(gov_id=gov_id, **kwargs)
        self.from_ldb.add_ldif(ldif)

        # Now make instances that point back to the user with managedBy,
        # thus creating objects in the schema with linked attributes
        ldif_template = """
dn: CN={link_class_name}{i},{schema_dn}
objectClass: classSchema
objectClass: {class_name}
instanceType: 4
governsId: {gov_id}.0
systemFlags: 16
managedBy: {user_dn}
"""

        id_range = list(range(num_test_objects))
        for i in id_range:
            gov_id = self.unique_gov_id_prefix()
            ldif = ldif_template.format(i=i, gov_id=gov_id, **kwargs)
            self.from_ldb.add_ldif(ldif)

        self.do_repl(self.schema_dn)

        # Check link exists in each test schema objects at destination DC
        dn_templ = "CN={link_class_name}{i},{schema_dn}"
        for i in id_range:
            dn = dn_templ.format(i=i, **kwargs)
            res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
            self.assertEqual(len(res), 1)
            self.assertIsNotNone(res[0].get("managedBy"))
            self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)

        # Check backlinks list on user object contains DNs of test objects.
        res = self.dest_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res), 1)
        managed_objs = res[0].get("managedObjects")
        self.assertIsNotNone(managed_objs)
        managed_objs_set = {str(el) for el in managed_objs}
        expected = {dn_templ.format(i=i, **kwargs) for i in id_range}
        self.assertEqual(managed_objs_set, expected)