summaryrefslogtreecommitdiffstats
path: root/source4/dsdb/tests/python/acl_modify.py
blob: c85748a764fabdfcf63416f67a74d6dff25c0499 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import optparse
import sys
sys.path.insert(0, "bin/python")
import samba

from samba.tests.subunitrun import SubunitOptions, TestProgram

import samba.getopt as options

from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.dcerpc import security

from samba.auth import system_session
from samba import gensec, sd_utils
from samba.samdb import SamDB
from samba.credentials import Credentials, DONT_USE_KERBEROS
import samba.tests
import samba.dsdb


# Build the command-line interface: the Samba, version, credential and
# subunit option groups plus one mandatory positional <host> argument.
# "%prog" is expanded by optparse to the name of the running script, so the
# usage text always names the file that is actually being executed.
parser = optparse.OptionParser("%prog [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

# The target server is required; without it print the usage and give up.
if not args:
    parser.print_usage()
    sys.exit(1)

# Accept either a bare host name or a full URL.  ``ldaphost`` always ends up
# as a URL; ``host`` is reduced to whatever follows the "://" separator.
host = args[0]
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    start = host.rindex("://")
    # str.lstrip() expects a set of characters, not an offset (passing an int
    # raises TypeError), so drop the scheme prefix with a slice instead.
    host = host[start + 3:]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add FEATURE_SEAL to whatever gensec features the credentials already have.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)

#
# Tests start here
#


class AclTests(samba.tests.TestCase):
    '''Shared fixture for the ACL tests in this file.

    setUp() opens an administrative SamDB connection to ``ldaphost`` and
    stores values derived from it on the instance.  The remaining methods
    are helpers for building user DNs, opening per-user connections and
    adjusting the dSHeuristics value.
    '''

    def setUp(self):
        '''Open the admin connection and initialise the per-test state.'''
        super().setUp()

        # STRICT_CHECKING may be missing from the environment; a missing
        # value is treated exactly like an explicit '1'.
        raw_strict = samba.tests.env_get_var_value('STRICT_CHECKING',
                                                   allow_missing=True)
        self.strict_checking = bool(
            int('1' if raw_strict is None else raw_strict))

        self.ldb_admin = SamDB(ldaphost,
                               credentials=creds,
                               session_info=system_session(lp),
                               lp=lp)
        self.base_dn = self.ldb_admin.domain_dn()
        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
        self.user_pass = "samba123@"
        config_basedn = self.ldb_admin.get_config_basedn()
        self.configuration_dn = config_basedn.get_linearized()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
        # Registered before any later cleanup so that, with unittest's
        # last-in-first-out ordering, the connection is dropped last.
        self.addCleanup(self.delete_admin_connection)

        # used for anonymous login
        self.creds_tmp = anonymous = Credentials()
        anonymous.set_username("")
        anonymous.set_password("")
        anonymous.set_domain(creds.get_domain())
        anonymous.set_realm(creds.get_realm())
        anonymous.set_workstation(creds.get_workstation())
        print("baseDN: %s" % self.base_dn)

        # set AttributeAuthorizationOnLDAPAdd and BlockOwnerImplicitRights
        self.set_heuristic(samba.dsdb.DS_HR_ATTR_AUTHZ_ON_LDAP_ADD, b'11')

    def set_heuristic(self, index, values):
        '''Overwrite part of dSHeuristics for the duration of the test.

        ``index`` is the 1-based position (it must be greater than 0 and
        less than 30) at which the bytes in ``values`` are written.  The
        value that was in place beforehand is restored on cleanup.
        '''
        self.assertGreater(index, 0)
        self.assertLess(index, 30)
        self.assertIsInstance(values, bytes)

        # Remember the current value (possibly None) and arrange for it to
        # be put back once the test has finished.
        previous = self.ldb_admin.get_dsheuristics()
        self.addCleanup(self.ldb_admin.set_dsheuristics, previous)

        # Extend a short or unset value with the tail of the default
        # template so that the position being written is present.
        template = b"000000000100000000020000000003"
        current = b"" if previous is None else previous
        padded = current + template[len(current):]

        first = index - 1
        after = first + len(values)
        self.ldb_admin.set_dsheuristics(padded[:first] + values + padded[after:])

    def get_user_dn(self, name):
        '''Return the DN of ``name`` inside CN=Users of the base DN.'''
        dn_parts = (name, self.base_dn)
        return "CN=%s,CN=Users,%s" % dn_parts

    def get_ldb_connection(self, target_username, target_password):
        '''Return a new SamDB connection to ``ldaphost`` for the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; FEATURE_SEAL is added and Kerberos is disabled.
        '''
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        sealed_features = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(sealed_features)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)

    # Test if we have any additional groups for users than default ones
    def assert_user_no_group_member(self, username):
        '''Check that the user's entry carries no memberOf attribute.

        A KeyError while reading memberOf is the only passing outcome; if
        the attribute can be read the check fails.
        '''
        search_filter = "(distinguishedName=%s)" % self.get_user_dn(username)
        res = self.ldb_admin.search(self.base_dn, expression=search_filter)
        try:
            first_group = res[0]["memberOf"][0]
            self.assertEqual(first_group, "")
        except KeyError:
            return
        else:
            self.fail()

    def delete_admin_connection(self):
        '''Remove the attributes that reference the admin connection.'''
        for attribute in ("sd_utils", "ldb_admin"):
            delattr(self, attribute)


