summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/getncchanges.py
blob: 6b5456a66a8acd6defbdc30f3671c4172f83aba1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Tests various schema replication scenarios
#
# Copyright (C) Catalyst.Net Ltd. 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
#       getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#

import drs_base
import samba.tests
import ldb
from ldb import SCOPE_BASE
import random

from samba.dcerpc import drsuapi, misc
from samba import WERRORError
from samba import werror

class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
    def setUp(self):
        super(DrsReplicaSyncIntegrityTestCase, self).setUp()

        self.init_test_state()

        # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
        # the vampire_dc), so we point this test directly at that DC
        self.set_test_ldb_dc(self.ldb_dc2)

        self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
                                                 "getncchanges." + self.id().rsplit(".", 1)[1]))

        self.addCleanup(self.ldb_dc2.delete, self.ou, ["tree_delete:1"])

        self.base_dn = self.test_ldb_dc.get_default_basedn()

        self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
        self.set_dc_connection(self.default_conn)

    def init_test_state(self):
        self.rxd_dn_list = []
        self.rxd_links = []
        self.rxd_guids = []
        self.last_ctr = None

        # 100 is the minimum max_objects that Microsoft seems to honour
        # (the max honoured is 400ish), so we use that in these tests
        self.max_objects = 100

        # store whether we used GET_TGT/GET_ANC flags in the requests
        self.used_get_tgt = False
        self.used_get_anc = False

    def add_object(self, dn, objectclass="organizationalunit"):
        """Adds an OU object"""
        self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
        res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
        self.assertEqual(len(res), 1)

    def modify_object(self, dn, attr, value):
        """Modifies an object's USN by adding an attribute value to it"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
        self.test_ldb_dc.modify(m)

    def delete_attribute(self, dn, attr, value):
        """Deletes an attribute from an object"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.test_ldb_dc, dn)
        m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
        self.test_ldb_dc.modify(m)

    def start_new_repl_cycle(self):
        """Resets enough state info to start a new replication cycle"""
        # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
        # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
        # resolve
        self.rxd_links = []

        self.used_get_tgt = False
        self.used_get_anc = False
        # mostly preserve self.last_ctr, so that we use the last HWM
        if self.last_ctr is not None:
            self.last_ctr.more_data = True

    def create_object_range(self, start, end, prefix="",
                            children=None, parent_list=None):
        """
        Creates a block of objects. Object names are numbered sequentially,
        using the optional prefix supplied. If the children parameter is
        supplied it will create a parent-child hierarchy and return the
        top-level parents separately.
        """
        dn_list = []

        # Use dummy/empty lists if we're not creating a parent/child hierarchy
        if children is None:
            children = []

        if parent_list is None:
            parent_list = []

        # Create the parents first, then the children.
        # This makes it easier to see in debug when GET_ANC takes effect
        # because the parent/children become interleaved (by default,
        # this approach means the objects are organized into blocks of
        # parents and blocks of children together)
        for x in range(start, end):
            ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
            self.add_object(ou)
            dn_list.append(ou)

            # keep track of the top-level parents (if needed)
            parent_list.append(ou)

        # create the block of children (if needed)
        for x in range(start, end):
            for child in children:
                ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
                self.add_object(ou)
                dn_list.append(ou)

        return dn_list

    def assert_expected_data(self, expected_list):
        """
        Asserts that we received all the DNs that we expected and
        none are missing.
        """
        received_list = self.rxd_dn_list

        # Note that with GET_ANC Windows can end up sending the same parent
        # object multiple times, so this might be noteworthy but doesn't
        # warrant failing the test
        num_received = len(received_list)
        num_expected = len(expected_list)
        if num_received != num_expected:
            print("Note: received %d objects but expected %d" % (num_received,
                                                                 num_expected))

        # Check that we received every object that we were expecting
        for dn in expected_list:
            self.assertTrue(dn in received_list,
                            "DN '%s' missing from replication." % dn)

    def test_repl_integrity(self):
        """
        Modify the objects being replicated while the replication is still
        in progress and check that no object loss occurs.
        """

        # The server behaviour differs between samba and Windows. Samba returns
        # the objects in the original order (up to the pre-modify HWM). Windows
        # incorporates the modified objects and returns them in the new order
        # (i.e. modified objects last), up to the post-modify HWM. The
        # Microsoft docs state the Windows behaviour is optional.

        # Create a range of objects to replicate.
        expected_dn_list = self.create_object_range(0, 400)
        (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)

        # We ask for the first page of 100 objects.
        # For this test, we don't care what order we receive the objects in,
        # so long as by the end we've received everything
        self.repl_get_next()

        # Modify some of the second page of objects. This should bump the
        # highwatermark
        for x in range(100, 200):
            self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)

        (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
        self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)

        # Get the remaining blocks of data
        while not self.replication_complete():
            self.repl_get_next()

        # Check we still receive all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def is_parent_known(self, dn, known_dn_list):
        """
        Returns True if the parent of the dn specified is in known_dn_list
        """

        # we can sometimes get system objects like the RID Manager returned.
        # Ignore anything that is not under the test OU we created
        if self.ou not in dn:
            return True

        # Remove the child portion from the name to get the parent's DN
        name_substrings = dn.split(",")
        del name_substrings[0]

        parent_dn = ",".join(name_substrings)

        # check either this object is a parent (it's parent is the top-level
        # test object), or its parent has been seen previously
        return parent_dn == self.ou or parent_dn in known_dn_list

    def _repl_send_request(self, get_anc=False, get_tgt=False):
        """
        Sends a GetNCChanges request for the next block of replication data.
        """

        # we're just trying to mimic regular client behaviour here, so just
        # use the highwatermark in the last response we received
        if self.last_ctr:
            highwatermark = self.last_ctr.new_highwatermark
            uptodateness_vector = self.last_ctr.uptodateness_vector
        else:
            # this is the first replication chunk
            highwatermark = None
            uptodateness_vector = None

        # Ask for the next block of replication data
        replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
        more_flags = 0

        if get_anc:
            replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
            self.used_get_anc = True

        if get_tgt:
            more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
            self.used_get_tgt = True

        # return the response from the DC
        return self._get_replication(replica_flags,
                                     max_objects=self.max_objects,
                                     highwatermark=highwatermark,
                                     uptodateness_vector=uptodateness_vector,

                                     more_flags=more_flags)

    def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
        """
        Requests the next block of replication data. This tries to simulate
        client behaviour - if we receive a replicated object that we don't know
        the parent of, then re-request the block with the GET_ANC flag set.
        If we don't know the target object for a linked attribute, then
        re-request with GET_TGT.
        """

        # send a request to the DC and get the response
        ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)

        # extract the object DNs and their GUIDs from the response
        rxd_dn_list = self._get_ctr6_dn_list(ctr6)
        rxd_guid_list = self._get_ctr6_object_guids(ctr6)

        # we'll add new objects as we discover them, so take a copy of the
        # ones we already know about, so we can modify these lists safely
        known_objects = self.rxd_dn_list[:]
        known_guids = self.rxd_guids[:]

        # check that we know the parent for every object received
        for i in range(0, len(rxd_dn_list)):

            dn = rxd_dn_list[i]
            guid = rxd_guid_list[i]

            if self.is_parent_known(dn, known_objects):

                # the new DN is now known so add it to the list.
                # It may be the parent of another child in this block
                known_objects.append(dn)
                known_guids.append(guid)
            else:
                # If we've already set the GET_ANC flag then it should mean
                # we receive the parents before the child
                self.assertFalse(get_anc, "Unknown parent for object %s" % dn)

                print("Unknown parent for %s - try GET_ANC" % dn)

                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)

        # check we know about references to any objects in the linked attrs
        received_links = self._get_ctr6_links(ctr6)

        # This is so that older versions of Samba fail - we want the links to
        # be sent roughly with the objects, rather than getting all links at
        # the end
        if assert_links:
            self.assertTrue(len(received_links) > 0,
                            "Links were expected in the GetNCChanges response")

        for link in received_links:

            # skip any links that aren't part of the test
            if self.ou not in link.targetDN:
                continue

            # check the source object is known (Windows can actually send links
            # where we don't know the source object yet). Samba shouldn't ever
            # hit this case because it gets the links based on the source
            if link.identifier not in known_guids:

                # If we've already set the GET_ANC flag then it should mean
                # this case doesn't happen
                self.assertFalse(get_anc, "Unknown source object for GUID %s"
                                 % link.identifier)

                print("Unknown source GUID %s - try GET_ANC" % link.identifier)

                # try the same thing again with the GET_ANC flag set this time
                return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                          assert_links=assert_links)

            # check we know the target object
            if link.targetGUID not in known_guids:

                # If we've already set the GET_TGT flag then we should have
                # already received any objects we need to know about
                self.assertFalse(get_tgt, "Unknown linked target for object %s"
                                 % link.targetDN)

                print("Unknown target for %s - try GET_TGT" % link.targetDN)

                # try the same thing again with the GET_TGT flag set this time
                return self.repl_get_next(get_anc=get_anc, get_tgt=True,
                                          assert_links=assert_links)

        # store the last successful result so we know what HWM to request next
        self.last_ctr = ctr6

        # store the objects, GUIDs, and links we received
        self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
        self.rxd_links += self._get_ctr6_links(ctr6)
        self.rxd_guids += self._get_ctr6_object_guids(ctr6)

        return ctr6

    def replication_complete(self):
        """Returns True if the current/last replication cycle is complete"""

        if self.last_ctr is None or self.last_ctr.more_data:
            return False
        else:
            return True

    def test_repl_integrity_get_anc(self):
        """
        Modify the parent objects being replicated while the replication is
        still in progress (using GET_ANC) and check that no object loss occurs.
        """

        # Note that GET_ANC behaviour varies between Windows and Samba.
        # On Samba GET_ANC results in the replication restarting from the very
        # beginning. After that, Samba remembers GET_ANC and also sends the
        # parents in subsequent requests (regardless of whether GET_ANC is
        # specified in the later request).
        # Windows only sends the parents if GET_ANC was specified in the last
        # request. It will also resend a parent, even if it's already sent the
        # parent in a previous response (whereas Samba doesn't).

        # Create a small block of 50 parents, each with 2 children (A and B)
        # This is so that we receive some children in the first block, so we
        # can resend with GET_ANC before we learn too many parents
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 50, prefix="parent",
                                                    children=("A", "B"),
                                                    parent_list=parent_dn_list)

        # create the remaining parents and children
        expected_dn_list += self.create_object_range(50, 150, prefix="parent",
                                                     children=("A", "B"),
                                                     parent_list=parent_dn_list)

        # We've now got objects in the following order:
        # [50 parents][100 children][100 parents][200 children]

        # Modify the first parent so that it's now ordered last by USN
        # This means we set the GET_ANC flag pretty much straight away
        # because we receive the first child before the first parent
        self.modify_object(parent_dn_list[0], "displayName", "OU0")

        # modify a later block of parents so they also get reordered
        for x in range(50, 100):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

        # Get the first block of objects - this should resend the request with
        # GET_ANC set because we won't know about the first child's parent.
        # On samba GET_ANC essentially starts the sync from scratch again, so
        # we get this over with early before we learn too many parents
        self.repl_get_next()

        # modify the last chunk of parents. They should now have a USN higher
        # than the highwater-mark for the replication cycle
        for x in range(100, 150):
            self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

        # Get the remaining blocks of data - this will resend the request with
        # GET_ANC if it encounters an object it doesn't have the parent for.
        while not self.replication_complete():
            self.repl_get_next()

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_ANC flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

    def assert_expected_links(self, objects_with_links, link_attr="managedBy",
                              num_expected=None):
        """
        Asserts that a GetNCChanges response contains any expected links
        for the objects it contains.
        """
        received_links = self.rxd_links

        if num_expected is None:
            num_expected = len(objects_with_links)

        self.assertTrue(len(received_links) == num_expected,
                        "Received %d links but expected %d"
                        % (len(received_links), num_expected))

        for dn in objects_with_links:
            self.assert_object_has_link(dn, link_attr, received_links)

    def assert_object_has_link(self, dn, link_attr, received_links):
        """
        Queries the object in the DB and asserts there is a link in the
        GetNCChanges response that matches.
        """

        # Look up the link attribute in the DB
        # The extended_dn option will dump the GUID info for the link
        # attribute (as a hex blob)
        res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
                                      attrs=[link_attr],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)

        # We didn't find the expected link attribute in the DB for the object.
        # Something has gone wrong somewhere...
        self.assertTrue(link_attr in res[0],
                        "%s in DB doesn't have attribute %s" % (dn, link_attr))

        # find the received link in the list and assert that the target and
        # source GUIDs match what's in the DB
        for val in [str(val) for val in res[0][link_attr]]:
            # Work out the expected source and target GUIDs for the DB link
            target_dn = ldb.Dn(self.test_ldb_dc, val)
            targetGUID_blob = target_dn.get_extended_component("GUID")
            sourceGUID_blob = res[0].dn.get_extended_component("GUID")

            found = False

            for link in received_links:
                if link.selfGUID_blob == sourceGUID_blob and \
                   link.targetGUID_blob == targetGUID_blob:

                    found = True

                    if self._debug:
                        print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
                    break

            self.assertTrue(found,
                            "Did not receive expected link for DN %s" % dn)

    def test_repl_get_tgt(self):
        """
        Creates a scenario where we should receive the linked attribute before
        we know about the target object, and therefore need to use GET_TGT.
        Note: Samba currently avoids this problem by sending all its links last
        """

        # create the test objects
        reportees = self.create_object_range(0, 100, prefix="reportee")
        managers = self.create_object_range(0, 100, prefix="manager")
        all_objects = managers + reportees
        expected_links = reportees

        # add a link attribute to each reportee object that points to the
        # corresponding manager object as the target
        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])

        # touch the managers (the link-target objects) again to make sure the
        # reportees (link source objects) get returned first by the replication
        for i in range(0, 100):
            self.modify_object(managers[i], "displayName", "OU%d" % i)

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():

            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the reportees
        self.assert_expected_links(expected_links)

    def test_repl_get_tgt_chain(self):
        """
        Tests the behaviour of GET_TGT with a more complicated scenario.
        Here we create a chain of objects linked together, so if we follow
        the link target, then we'd traverse ~200 objects each time.
        """

        # create the test objects
        objectsA = self.create_object_range(0, 100, prefix="AAA")
        objectsB = self.create_object_range(0, 100, prefix="BBB")
        objectsC = self.create_object_range(0, 100, prefix="CCC")

        # create a complex set of object links:
        #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
        # Basically each object-A should link to a circular chain of 200 B/C
        # objects. We create the links in separate chunks here, as it makes it
        # clearer what happens with the USN (links on Windows have their own
        # USN, so this approach means the A->B/B->C links aren't interleaved)
        for i in range(0, 100):
            self.modify_object(objectsA[i], "managedBy", objectsB[i])

        for i in range(0, 100):
            self.modify_object(objectsB[i], "managedBy",
                               objectsC[(i + 1) % 100])

        for i in range(0, 100):
            self.modify_object(objectsC[i], "managedBy",
                               objectsB[(i + 1) % 100])

        all_objects = objectsA + objectsB + objectsC
        expected_links = all_objects

        # the default order the objects now get returned in should be:
        # [A0-A99][B0-B99][C0-C99]

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT
        while not self.replication_complete():

            # get the next block of replication data (this sets GET_TGT
            # if needed)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT flag. If this doesn't
        # actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the reportees
        self.assert_expected_links(expected_links)

    def test_repl_integrity_link_attr(self):
        """
        Tests adding links to new objects while a replication is in progress.
        """

        # create some source objects for the linked attributes, sandwiched
        # between 2 blocks of filler objects
        filler = self.create_object_range(0, 100, prefix="filler")
        reportees = self.create_object_range(0, 100, prefix="reportee")
        filler += self.create_object_range(100, 200, prefix="filler")

        # Start the replication and get the first block of filler objects
        # (We're being mean here and setting the GET_TGT flag right from the
        # start. On earlier Samba versions, if the client encountered an
        # unknown target object and retried with GET_TGT, it would restart the
        # replication cycle from scratch, which avoids the problem).
        self.repl_get_next(get_tgt=True)

        # create the target objects and add the links. These objects should be
        # outside the scope of the Samba replication cycle, but the links
        # should still get sent with the source object
        managers = self.create_object_range(0, 100, prefix="manager")

        for i in range(0, 100):
            self.modify_object(reportees[i], "managedBy", managers[i])

        expected_objects = managers + reportees + filler
        expected_links = reportees

        # complete the replication
        while not self.replication_complete():
            self.repl_get_next(get_tgt=True)

        # If we didn't receive the most recently created objects in the last
        # replication cycle, then kick off another replication to get them
        if len(self.rxd_dn_list) < len(expected_objects):
            self.repl_get_next()

            while not self.replication_complete():
                self.repl_get_next()

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_objects)

        # Check we received links for all the parents
        self.assert_expected_links(expected_links)

    def test_repl_get_anc_link_attr(self):
        """
        A basic GET_ANC test where the parents have linked attributes
        """

        # Create a block of 100 parents and 100 children
        parent_dn_list = []
        expected_dn_list = self.create_object_range(0, 100, prefix="parent",
                                                    children=("A"),
                                                    parent_list=parent_dn_list)

        # Add links from the parents to the children
        for x in range(0, 100):
            self.modify_object(parent_dn_list[x], "managedBy",
                               expected_dn_list[x + 100])

        # add some filler objects at the end. This allows us to easily see
        # which chunk the links get sent in
        expected_dn_list += self.create_object_range(0, 100, prefix="filler")

        # We've now got objects in the following order:
        # [100 x children][100 x parents][100 x filler]

        # Get the replication data - because the block of children come first,
        # this should retry the request with GET_ANC
        while not self.replication_complete():
            self.repl_get_next()

        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_dn_list)

        # Check we received links for all the parents
        self.assert_expected_links(parent_dn_list)

    def test_repl_get_tgt_and_anc(self):
        """
        Check we can resolve an unknown ancestor when fetching the link target,
        i.e. tests using GET_TGT and GET_ANC in combination
        """

        # Create some parent/child objects (the child will be the link target)
        parents = []
        all_objects = self.create_object_range(0, 100, prefix="parent",
                                               children=["la_tgt"],
                                               parent_list=parents)

        children = [item for item in all_objects if item not in parents]

        # create the link source objects and link them to the child/target
        la_sources = self.create_object_range(0, 100, prefix="la_src")
        all_objects += la_sources

        for i in range(0, 100):
            self.modify_object(la_sources[i], "managedBy", children[i])

        expected_links = la_sources

        # modify the children/targets so they come after the link source
        for x in range(0, 100):
            self.modify_object(children[x], "displayName", "OU%d" % x)

        # modify the parents, so they now come last in the replication
        for x in range(0, 100):
            self.modify_object(parents[x], "displayName", "OU%d" % x)

        # We've now got objects in the following order:
        # [100 la_source][100 la_target][100 parents (of la_target)]

        links_expected = True

        # Get all the replication data - this code should resend the requests
        # with GET_TGT and GET_ANC
        while not self.replication_complete():

            # get the next block of replication data (this sets
            # GET_TGT/GET_ANC)
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        # The way the test objects have been created should force
        # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
        # doesn't actually happen, then the test isn't doing its job properly
        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links for all the link sources
        self.assert_expected_links(expected_links)

        # Second part of test. Add some extra objects and kick off another
        # replication. The test code will use the HWM from the last replication
        # so we'll only receive the objects we modify below
        self.start_new_repl_cycle()

        # add an extra level of grandchildren that hang off a child
        # that got created last time
        new_parent = "OU=test_new_parent,%s" % children[0]
        self.add_object(new_parent)
        new_children = []

        for x in range(0, 50):
            dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
            self.add_object(dn)
            new_children.append(dn)

        # replace half of the links to point to the new children
        for x in range(0, 50):
            self.delete_attribute(la_sources[x], "managedBy", children[x])
            self.modify_object(la_sources[x], "managedBy", new_children[x])

        # add some filler objects to fill up the 1st chunk
        filler = self.create_object_range(0, 100, prefix="filler")

        # modify the new children/targets so they come after the link source
        for x in range(0, 50):
            self.modify_object(new_children[x], "displayName", "OU-%d" % x)

        # modify the parent, so it now comes last in the replication
        self.modify_object(new_parent, "displayName", "OU%d" % x)

        # We should now get the modified objects in the following order:
        # [50 links (x 2)][100 filler][50 new children][new parent]
        # Note that the link sources aren't actually sent (their new linked
        # attributes are sent, but apart from that, nothing has changed)
        all_objects = filler + new_children + [new_parent]
        expected_links = la_sources[:50]

        links_expected = True

        while not self.replication_complete():
            self.repl_get_next(assert_links=links_expected)
            links_expected = len(self.rxd_links) < len(expected_links)

        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")
        self.assertTrue(self.used_get_anc,
                        "Test didn't use the GET_ANC flag as expected")

        # Check we get all the objects we're expecting
        self.assert_expected_data(all_objects)

        # Check we received links (50 deleted links and 50 new)
        self.assert_expected_links(expected_links, num_expected=100)

    def _repl_integrity_obj_deletion(self, delete_link_source=True):
        """
        Tests deleting link objects while a replication is in progress.
        """

        # create some objects and link them together, with some filler
        # object in between the link sources
        la_sources = self.create_object_range(0, 100, prefix="la_source")
        la_targets = self.create_object_range(0, 100, prefix="la_targets")

        for i in range(0, 50):
            self.modify_object(la_sources[i], "managedBy", la_targets[i])

        filler = self.create_object_range(0, 100, prefix="filler")

        for i in range(50, 100):
            self.modify_object(la_sources[i], "managedBy", la_targets[i])

        # touch the targets so that the sources get replicated first
        for i in range(0, 100):
            self.modify_object(la_targets[i], "displayName", "OU%d" % i)

        # objects should now be in the following USN order:
        # [50 la_source][100 filler][50 la_source][100 la_target]

        # Get the first block containing 50 link sources
        self.repl_get_next()

        # delete either the link targets or link source objects
        if delete_link_source:
            objects_to_delete = la_sources
            # in GET_TGT testenvs we only receive the first 50 source objects
            expected_objects = la_sources[:50] + la_targets + filler
        else:
            objects_to_delete = la_targets
            expected_objects = la_sources + filler

        for obj in objects_to_delete:
            self.ldb_dc2.delete(obj)

        # complete the replication
        while not self.replication_complete():
            self.repl_get_next()

        # Check we get all the objects we're expecting
        self.assert_expected_data(expected_objects)

        # we can't use assert_expected_links() here because it tries to check
        # against the deleted objects on the DC. (Although we receive some
        # links from the first block processed, the Samba client should end up
        # deleting these, as the source/target object involved is deleted)
        self.assertTrue(len(self.rxd_links) == 50,
                        "Expected 50 links, not %d" % len(self.rxd_links))

    def test_repl_integrity_src_obj_deletion(self):
        self._repl_integrity_obj_deletion(delete_link_source=True)

    def test_repl_integrity_tgt_obj_deletion(self):
        self._repl_integrity_obj_deletion(delete_link_source=False)

    def restore_deleted_object(self, guid, new_dn):
        """Re-animates a deleted object"""

        guid_str = self._GUID_string(guid)
        res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
                                      attrs=["isDeleted"],
                                      controls=['show_deleted:1'],
                                      scope=ldb.SCOPE_BASE)
        if len(res) != 1:
            return

        msg = ldb.Message()
        msg.dn = res[0].dn
        msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
                                              "isDeleted")
        msg["distinguishedName"] = ldb.MessageElement([new_dn],
                                                      ldb.FLAG_MOD_REPLACE,
                                                      "distinguishedName")
        self.test_ldb_dc.modify(msg, ["show_deleted:1"])

    def sync_DCs(self, nc_dn=None):
        # make sure DC1 has all the changes we've made to DC2
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                                nc_dn=nc_dn)

    def get_object_guid(self, dn):
        res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
                                      scope=ldb.SCOPE_BASE)
        return res[0]['objectGUID'][0]

    def set_dc_connection(self, conn):
        """
        Switches over the connection state info that the underlying drs_base
        class uses so that we replicate with a different DC.
        """
        self.default_hwm = conn.default_hwm
        self.default_utdv = conn.default_utdv
        self.drs = conn.drs
        self.drs_handle = conn.drs_handle
        self.set_test_ldb_dc(conn.ldb_dc)

    def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
                                             expected_links):
        """
        Replicates against both the primary and secondary DCs in the testenv
        and checks that both return the expected results.
        """
        print("Checking replication against primary test DC...")

        # get the replication data from the test DC first
        while not self.replication_complete():
            self.repl_get_next()

        # Check we get all the objects and links we're expecting
        self.assert_expected_data(all_objects)
        self.assert_expected_links(expected_links)

        # switch over the DC state info so we now talk to the peer DC
        self.set_dc_connection(peer_conn)
        self.init_test_state()

        print("Checking replication against secondary test DC...")

        # check that we get the same information from the 2nd DC
        while not self.replication_complete():
            self.repl_get_next()

        self.assert_expected_data(all_objects)
        self.assert_expected_links(expected_links)

        # switch back to using the default connection
        self.set_dc_connection(self.default_conn)

    def test_repl_integrity_obj_reanimation(self):
        """
        Checks receiving links for a re-animated object doesn't lose links.
        We test this against the peer DC to make sure it doesn't drop links.
        """

        # This test is a little different in that we're particularly interested
        # in exercising the replmd client code on the second DC.
        # First, make sure the peer DC has the base OU, then connect to it (so
        # we store its initial HWM)
        self.sync_DCs()
        peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)

        # create the link source/target objects
        la_sources = self.create_object_range(0, 100, prefix="la_src")
        la_targets = self.create_object_range(0, 100, prefix="la_tgt")

        # store the target object's GUIDs (we need to know these to
        # reanimate them)
        target_guids = []

        for dn in la_targets:
            target_guids.append(self.get_object_guid(dn))

        # delete the link target
        for x in range(0, 100):
            self.ldb_dc2.delete(la_targets[x])

        # sync the DCs, then disable replication. We want the peer DC to get
        # all the following changes in a single replication cycle
        self.sync_DCs()
        self._disable_all_repl(self.dnsname_dc2)

        # restore the target objects for the linked attributes again
        for x in range(0, 100):
            self.restore_deleted_object(target_guids[x], la_targets[x])

        # add the links
        for x in range(0, 100):
            self.modify_object(la_sources[x], "managedBy", la_targets[x])

        # create some additional filler objects
        filler = self.create_object_range(0, 100, prefix="filler")

        # modify the targets so they now come last
        for x in range(0, 100):
            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)

        # the objects should now be sent in the following order:
        # [la sources + links][filler][la targets]
        all_objects = la_sources + la_targets + filler
        expected_links = la_sources

        # Enable replication again make sure the 2 DCs are back in sync
        self._enable_all_repl(self.dnsname_dc2)
        self.sync_DCs()

        # Get the replication data from each DC in turn.
        # Check that both give us all the objects and links we're expecting,
        # i.e. no links were lost
        self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
                                                  expected_links)

    def _test_repl_integrity_cross_partition_links(self, get_tgt=False):
        """
        Checks that a cross-partition link to an unknown target object does
        not result in missing links.
        """

        # check the peer DC is up-to-date, then connect (storing its HWM)
        self.sync_DCs()
        peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)

        # stop replication so the peer gets the following objects in one go
        self._disable_all_repl(self.dnsname_dc2)

        # optionally force the client-side to use GET_TGT locally, by adding a
        # one-way link to a missing/deleted target object
        if get_tgt:
            missing_target = "OU=missing_tgt,%s" % self.ou
            self.add_object(missing_target)
            get_tgt_source = "CN=get_tgt_src,%s" % self.ou
            self.add_object(get_tgt_source,
                            objectclass="msExchConfigurationContainer")
            self.modify_object(get_tgt_source, "addressBookRoots2",
                               missing_target)
            self.test_ldb_dc.delete(missing_target)

        # create a link source object in the main NC
        la_source = "OU=cross_nc_src,%s" % self.ou
        self.add_object(la_source)

        # create the link target (a server object) in the config NC
        sites_dn = "CN=Sites,%s" % self.config_dn
        servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
        rand = random.randint(1, 10000000)
        la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
        self.add_object(la_target, objectclass="server")

        # add a cross-partition link between the two
        self.modify_object(la_source, "managedBy", la_target)

        # First, sync to the peer the NC containing the link source object
        self.sync_DCs()

        # Now, before the peer has received the partition containing the target
        # object, try replicating from the peer. It will only know about half
        # of the link at this point, but it should be a valid scenario
        self.set_dc_connection(peer_conn)

        while not self.replication_complete():
            # pretend we've received other link targets out of order and that's
            # forced us to use GET_TGT. This checks the peer doesn't fail
            # trying to fetch a cross-partition target object that doesn't
            # exist
            self.repl_get_next(get_tgt=True)

        self.set_dc_connection(self.default_conn)

        # delete the GET_TGT test object. We're not interested in asserting its
        # links - it was just there to make the client use GET_TGT (and it
        # creates an inconsistency because one DC correctly ignores the link,
        # because it points to a deleted object)
        if get_tgt:
            self.test_ldb_dc.delete(get_tgt_source)

        self.init_test_state()

        # Now sync across the partition containing the link target object
        self.sync_DCs(nc_dn=self.config_dn)
        self._enable_all_repl(self.dnsname_dc2)

        # Get the replication data from each DC in turn.
        # Check that both return the cross-partition link (note we're not
        # checking the config domain NC here for simplicity)
        self.assert_DCs_replication_is_consistent(peer_conn,
                                                  all_objects=[la_source],
                                                  expected_links=[la_source])

        # the cross-partition linked attribute has a missing backlink. Check
        # that we can still delete it successfully
        self.delete_attribute(la_source, "managedBy", la_target)
        self.sync_DCs()

        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
                                      attrs=["managedBy"],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)
        self.assertFalse("managedBy" in res[0],
                         "%s in DB still has managedBy attribute" % la_source)
        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
                                      attrs=["managedBy"],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)
        self.assertFalse("managedBy" in res[0],
                         "%s in DB still has managedBy attribute" % la_source)

        # Check receiving a cross-partition link to a deleted target.
        # Delete the target and make sure the deletion is sync'd between DCs
        target_guid = self.get_object_guid(la_target)
        self.test_ldb_dc.delete(la_target)
        self.sync_DCs(nc_dn=self.config_dn)
        self._disable_all_repl(self.dnsname_dc2)

        # re-animate the target
        self.restore_deleted_object(target_guid, la_target)
        self.modify_object(la_source, "managedBy", la_target)

        # now sync the link - because the target is in another partition, the
        # peer DC receives a link for a deleted target, which it should accept
        self.sync_DCs()
        res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
                                      attrs=["managedBy"],
                                      controls=['extended_dn:1:0'],
                                      scope=ldb.SCOPE_BASE)
        self.assertTrue("managedBy" in res[0],
                        "%s in DB missing managedBy attribute" % la_source)

        # cleanup the server object we created in the Configuration partition
        self.test_ldb_dc.delete(la_target)
        self._enable_all_repl(self.dnsname_dc2)

    def test_repl_integrity_cross_partition_links(self):
        self._test_repl_integrity_cross_partition_links(get_tgt=False)

    def test_repl_integrity_cross_partition_links_with_tgt(self):
        self._test_repl_integrity_cross_partition_links(get_tgt=True)

    def test_repl_get_tgt_multivalued_links(self):
        """Tests replication with multi-valued link attributes."""

        # create the target/source objects and link them together
        la_targets = self.create_object_range(0, 500, prefix="la_tgt")
        la_source = "CN=la_src,%s" % self.ou
        self.add_object(la_source, objectclass="msExchConfigurationContainer")

        for tgt in la_targets:
            self.modify_object(la_source, "addressBookRoots2", tgt)

        filler = self.create_object_range(0, 100, prefix="filler")

        # We should receive the objects/links in the following order:
        # [500 targets + 1 source][500 links][100 filler]
        expected_objects = la_targets + [la_source] + filler
        link_only_chunk = False

        # First do the replication without needing GET_TGT
        while not self.replication_complete():
            ctr6 = self.repl_get_next()

            if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
                link_only_chunk = True

        # we should receive one chunk that contains only links
        self.assertTrue(link_only_chunk,
                        "Expected to receive a chunk containing only links")

        # check we received all the expected objects/links
        self.assert_expected_data(expected_objects)
        self.assert_expected_links([la_source], link_attr="addressBookRoots2",
                                   num_expected=500)

        # Do the replication again, forcing the use of GET_TGT this time
        self.init_test_state()

        for x in range(0, 500):
            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)

        # The objects/links should get sent in the following order:
        # [1 source][500 targets][500 links][100 filler]

        while not self.replication_complete():
            ctr6 = self.repl_get_next()

        self.assertTrue(self.used_get_tgt,
                        "Test didn't use the GET_TGT flag as expected")

        # check we received all the expected objects/links
        self.assert_expected_data(expected_objects)
        self.assert_expected_links([la_source], link_attr="addressBookRoots2",
                                   num_expected=500)


    def test_InvalidNC_DummyDN_InvalidGUID_full_repl(self):
        """Test full replication on a totally invalid GUID fails with the right error code"""
        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)

        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=dc_guid_1,
                               nc_dn_str="DummyDN",
                               nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
                               exop=drsuapi.DRSUAPI_EXOP_NONE,
                               max_objects=1)

        (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except WERRORError as e1:
            (enum, estr) = e1.args
            self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)

    def test_DummyDN_valid_GUID_full_repl(self):
        dc_guid_1 = self.ldb_dc1.get_invocation_id()
        drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)

        res = self.ldb_dc1.search(base=self.base_dn, scope=SCOPE_BASE,
                                  attrs=["objectGUID"])

        guid = misc.GUID(res[0]["objectGUID"][0])

        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=dc_guid_1,
                               nc_dn_str="DummyDN",
                               nc_guid=guid,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP |
                               drsuapi.DRSUAPI_DRS_GET_ANC,
                               exop=drsuapi.DRSUAPI_EXOP_NONE,
                               max_objects=1)

        try:
            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        except WERRORError as e1:
            (enum, estr) = e1.args
            self.fail(f"Failed to call GetNCChanges with DummyDN and a GUID: {estr}")

        # The NC should be the first object returned due to GET_ANC
        self.assertEqual(ctr.first_object.object.identifier.guid, guid)

    def _test_do_full_repl_no_overlap(self, mix=True, get_anc=False):
        self.default_hwm = drsuapi.DsReplicaHighWaterMark()

        # We set get_anc=True so we can assert the BASE DN will be the
        # first object
        ctr6 = self._repl_send_request(get_anc=get_anc)
        guid_list_1 = self._get_ctr6_object_guids(ctr6)

        if mix:
            dc_guid_1 = self.ldb_dc1.get_invocation_id()

            req8 = self._exop_req8(dest_dsa=None,
                                   invocation_id=dc_guid_1,
                                   nc_dn_str=self.ldb_dc1.get_default_basedn(),
                                   exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)

            (level, ctr_repl_obj) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)

            self.assertEqual(ctr_repl_obj.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)

            repl_obj_guid_list = self._get_ctr6_object_guids(ctr_repl_obj)

            self.assertEqual(len(repl_obj_guid_list), 1)

            # This should be the first object in the main replication due
            # to get_anc=True above in one case, and a rule that the NC must be first regardless otherwise
            self.assertEqual(repl_obj_guid_list[0], guid_list_1[0])

        self.last_ctr = ctr6
        ctr6 = self._repl_send_request(get_anc=True)
        guid_list_2 = self._get_ctr6_object_guids(ctr6)

        self.assertNotEqual(guid_list_1, guid_list_2)

    def test_do_full_repl_no_overlap_get_anc(self):
        """
        Make sure that a full replication on an nc succeeds to the goal despite needing multiple passes
        """
        self._test_do_full_repl_no_overlap(mix=False, get_anc=True)

    def test_do_full_repl_no_overlap(self):
        """
        Make sure that a full replication on an nc succeeds to the goal despite needing multiple passes
        """
        self._test_do_full_repl_no_overlap(mix=False)

    def test_do_full_repl_mix_no_overlap(self):
        """
        Make sure that a full replication on an nc succeeds to the goal despite needing multiple passes

        Assert this is true even if we do a REPL_OBJ in between the replications

        """
        self._test_do_full_repl_no_overlap(mix=True)

    def nc_change(self):
        old_base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
                                                       scope=SCOPE_BASE,
                                                       attrs=["oEMInformation"])
        rec_cleanup = {"dn": self.base_dn,
                       "oEMInformation": old_base_msg[0]["oEMInformation"][0]}
        m_cleanup = ldb.Message.from_dict(self.default_conn.ldb_dc,
                                          rec_cleanup,
                                          ldb.FLAG_MOD_REPLACE)

        self.addCleanup(self.default_conn.ldb_dc.modify, m_cleanup)

        rec = {"dn": self.base_dn,
               "oEMInformation": f"Tortured by Samba's getncchanges.py {self.id()} against {self.default_conn.dnsname_dc}"}
        m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
        self.default_conn.ldb_dc.modify(m)

    def _test_repl_nc_is_first(self, start_at_zero=True, nc_change=True, ou_change=True, mid_change=False):
        """Tests that the NC is always replicated first, but does not move the
           tmp_highest_usn at that point, just like 'early' GET_ANC objects.
        """

        # create objects, twice more than the page size of 133
        objs = self.create_object_range(0, 300, prefix="obj")

        if nc_change:
            self.nc_change()

        if mid_change:
            # create even more objects
            objs = self.create_object_range(301, 450, prefix="obj2")

        base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
                                                   scope=SCOPE_BASE,
                                                   attrs=["uSNChanged",
                                                          "objectGUID"])

        base_guid = misc.GUID(base_msg[0]["objectGUID"][0])
        base_usn = int(base_msg[0]["uSNChanged"][0])

        if ou_change:
            # Make one more modification.  We want to assert we have
            # caught up to the base DN, but Windows both promotes the NC
            # to the front and skips including it in the tmp_highest_usn,
            # so we make a later modification that will be to show we get
            # this change.
            rec = {"dn": self.ou,
                   "postalCode": "0"}
            m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
            self.default_conn.ldb_dc.modify(m)

        ou_msg = self.default_conn.ldb_dc.search(base=self.ou,
                                                   scope=SCOPE_BASE,
                                                   attrs=["uSNChanged",
                                                          "objectGUID"])

        ou_guid = misc.GUID(ou_msg[0]["objectGUID"][0])
        ou_usn = int(ou_msg[0]["uSNChanged"][0])

        # Check some predicates about USN ordering that the below tests will rely on
        if ou_change and nc_change:
            self.assertGreater(ou_usn, base_usn)
        elif not ou_change and nc_change:
            self.assertGreater(base_usn, ou_usn)

        ctr6 = self.repl_get_next()

        guid_list_1 = self._get_ctr6_object_guids(ctr6)
        if nc_change or start_at_zero:
            self.assertEqual(base_guid, misc.GUID(guid_list_1[0]))
            self.assertIn(str(base_guid), guid_list_1)
            self.assertNotIn(str(base_guid), guid_list_1[1:])
        else:
            self.assertNotEqual(base_guid, misc.GUID(guid_list_1[0]))
            self.assertNotIn(str(base_guid), guid_list_1)

        self.assertTrue(ctr6.more_data)

        if not ou_change and nc_change:
            self.assertLess(ctr6.new_highwatermark.tmp_highest_usn, base_usn)

        i = 0
        while not self.replication_complete():
            i = i + 1
            last_tmp_highest_usn = ctr6.new_highwatermark.tmp_highest_usn
            ctr6 = self.repl_get_next()
            guid_list_2 = self._get_ctr6_object_guids(ctr6)
            if len(guid_list_2) > 0:
                self.assertNotEqual(last_tmp_highest_usn, ctr6.new_highwatermark.tmp_highest_usn)

            if (nc_change or start_at_zero) and base_usn > last_tmp_highest_usn:
                self.assertEqual(base_guid, misc.GUID(guid_list_2[0]),
                                 f"pass={i} more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
                self.assertIn(str(base_guid), guid_list_2,
                                 f"pass {i}·more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
            else:
                self.assertNotIn(str(base_guid), guid_list_2,
                                 f"pass {i}·more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")

        if ou_change:
            # The modification to the base OU should be in the final chunk
            self.assertIn(str(ou_guid), guid_list_2)
            self.assertGreaterEqual(ctr6.new_highwatermark.highest_usn,
                                    ou_usn)
        else:
            # Show that the NC root change does not show up in the
            # highest_usn.  We either get the change before or after
            # it.
            self.assertNotEqual(ctr6.new_highwatermark.highest_usn,
                                base_usn)
        self.assertEqual(ctr6.new_highwatermark.highest_usn,
                          ctr6.new_highwatermark.tmp_highest_usn)

        self.assertFalse(ctr6.more_data)

    def test_repl_nc_is_first_start_zero_nc_change(self):
        self.default_hwm = drsuapi.DsReplicaHighWaterMark()
        self._test_repl_nc_is_first(start_at_zero=True, nc_change=True, ou_change=True)

    def test_repl_nc_is_first_start_zero(self):
        # Get the NC change in the middle of the replication stream, certainly not at the start or end
        self.nc_change()
        self.default_hwm = drsuapi.DsReplicaHighWaterMark()
        self._test_repl_nc_is_first(start_at_zero=True, nc_change=False, ou_change=False)

    def test_repl_nc_is_first_mid(self):
        # This is a modification of the next test, that Samba
        # will pass as it will always include the NC in the
        # tmp_highest_usn at the point where it belongs
        self._test_repl_nc_is_first(start_at_zero=False,
                                    nc_change=True,
                                    ou_change=True,
                                    mid_change=True)

    def test_repl_nc_is_first(self):
        # This is a modification of the next test, that Samba
        # will pass as it will always include the NC in the
        # tmp_highest_usn at the point where it belongs
        self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=True)

    def test_repl_nc_is_first_nc_change_only(self):
        # This shows that the NC change is not reflected in the tmp_highest_usn
        self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=False)

    def test_repl_nc_is_first_no_change(self):
        # The NC should not be present in this replication
        self._test_repl_nc_is_first(start_at_zero=False, nc_change=False, ou_change=False)

class DcConnection:
    """Helper class to track a connection to another DC"""

    def __init__(self, drs_base, ldb_dc, dnsname_dc):
        self.ldb_dc = ldb_dc
        (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
        (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
        self.default_utdv = utdv
        self.dnsname_dc = dnsname_dc