summaryrefslogtreecommitdiffstats
path: root/source4/torture/drs/python/getnc_unpriv.py
blob: c53906acd8fc4a0dbce74208e3ae3eea4e32b460 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Tests replication scenarios with different user privileges.
# We want to test every replication scenario we can think of against:
# - users with only GET_CHANGES privileges
# - users with only GET_ALL_CHANGES privileges
# - users with both GET_CHANGES and GET_ALL_CHANGES privileges
# - users with no privileges
#
# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

#
# Usage:
#  export DC1=dc1_dns_name
#  export DC2=dc2_dns_name
#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#

import drs_base
import samba.tests
from samba import werror, WERRORError

from samba import sd_utils
import ldb
from ldb import SCOPE_BASE
import random

from samba.dcerpc import drsuapi, security
from samba.credentials import DONT_USE_KERBEROS


class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
    """Confirm the behaviour of DsGetNCChanges for unprivileged users"""

    def setUp(self):
        """
        Build the per-test fixture on DC1.

        Creates a randomly-named OU holding a fresh user, opens a DRSUAPI
        bind through the default _ds_bind() call plus a second bind that uses
        the new user's credentials, pre-computes the two control-access ACEs
        (GET_CHANGES / GET_ALL_CHANGES) for that user, and remembers the
        base DN's current SDDL so tearDown() can put it back.
        """
        super(DrsReplicaSyncUnprivTestCase, self).setUp()
        self.get_changes_user = "get-changes-user"
        self.base_dn = self.ldb_dc1.get_default_basedn()
        self.user_pass = samba.generate_random_password(12, 16)

        # The OU name gets a random suffix: deletions from an earlier run can
        # be slow to replicate out, so an OU left over from a previous
        # testenv might still be present at this point.
        ou_rdn = "OU=test_getnc_unpriv%d" % random.randint(1, 10000000)
        self.ou = "%s,%s" % (ou_rdn, self.base_dn)
        ou_record = {
            "dn": self.ou,
            "objectclass": "organizationalUnit"}
        self.ldb_dc1.add(ou_record)
        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
                             userou=ou_rdn)
        default_bind = self._ds_bind(self.dnsname_dc1)
        (self.drs, self.drs_handle) = default_bind

        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
        sid_str = str(self.sd_utils.get_object_sid(self.user_dn))

        # Both ACEs share one SDDL shape and differ only in the right's GUID
        ace_template = "(OA;;CR;%s;;%s)"
        self.acl_mod_get_changes = ace_template % (
            security.GUID_DRS_GET_CHANGES, sid_str)
        self.acl_mod_get_all_changes = ace_template % (
            security.GUID_DRS_GET_ALL_CHANGES, sid_str)

        # Snapshot of the descriptor the tests are about to modify
        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)

        # DONT_USE_KERBEROS avoids a race with getting the user replicated
        # to our selected KDC
        template_creds = self.get_credentials()
        self.user_creds = self.insta_creds(template=template_creds,
                                           username=self.get_changes_user,
                                           userpass=self.user_pass,
                                           kerberos_state=DONT_USE_KERBEROS)
        user_bind = self._ds_bind(self.dnsname_dc1, self.user_creds)
        (self.user_drs, self.user_drs_handle) = user_bind

    def tearDown(self):
        """
        Undo the changes made by setUp() and by the individual tests.

        Restores the SDDL that setUp() saved for the base DN (the tests add
        ACEs to it), deletes the test OU using the tree_delete control, and
        finally runs the base class tearDown().

        A missing OU (ldb.ERR_NO_SUCH_OBJECT) is tolerated. Any other
        ldb.LdbError raised by the delete is propagated so that a failed
        cleanup is reported instead of silently leaving objects behind.
        Later cleanup steps are still attempted when an earlier one raises.
        """
        try:
            self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
        finally:
            try:
                self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
            except ldb.LdbError as e1:
                enum = e1.args[0]
                # "Already gone" is the only acceptable failure here
                if enum != ldb.ERR_NO_SUCH_OBJECT:
                    raise
            finally:
                super(DrsReplicaSyncUnprivTestCase, self).tearDown()

    def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None,
                        partial_attribute_set=None):
        """
        Common function to send a replication request and check the result
        matches what's expected.

        The request is built with self._exop_req8() and sent as a level 8
        DsGetNCChanges call over the test user's DRSUAPI bind
        (self.user_drs / self.user_drs_handle).

        :param exop: extended operation to request (a drsuapi.DRSUAPI_EXOP_*
            value)
        :param repl_obj: DN passed to the request as nc_dn_str
        :param expected_error: None if the request must succeed, otherwise a
            collection of WERROR codes, any one of which is an acceptable
            rejection
        :param dest_dsa: optional destination DSA passed through to the
            request
        :param partial_attribute_set: optional partial attribute set passed
            through to the request
        """
        req8 = self._exop_req8(dest_dsa=dest_dsa,
                               invocation_id=self.ldb_dc1.get_invocation_id(),
                               nc_dn_str=repl_obj,
                               exop=exop,
                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP,
                               partial_attribute_set=partial_attribute_set)

        if expected_error is None:
            # user is OK, request should be accepted without throwing an
            # error (the reply itself is not inspected)
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
            return

        # check the request is rejected (with the error we're expecting)
        try:
            self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
        except WERRORError as e:
            (enum, estr) = e.args
            # assertIn reports both the received code and the accepted set
            self.assertIn(enum, expected_error,
                          "Got unexpected error: %s" % estr)
        else:
            self.fail("Should have failed with user denied access")

    def _test_repl_single_obj(self, repl_obj, expected_error,
                              partial_attribute_set=None):
        """
        Send a DRSUAPI_EXOP_REPL_OBJ request for repl_obj and verify the
        outcome: success when expected_error is None, otherwise a rejection
        with one of the listed errors.
        """
        single_obj_exop = drsuapi.DRSUAPI_EXOP_REPL_OBJ
        self._test_repl_exop(single_obj_exop, repl_obj, expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None):
        """
        Send a DRSUAPI_EXOP_REPL_SECRET request for repl_obj (optionally
        naming a destination DSA) and verify the outcome: success when
        expected_error is None, otherwise a rejection with one of the listed
        errors.
        """
        secret_exop = drsuapi.DRSUAPI_EXOP_REPL_SECRET
        self._test_repl_exop(secret_exop, repl_obj, expected_error,
                             dest_dsa=dest_dsa)

    def _test_repl_full(self, expected_error, partial_attribute_set=None):
        """
        Request a full replication (DRSUAPI_EXOP_NONE) of DC1's default base
        DN and verify the outcome: success when expected_error is None,
        otherwise a rejection with one of the listed errors.
        """
        default_nc = self.ldb_dc1.get_default_basedn()
        self._test_repl_exop(drsuapi.DRSUAPI_EXOP_NONE,
                             default_nc,
                             expected_error,
                             partial_attribute_set=partial_attribute_set)

    def _test_repl_full_on_ou(self, repl_obj, expected_error):
        """
        Request a full replication (DRSUAPI_EXOP_NONE) rooted at repl_obj
        rather than at a base NC. Such a request should always be refused;
        which error comes back may depend on the user's access rights, so
        the caller supplies the acceptable codes in expected_error.
        """
        no_exop = drsuapi.DRSUAPI_EXOP_NONE
        self._test_repl_exop(no_exop, repl_obj, expected_error)

    def test_repl_getchanges_userpriv(self):
        """
        Exercise replication requests as a user holding GET_CHANGES but not
        GET_ALL_CHANGES. Only the requests that carry a partial attribute
        set are expected to be accepted; everything else must be rejected.
        """

        # Grant the user GET_CHANGES on the base DN
        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes)

        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
        bad_dn = [werror.WERR_DS_DRA_BAD_DN]
        bad_ou = "OU=bad_obj,%s" % self.ou

        # Single-object replication: existing OU, then a non-existent one
        self._test_repl_single_obj(self.ou, access_denied)
        self._test_repl_single_obj(bad_ou, bad_dn + access_denied)

        # REPL_SECRET is refused for every existing target
        for target in (self.ou, self.user_dn):
            self._test_repl_secret(target, access_denied)
        self._test_repl_secret(self.user_dn, access_denied,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID())
        self._test_repl_secret(bad_ou, bad_dn)

        # Full replication, against the base NC and against the OUs
        self._test_repl_full(access_denied)
        self._test_repl_full_on_ou(
            self.ou,
            [werror.WERR_DS_CANT_FIND_EXPECTED_NC] + access_denied)
        self._test_repl_full_on_ou(
            bad_ou,
            [werror.WERR_DS_DRA_BAD_NC] + access_denied)

        # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so
        # these two requests should go through
        pas = self.get_partial_attribute_set()
        self._test_repl_single_obj(self.ou, None, partial_attribute_set=pas)
        pas = self.get_partial_attribute_set()
        self._test_repl_full(None, partial_attribute_set=pas)

    def test_repl_getallchanges_userpriv(self):
        """
        Exercise replication requests as a user holding only
        GET_ALL_CHANGES. Granting this right on its own is possible but not
        very meaningful; it is covered for the sake of completeness.
        """

        # Grant the user GET_ALL_CHANGES on the base DN
        get_all_changes_ace = self.acl_mod_get_all_changes
        self.sd_utils.dacl_add_ace(self.base_dn, get_all_changes_ace)

        # The user may see the results but is not allowed to ask for them,
        # so the responses should match those for a user without any rights:
        # rerun that scenario with the ACE in place.
        self.test_repl_no_userpriv()

    def test_repl_both_userpriv(self):
        """
        Exercise replication requests as a privileged user, i.e. one holding
        both GET_CHANGES and GET_ALL_CHANGES. Every valid request should be
        accepted.
        """

        # Grant both rights with a single DACL update
        combined_aces = "".join([self.acl_mod_get_changes,
                                 self.acl_mod_get_all_changes])
        self.sd_utils.dacl_add_ace(self.base_dn, combined_aces)

        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
        bad_dn = [werror.WERR_DS_DRA_BAD_DN]
        bad_ou = "OU=bad_obj,%s" % self.ou

        self._test_repl_single_obj(self.ou, None)
        self._test_repl_single_obj(bad_ou, bad_dn)

        # Microsoft returns DB_ERROR, Samba returns ACCESS_DENIED
        db_error_or_denied = [werror.WERR_DS_DRA_DB_ERROR] + access_denied
        for target in (self.ou, self.user_dn):
            self._test_repl_secret(target, db_error_or_denied)
        # Note that Windows accepts this but Samba rejects it
        self._test_repl_secret(self.user_dn, access_denied,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID())

        self._test_repl_secret(bad_ou, bad_dn)

        # Full replication works on the base NC but never on an OU
        self._test_repl_full(None)
        self._test_repl_full_on_ou(self.ou,
                                   [werror.WERR_DS_CANT_FIND_EXPECTED_NC])
        self._test_repl_full_on_ou(bad_ou,
                                   [werror.WERR_DS_DRA_BAD_NC] + bad_dn)

        # Requests restricted to a partial attribute set are accepted too
        pas = self.get_partial_attribute_set()
        self._test_repl_single_obj(self.ou, None, partial_attribute_set=pas)
        pas = self.get_partial_attribute_set()
        self._test_repl_full(None, partial_attribute_set=pas)

    def test_repl_no_userpriv(self):
        """
        Exercise replication requests as an unprivileged user. Every one of
        them is expected to be rejected.
        """

        # Microsoft usually returns BAD_DN, Samba returns ACCESS_DENIED
        access_denied = [werror.WERR_DS_DRA_ACCESS_DENIED]
        usual_error = [werror.WERR_DS_DRA_BAD_DN] + access_denied
        bad_ou = "OU=bad_obj,%s" % self.ou

        # Single-object replication: existing OU, then a non-existent one
        for target in (self.ou, bad_ou):
            self._test_repl_single_obj(target, usual_error)

        # REPL_SECRET against the OU, the user (without and with a
        # destination DSA) and the non-existent OU
        for target in (self.ou, self.user_dn):
            self._test_repl_secret(target, usual_error)
        self._test_repl_secret(self.user_dn, usual_error,
                               dest_dsa=self.ldb_dc1.get_ntds_GUID())
        self._test_repl_secret(bad_ou, usual_error)

        # Full replication, against the base NC and against the OUs
        self._test_repl_full(access_denied)
        self._test_repl_full_on_ou(self.ou, usual_error)
        self._test_repl_full_on_ou(
            bad_ou,
            [werror.WERR_DS_DRA_BAD_NC] + access_denied)

        # Supplying a partial attribute set makes no difference
        pas = self.get_partial_attribute_set()
        self._test_repl_single_obj(self.ou, usual_error,
                                   partial_attribute_set=pas)
        pas = self.get_partial_attribute_set()
        self._test_repl_full(access_denied, partial_attribute_set=pas)