class AclModifyTests(AclTests):
    '''Check that a freshly created user may not delete dNSHostName.

    Every test creates a computer object with a dNSHostName as
    administrator and then expects ERR_INSUFFICIENT_ACCESS_RIGHTS when the
    new user tries to delete that attribute.
    '''

    def setup_computer_with_hostname(self, account_name):
        '''Create the user, the OU and a computer that has a dNSHostName.

        The user connection is stored in ``self.ldb_user``.  Deleting the
        user and tree-deleting the OU are registered as cleanups.

        Returns a ``(host_name, dn)`` tuple for the new computer.
        '''
        user = "mouse"
        password = "mus musculus 123!"
        ou_dn = f'OU={account_name},{self.base_dn}'
        dn = f'CN={account_name},{ou_dn}'

        # Each cleanup is registered before the matching object is created.
        self.addCleanup(self.ldb_admin.deleteuser, user)
        self.ldb_admin.newuser(user, password)
        self.ldb_user = self.get_ldb_connection(user, password)

        self.addCleanup(self.ldb_admin.delete, ou_dn,
                        controls=["tree_delete:0"])
        self.ldb_admin.create_ou(ou_dn)

        computer_entry = {
            'dn': dn,
            'objectClass': 'computer',
            'sAMAccountName': account_name + '$',
        }
        self.ldb_admin.add(computer_entry)

        host_name = f'{account_name}.{self.ldb_user.domain_dns_name()}'

        # Set dNSHostName as administrator.
        set_host = Message(Dn(self.ldb_admin, dn))
        replacement = MessageElement(host_name, FLAG_MOD_REPLACE, 'dNSHostName')
        set_host['dNSHostName'] = replacement
        self.ldb_admin.modify(set_host)

        return host_name, dn

    def _setup_computer_for_this_test(self):
        '''Run setup_computer_with_hostname() with a name taken from the test id.

        The name is the last dotted component of ``self.id()``, cut to at
        most 63 characters.
        '''
        account_name = self.id().rsplit(".", 1)[1][:63]
        return self.setup_computer_with_hostname(account_name)

    def test_modify_delete_dns_host_name_specified(self):
        '''modify() deleting dNSHostName with the value given must be refused.'''
        host_name, dn = self._setup_computer_for_this_test()

        request = Message(Dn(self.ldb_user, dn))
        deletion = MessageElement(host_name, FLAG_MOD_DELETE, 'dNSHostName')
        request['dNSHostName'] = deletion

        self.assertRaisesLdbError(
            ERR_INSUFFICIENT_ACCESS_RIGHTS,
            "User able to delete dNSHostName (with specified name)",
            self.ldb_user.modify, request)

    def test_modify_delete_dns_host_name_unspecified(self):
        '''modify() deleting dNSHostName with no value given must be refused.'''
        _, dn = self._setup_computer_for_this_test()

        request = Message(Dn(self.ldb_user, dn))
        deletion = MessageElement([], FLAG_MOD_DELETE, 'dNSHostName')
        request['dNSHostName'] = deletion

        self.assertRaisesLdbError(
            ERR_INSUFFICIENT_ACCESS_RIGHTS,
            "User able to delete dNSHostName (without specified name)",
            self.ldb_user.modify, request)

    def test_modify_delete_dns_host_name_ldif_specified(self):
        '''modify_ldif() deleting dNSHostName with the value given must be refused.'''
        host_name, dn = self._setup_computer_for_this_test()

        ldif = f"""
dn: {dn}
changetype: modify
delete: dNSHostName
dNSHostName: {host_name}
"""
        self.assertRaisesLdbError(
            ERR_INSUFFICIENT_ACCESS_RIGHTS,
            "User able to delete dNSHostName (with specified name)",
            self.ldb_user.modify_ldif, ldif)

    def test_modify_delete_dns_host_name_ldif_unspecified(self):
        '''modify_ldif() deleting dNSHostName with no value given must be refused.'''
        _, dn = self._setup_computer_for_this_test()

        ldif = f"""
dn: {dn}
changetype: modify
delete: dNSHostName
"""
        self.assertRaisesLdbError(
            ERR_INSUFFICIENT_ACCESS_RIGHTS,
            "User able to delete dNSHostName (without specific name)",
            self.ldb_user.modify_ldif, ldif)


# Module-level connection to the target server, opened with the command-line
# credentials and the system session before the tests are started.
ldb = SamDB(
    url=ldaphost,
    credentials=creds,
    session_info=system_session(lp),
    lp=lp,
)

# Run the tests defined in this module using the parsed subunit options.
TestProgram(module=__name__, opts=subunitopts)