diff options
Diffstat (limited to 'python/samba/netcmd/domain/models')
-rw-r--r-- | python/samba/netcmd/domain/models/__init__.py | 32 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/auth_policy.py | 109 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/auth_silo.py | 104 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/claim_type.py | 58 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/exceptions.py | 64 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/fields.py | 507 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/group.py | 42 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/model.py | 426 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/query.py | 81 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/schema.py | 124 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/site.py | 47 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/subnet.py | 45 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/user.py | 75 | ||||
-rw-r--r-- | python/samba/netcmd/domain/models/value_type.py | 96 |
14 files changed, 1810 insertions, 0 deletions
diff --git a/python/samba/netcmd/domain/models/__init__.py b/python/samba/netcmd/domain/models/__init__.py new file mode 100644 index 0000000..8a6b254 --- /dev/null +++ b/python/samba/netcmd/domain/models/__init__.py @@ -0,0 +1,32 @@ +# Unix SMB/CIFS implementation. +# +# Samba domain models. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .auth_policy import AuthenticationPolicy +from .auth_silo import AuthenticationSilo +from .claim_type import ClaimType +from .group import Group +from .model import MODELS +from .schema import AttributeSchema, ClassSchema +from .site import Site +from .subnet import Subnet +from .user import User +from .value_type import ValueType diff --git a/python/samba/netcmd/domain/models/auth_policy.py b/python/samba/netcmd/domain/models/auth_policy.py new file mode 100644 index 0000000..c56966c --- /dev/null +++ b/python/samba/netcmd/domain/models/auth_policy.py @@ -0,0 +1,109 @@ +# Unix SMB/CIFS implementation. +# +# Authentication policy model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from enum import IntEnum +from ldb import Dn + +from .fields import (BooleanField, EnumField, IntegerField, SDDLField, + StringField) +from .model import Model + +# Ticket-Granting-Ticket lifetimes. +MIN_TGT_LIFETIME = 45 +MAX_TGT_LIFETIME = 2147483647 + + +class StrongNTLMPolicy(IntEnum): + DISABLED = 0 + OPTIONAL = 1 + REQUIRED = 2 + + @classmethod + def get_choices(cls): + return sorted([choice.capitalize() for choice in cls._member_names_]) + + @classmethod + def choices_str(cls): + return ", ".join(cls.get_choices()) + + +class AuthenticationPolicy(Model): + description = StringField("description") + enforced = BooleanField("msDS-AuthNPolicyEnforced") + strong_ntlm_policy = EnumField("msDS-StrongNTLMPolicy", StrongNTLMPolicy) + user_allow_ntlm_network_auth = BooleanField( + "msDS-UserAllowedNTLMNetworkAuthentication") + user_tgt_lifetime = IntegerField("msDS-UserTGTLifetime") + service_allow_ntlm_network_auth = BooleanField( + "msDS-ServiceAllowedNTLMNetworkAuthentication") + service_tgt_lifetime = IntegerField("msDS-ServiceTGTLifetime") + computer_tgt_lifetime = IntegerField("msDS-ComputerTGTLifetime") + user_allowed_to_authenticate_from = SDDLField( + "msDS-UserAllowedToAuthenticateFrom", allow_device_in_sddl=False) + user_allowed_to_authenticate_to = SDDLField( + "msDS-UserAllowedToAuthenticateTo") + service_allowed_to_authenticate_from = SDDLField( + "msDS-ServiceAllowedToAuthenticateFrom", allow_device_in_sddl=False) + service_allowed_to_authenticate_to = SDDLField( + "msDS-ServiceAllowedToAuthenticateTo") + computer_allowed_to_authenticate_to = SDDLField( + "msDS-ComputerAllowedToAuthenticateTo") + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the AuthenticationPolicy model. + + :param ldb: Ldb connection + :return: Dn object of container + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child( + "CN=AuthN Policies,CN=AuthN Policy Configuration,CN=Services") + return base_dn + + @staticmethod + def get_object_class(): + return "msDS-AuthNPolicy" + + @staticmethod + def lookup(ldb, name): + """Helper function to return auth policy or raise LookupError. + + :param ldb: Ldb connection + :param name: Either DN or name of Authentication Policy + :raises: LookupError if not found + :raises: ValueError if name is not set + """ + if not name: + raise ValueError("Attribute 'name' is required.") + + try: + # It's possible name is already a Dn. + dn = name if isinstance(name, Dn) else Dn(ldb, name) + policy = AuthenticationPolicy.get(ldb, dn=dn) + except ValueError: + policy = AuthenticationPolicy.get(ldb, cn=name) + + if policy is None: + raise LookupError(f"Authentication policy {name} not found.") + + return policy diff --git a/python/samba/netcmd/domain/models/auth_silo.py b/python/samba/netcmd/domain/models/auth_silo.py new file mode 100644 index 0000000..9747671 --- /dev/null +++ b/python/samba/netcmd/domain/models/auth_silo.py @@ -0,0 +1,104 @@ +# Unix SMB/CIFS implementation. +# +# Authentication silo model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, LdbError, Message, MessageElement + +from samba.sd_utils import escaped_claim_id + +from .exceptions import GrantMemberError, RevokeMemberError +from .fields import DnField, BooleanField, StringField +from .model import Model + + +class AuthenticationSilo(Model): + description = StringField("description") + enforced = BooleanField("msDS-AuthNPolicySiloEnforced") + user_authentication_policy = DnField("msDS-UserAuthNPolicy") + service_authentication_policy = DnField("msDS-ServiceAuthNPolicy") + computer_authentication_policy = DnField("msDS-ComputerAuthNPolicy") + members = DnField("msDS-AuthNPolicySiloMembers", many=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the AuthenticationSilo model. + + :param ldb: Ldb connection + :return: Dn object of container + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child( + "CN=AuthN Silos,CN=AuthN Policy Configuration,CN=Services") + return base_dn + + @staticmethod + def get_object_class(): + return "msDS-AuthNPolicySilo" + + def grant(self, ldb, member): + """Grant a member access to the Authentication Silo. + + Rather than saving the silo object and writing the entire member + list out again, just add one member only. + + :param ldb: Ldb connection + :param member: Member to grant access to silo + """ + # Create a message with only an add member operation. + message = Message(dn=self.dn) + message.add(MessageElement(str(member.dn), FLAG_MOD_ADD, + "msDS-AuthNPolicySiloMembers")) + + # Update authentication silo. + try: + ldb.modify(message) + except LdbError as e: + raise GrantMemberError(f"Failed to grant access to silo member: {e}") + + # If the modify operation was successful refresh members field. + self.refresh(ldb, fields=["members"]) + + def revoke(self, ldb, member): + """Revoke a member from the Authentication Silo. + + Rather than saving the silo object and writing the entire member + list out again, just remove one member only. + + :param ldb: Ldb connection + :param member: Member to revoke from silo + """ + # Create a message with only a remove member operation. + message = Message(dn=self.dn) + message.add(MessageElement(str(member.dn), FLAG_MOD_DELETE, + "msDS-AuthNPolicySiloMembers")) + + # Update authentication silo. + try: + ldb.modify(message) + except LdbError as e: + raise RevokeMemberError(f"Failed to revoke silo member: {e}") + + # If the modify operation was successful refresh members field. + self.refresh(ldb, fields=["members"]) + + def get_authentication_sddl(self): + return ('O:SYG:SYD:(XA;OICI;CR;;;WD;(@USER.ad://ext/' + f'AuthenticationSilo == "{escaped_claim_id(self.name)}"))') diff --git a/python/samba/netcmd/domain/models/claim_type.py b/python/samba/netcmd/domain/models/claim_type.py new file mode 100644 index 0000000..7e1c816 --- /dev/null +++ b/python/samba/netcmd/domain/models/claim_type.py @@ -0,0 +1,58 @@ +# Unix SMB/CIFS implementation. +# +# Claim type model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BooleanField, DnField, IntegerField,\ + PossibleClaimValuesField, StringField +from .model import Model + + +class ClaimType(Model): + enabled = BooleanField("Enabled") + description = StringField("description") + display_name = StringField("displayName") + claim_attribute_source = DnField("msDS-ClaimAttributeSource") + claim_is_single_valued = BooleanField("msDS-ClaimIsSingleValued") + claim_is_value_space_restricted = BooleanField( + "msDS-ClaimIsValueSpaceRestricted") + claim_possible_values = PossibleClaimValuesField("msDS-ClaimPossibleValues") + claim_source_type = StringField("msDS-ClaimSourceType") + claim_type_applies_to_class = DnField( + "msDS-ClaimTypeAppliesToClass", many=True) + claim_value_type = IntegerField("msDS-ClaimValueType") + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the ClaimType model. + + :param ldb: Ldb connection + :return: Dn object of container + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child("CN=Claim Types,CN=Claims Configuration,CN=Services") + return base_dn + + @staticmethod + def get_object_class(): + return "msDS-ClaimType" + + def __str__(self): + return str(self.display_name) diff --git a/python/samba/netcmd/domain/models/exceptions.py b/python/samba/netcmd/domain/models/exceptions.py new file mode 100644 index 0000000..14ebd77 --- /dev/null +++ b/python/samba/netcmd/domain/models/exceptions.py @@ -0,0 +1,64 @@ +# Unix SMB/CIFS implementation. +# +# Model and ORM exceptions. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +class ModelError(Exception): + pass + + +class FieldError(ModelError): + """A ModelError on a specific field.""" + + def __init__(self, *args, field=None): + self.field = field + super().__init__(*args) + + def __str__(self): + message = super().__str__() + return f"{self.field.name}: {message}" + + +class MultipleObjectsReturned(ModelError): + pass + + +class DoesNotExist(ModelError): + pass + + +class GrantMemberError(ModelError): + pass + + +class RevokeMemberError(ModelError): + pass + + +class ProtectError(ModelError): + pass + + +class UnprotectError(ModelError): + pass + + +class DeleteError(ModelError): + pass diff --git a/python/samba/netcmd/domain/models/fields.py b/python/samba/netcmd/domain/models/fields.py new file mode 100644 index 0000000..0b7e1eb --- /dev/null +++ b/python/samba/netcmd/domain/models/fields.py @@ -0,0 +1,507 @@ +# Unix SMB/CIFS implementation. +# +# Model fields. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from enum import IntEnum + +import io +from abc import ABCMeta, abstractmethod +from datetime import datetime +from xml.etree import ElementTree + +from ldb import Dn, MessageElement, string_to_time, timestring +from samba.dcerpc import security +from samba.dcerpc.misc import GUID +from samba.ndr import ndr_pack, ndr_unpack + + +class Field(metaclass=ABCMeta): + """Base class for all fields. + + Each field will need to implement from_db_value and to_db_value. + + A field must correctly support converting both single valued fields, + and list type fields. + + The only thing many=True does is say the field "prefers" to be a list, + but really any field can be a list or single value. + """ + + def __init__(self, name, many=False, default=None, hidden=False, + readonly=False): + """Creates a new field, should be subclassed. + + :param name: Ldb field name. + :param many: If true always convert field to a list when loaded. + :param default: Default value or callback method (obj is first argument) + :param hidden: If this is True, exclude the field when calling as_dict() + :param readonly: If true don't write this value when calling save. + """ + self.name = name + self.many = many + self.hidden = hidden + self.readonly = readonly + + # This ensures that fields with many=True are always lists. + # If this is inconsistent anywhere, it isn't so great to use. + if self.many and default is None: + self.default = [] + else: + self.default = default + + @abstractmethod + def from_db_value(self, ldb, value): + """Converts value read from the database to Python value. + + :param ldb: Ldb connection + :param value: MessageElement value from the database + :returns: Parsed value as Python type + """ + pass + + @abstractmethod + def to_db_value(self, ldb, value, flags): + """Converts value to database value. + + This should return a MessageElement or None, where None means + the field will be unset on the next save. + + :param ldb: Ldb connection + :param value: Input value from Python field + :param flags: MessageElement flags + :returns: MessageElement or None + """ + pass + + +class IntegerField(Field): + """A simple integer field, can be an int or list of int.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement to int or list of int.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [int(item) for item in value] + else: + return int(value[0]) + + def to_db_value(self, ldb, value, flags): + """Convert int or list of int to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(item) for item in value], flags, self.name) + else: + return MessageElement(str(value), flags, self.name) + + +class BinaryField(Field): + """Similar to StringField but using bytes instead of str. + + This tends to be quite easy because a MessageElement already uses bytes. + """ + + def from_db_value(self, ldb, value): + """Convert MessageElement to bytes or list of bytes. + + The values on the MessageElement should already be bytes so the + cast to bytes() is likely not needed in from_db_value. + """ + if value is None: + return + elif len(value) > 1 or self.many: + return [bytes(item) for item in value] + else: + return bytes(value[0]) + + def to_db_value(self, ldb, value, flags): + """Convert bytes or list of bytes to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [bytes(item) for item in value], flags, self.name) + else: + return MessageElement(bytes(value), flags, self.name) + + +class StringField(Field): + """A simple string field, may contain str or list of str.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement to str or list of str.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [str(item) for item in value] + else: + return str(value) + + def to_db_value(self, ldb, value, flags): + """Convert str or list of str to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(item) for item in value], flags, self.name) + else: + return MessageElement(str(value), flags, self.name) + + +class EnumField(Field): + """A field based around Python's Enum type.""" + + def __init__(self, name, enum, many=False, default=None): + """Create a new EnumField for the given enum class.""" + self.enum = enum + super().__init__(name, many, default) + + def enum_from_value(self, value): + """Return Enum instance from value. + + Has a special case for IntEnum as the constructor only accepts int. + """ + if issubclass(self.enum, IntEnum): + return self.enum(int(str(value))) + else: + return self.enum(str(value)) + + def from_db_value(self, ldb, value): + """Convert MessageElement to enum or list of enum.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [self.enum_from_value(item) for item in value] + else: + return self.enum_from_value(value) + + def to_db_value(self, ldb, value, flags): + """Convert enum or list of enum to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(item.value) for item in value], flags, self.name) + else: + return MessageElement(str(value.value), flags, self.name) + + +class DateTimeField(Field): + """A field for parsing ldb timestamps into Python datetime.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement to datetime or list of datetime.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [datetime.fromtimestamp(string_to_time(str(item))) + for item in value] + else: + return datetime.fromtimestamp(string_to_time(str(value))) + + def to_db_value(self, ldb, value, flags): + """Convert datetime or list of datetime to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [timestring(int(datetime.timestamp(item))) for item in value], + flags, self.name) + else: + return MessageElement(timestring(int(datetime.timestamp(value))), + flags, self.name) + + +class RelatedField(Field): + """A field that automatically fetches the related objects. + + Use sparingly, can be a little slow. If in doubt just use DnField instead. + """ + + def __init__(self, name, model, many=False, default=None): + """Create a new RelatedField for the given model.""" + self.model = model + super().__init__(name, many, default) + + def from_db_value(self, ldb, value): + """Convert Message element to related object or list of objects. + + Note that fetching related items is not using any sort of lazy + loading so use this field sparingly. + """ + if value is None: + return + elif len(value) > 1 or self.many: + return [self.model.get(ldb, dn=Dn(ldb, str(item))) for item in value] + else: + return self.model.get(ldb, dn=Dn(ldb, str(value))) + + def to_db_value(self, ldb, value, flags): + """Convert related object or list of objects to MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(item.dn) for item in value], flags, self.name) + else: + return MessageElement(str(value.dn), flags, self.name) + + +class DnField(Field): + """A Dn field parses the current field into a Dn object.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement to a Dn object or list of Dn objects.""" + if value is None: + return + elif isinstance(value, Dn): + return value + elif len(value) > 1 or self.many: + return [Dn(ldb, str(item)) for item in value] + else: + return Dn(ldb, str(value)) + + def to_db_value(self, ldb, value, flags): + """Convert Dn object or list of Dn objects into a MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(item) for item in value], flags, self.name) + else: + return MessageElement(str(value), flags, self.name) + + +class GUIDField(Field): + """A GUID field decodes fields containing binary GUIDs.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement with a GUID into a str or list of str.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [str(ndr_unpack(GUID, item)) for item in value] + else: + return str(ndr_unpack(GUID, value[0])) + + def to_db_value(self, ldb, value, flags): + """Convert str with GUID into MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [ndr_pack(GUID(item)) for item in value], flags, self.name) + else: + return MessageElement(ndr_pack(GUID(value)), flags, self.name) + + +class SIDField(Field): + """A SID field encodes and decodes SID data.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement with a GUID into a str or list of str.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [str(ndr_unpack(security.dom_sid, item)) for item in value] + else: + return str(ndr_unpack(security.dom_sid, value[0])) + + def to_db_value(self, ldb, value, flags): + """Convert str with GUID into MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [ndr_pack(security.dom_sid(item)) for item in value], + flags, self.name) + else: + return MessageElement(ndr_pack(security.dom_sid(value)), + flags, self.name) + + +class SDDLField(Field): + """A SDDL field encodes and decodes SDDL data.""" + + def __init__(self, + name, + *, + many=False, + default=None, + hidden=False, + allow_device_in_sddl=True): + """Create a new SDDLField.""" + self.allow_device_in_sddl = allow_device_in_sddl + super().__init__(name, many=many, default=default, hidden=hidden) + + def from_db_value(self, ldb, value): + if value is None: + return + elif len(value) > 1 or self.many: + return [ndr_unpack(security.descriptor, item).as_sddl() + for item in value] + else: + return ndr_unpack(security.descriptor, value[0]).as_sddl() + + def to_db_value(self, ldb, value, flags): + domain_sid = security.dom_sid(ldb.get_domain_sid()) + if value is None: + return + elif isinstance(value, list): + return MessageElement([ndr_pack(security.descriptor.from_sddl( + item, + domain_sid, + allow_device_in_sddl=self.allow_device_in_sddl)) + for item in value], + flags, + self.name) + else: + return MessageElement( + ndr_pack(security.descriptor.from_sddl( + value, + domain_sid, + allow_device_in_sddl=self.allow_device_in_sddl)), + flags, + self.name + ) + + +class BooleanField(Field): + """A simple boolean field, can be a bool or list of bool.""" + + def from_db_value(self, ldb, value): + """Convert MessageElement into a bool or list of bool.""" + if value is None: + return + elif len(value) > 1 or self.many: + return [str(item) == "TRUE" for item in value] + else: + return str(value) == "TRUE" + + def to_db_value(self, ldb, value, flags): + """Convert bool or list of bool into a MessageElement.""" + if value is None: + return + elif isinstance(value, list): + return MessageElement( + [str(bool(item)).upper() for item in value], flags, self.name) + else: + return MessageElement(str(bool(value)).upper(), flags, self.name) + + +class PossibleClaimValuesField(Field): + """Field for parsing possible values XML for claim types. + + This field will be represented by a list of dicts as follows: + + [ + {"ValueGUID": <GUID>}, + {"ValueDisplayName: "Display name"}, + {"ValueDescription: "Optional description or None for no description"}, + {"Value": <Value>}, + ] + + Note that the GUID needs to be created client-side when adding entries, + leaving it as None then saving it doesn't generate the GUID. + + The field itself just converts the XML to list and vice versa, it doesn't + automatically generate GUIDs for entries, this is entirely up to the caller. + """ + + # Namespaces for PossibleValues xml parsing. + NAMESPACE = { + "xsd": "http://www.w3.org/2001/XMLSchema", + "xsi": "http://www.w3.org/2001/XMLSchema-instance", + "": "http://schemas.microsoft.com/2010/08/ActiveDirectory/PossibleValues" + } + + def from_db_value(self, ldb, value): + """Parse MessageElement with XML to list of dicts.""" + if value is not None: + root = ElementTree.fromstring(str(value)) + string_list = root.find("StringList", self.NAMESPACE) + + values = [] + for item in string_list.findall("Item", self.NAMESPACE): + values.append({ + "ValueGUID": item.find("ValueGUID", self.NAMESPACE).text, + "ValueDisplayName": item.find("ValueDisplayName", + self.NAMESPACE).text, + "ValueDescription": item.find("ValueDescription", + self.NAMESPACE).text, + "Value": item.find("Value", self.NAMESPACE).text, + }) + + return values + + def to_db_value(self, ldb, value, flags): + """Convert list of dicts back to XML as a MessageElement.""" + if value is None: + return + + # Possible values should always be a list of dict, but for consistency + # with other fields just wrap a single value into a list and continue. + if isinstance(value, list): + possible_values = value + else: + possible_values = [value] + + # No point storing XML of an empty list. + # Return None, the field will be unset on the next save. + if len(possible_values) == 0: + return + + # root node + root = ElementTree.Element("PossibleClaimValues") + for name, url in self.NAMESPACE.items(): + if name == "": + root.set("xmlns", url) + else: + root.set(f"xmlns:{name}", url) + + # StringList node + string_list = ElementTree.SubElement(root, "StringList") + + # List of values + for item_dict in possible_values: + item = ElementTree.SubElement(string_list, "Item") + item_guid = ElementTree.SubElement(item, "ValueGUID") + item_guid.text = item_dict["ValueGUID"] + item_name = ElementTree.SubElement(item, "ValueDisplayName") + item_name.text = item_dict["ValueDisplayName"] + item_desc = ElementTree.SubElement(item, "ValueDescription") + item_desc.text = item_dict["ValueDescription"] + item_value = ElementTree.SubElement(item, "Value") + item_value.text = item_dict["Value"] + + # NOTE: indent was only added in Python 3.9 so can't be used yet. + # ElementTree.indent(root, space="\t", level=0) + + out = io.BytesIO() + ElementTree.ElementTree(root).write(out, + encoding="utf-16", + xml_declaration=True, + short_empty_elements=False) + + # Back to str as that is what MessageElement needs. + return MessageElement(out.getvalue().decode("utf-16"), flags, self.name) diff --git a/python/samba/netcmd/domain/models/group.py b/python/samba/netcmd/domain/models/group.py new file mode 100644 index 0000000..9473127 --- /dev/null +++ b/python/samba/netcmd/domain/models/group.py @@ -0,0 +1,42 @@ +# Unix SMB/CIFS implementation. +# +# Group model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BooleanField, DnField, IntegerField, SIDField, StringField +from .model import Model + + +class Group(Model): + admin_count = IntegerField("adminCount") + description = StringField("description") + is_critical_system_object = BooleanField("isCriticalSystemObject", + default=False, readonly=True) + member = DnField("member", many=True) + object_sid = SIDField("objectSid") + system_flags = IntegerField("systemFlags") + + @staticmethod + def get_object_class(): + return "group" + + def get_authentication_sddl(self): + return "O:SYG:SYD:(XA;OICI;CR;;;WD;(Member_of_any {SID(%s)}))" % ( + self.object_sid) diff --git a/python/samba/netcmd/domain/models/model.py b/python/samba/netcmd/domain/models/model.py new file mode 100644 index 0000000..602c6ca --- /dev/null +++ b/python/samba/netcmd/domain/models/model.py @@ -0,0 +1,426 @@ +# Unix SMB/CIFS implementation. +# +# Model and basic ORM for the Ldb database. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import inspect +from abc import ABCMeta, abstractmethod + +from ldb import ERR_NO_SUCH_OBJECT, FLAG_MOD_ADD, FLAG_MOD_REPLACE, LdbError,\ + Message, MessageElement, SCOPE_BASE, SCOPE_SUBTREE, binary_encode +from samba.sd_utils import SDUtils + +from .exceptions import DeleteError, DoesNotExist, FieldError,\ + ProtectError, UnprotectError +from .fields import DateTimeField, DnField, Field, GUIDField, IntegerField,\ + StringField +from .query import Query + +# Keeps track of registered models. +# This gets populated by the ModelMeta class. +MODELS = {} + + +class ModelMeta(ABCMeta): + + def __new__(mcls, name, bases, namespace, **kwargs): + cls = super().__new__(mcls, name, bases, namespace, **kwargs) + + if cls.__name__ != "Model": + cls.fields = dict(inspect.getmembers(cls, lambda f: isinstance(f, Field))) + cls.meta = mcls + MODELS[name] = cls + + return cls + + +class Model(metaclass=ModelMeta): + cn = StringField("cn") + distinguished_name = DnField("distinguishedName") + dn = DnField("dn") + ds_core_propagation_data = DateTimeField("dsCorePropagationData", + hidden=True) + instance_type = IntegerField("instanceType") + name = StringField("name") + object_category = DnField("objectCategory") + object_class = StringField("objectClass", + default=lambda obj: obj.get_object_class()) + object_guid = GUIDField("objectGUID") + usn_changed = IntegerField("uSNChanged", hidden=True) + usn_created = IntegerField("uSNCreated", hidden=True) + when_changed = DateTimeField("whenChanged", hidden=True) + when_created = DateTimeField("whenCreated", hidden=True) + + def __init__(self, **kwargs): + """Create a new model instance and optionally populate fields. + + Does not save the object to the database, call .save() for that. + + :param kwargs: Optional input fields to populate object with + """ + # Used by the _apply method, holds the original ldb Message, + # which is used by save() to determine what fields changed. + self._message = None + + for field_name, field in self.fields.items(): + if field_name in kwargs: + default = kwargs[field_name] + elif callable(field.default): + default = field.default(self) + else: + default = field.default + + setattr(self, field_name, default) + + def __repr__(self): + """Return object representation for this model.""" + return f"<{self.__class__.__name__}: {self}>" + + def __str__(self): + """Stringify model instance to implement in each model.""" + return str(self.cn) + + def __eq__(self, other): + """Basic object equality check only really checks if the dn matches. + + :param other: The other object to compare with + """ + if other is None: + return False + else: + return self.dn == other.dn + + def __json__(self): + """Automatically called by custom JSONEncoder class. + + When turning an object into json any fields of type RelatedField + will also end up calling this method. + """ + if self.dn is not None: + return str(self.dn) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the container of this model. + + :param ldb: Ldb connection + :return: Dn to use for new objects + """ + return ldb.get_default_basedn() + + @classmethod + def get_search_dn(cls, ldb): + """Return the DN used for querying. + + By default, this just calls get_base_dn, but it is possible to + return a different Dn for querying. + + :param ldb: Ldb connection + :return: Dn to use for searching + """ + return cls.get_base_dn(ldb) + + @staticmethod + @abstractmethod + def get_object_class(): + """Returns the objectClass for this model.""" + pass + + @classmethod + def from_message(cls, ldb, message): + """Create a new model instance from the Ldb Message object. + + :param ldb: Ldb connection + :param message: Ldb Message object to create instance from + """ + obj = cls() + obj._apply(ldb, message) + return obj + + def _apply(self, ldb, message): + """Internal method to apply Ldb Message to current object. + + :param ldb: Ldb connection + :param message: Ldb Message object to apply + """ + # Store the ldb Message so that in save we can see what changed. + self._message = message + + for attr, field in self.fields.items(): + if field.name in message: + setattr(self, attr, field.from_db_value(ldb, message[field.name])) + + def refresh(self, ldb, fields=None): + """Refresh object from database. + + :param ldb: Ldb connection + :param fields: Optional list of field names to refresh + """ + attrs = [self.fields[f].name for f in fields] if fields else None + + # This shouldn't normally happen but in case the object refresh fails. + try: + res = ldb.search(self.dn, scope=SCOPE_BASE, attrs=attrs) + except LdbError as e: + if e.args[0] == ERR_NO_SUCH_OBJECT: + raise DoesNotExist(f"Refresh failed, object gone: {self.dn}") + raise + + self._apply(ldb, res[0]) + + def as_dict(self, include_hidden=False): + """Returns a dict representation of the model. + + :param include_hidden: Include fields with hidden=True when set + :returns: dict representation of model using Ldb field names as keys + """ + obj_dict = {} + + for attr, field in self.fields.items(): + if not field.hidden or include_hidden: + value = getattr(self, attr) + if value is not None: + obj_dict[field.name] = value + + return obj_dict + + @classmethod + def build_expression(cls, **kwargs): + """Build LDAP search expression from kwargs. + + :kwargs: fields to use for expression using model field names + """ + # Take a copy, never modify the original if it can be avoided. + # Then always add the object_class to the search criteria. + criteria = dict(kwargs) + criteria["object_class"] = cls.get_object_class() + + # Build search expression. + num_fields = len(criteria) + expression = "" if num_fields == 1 else "(&" + + for field_name, value in criteria.items(): + field = cls.fields.get(field_name) + if not field: + raise ValueError(f"Unknown field '{field_name}'") + expression += f"({field.name}={binary_encode(value)})" + + if num_fields > 1: + expression += ")" + + return expression + + @classmethod + def query(cls, ldb, **kwargs): + """Returns a search query for this model. + + :param ldb: Ldb connection + :param kwargs: Search criteria as keyword args + """ + base_dn = cls.get_search_dn(ldb) + + # If the container does not exist produce a friendly error message. + try: + result = ldb.search(base_dn, + scope=SCOPE_SUBTREE, + expression=cls.build_expression(**kwargs)) + except LdbError as e: + if e.args[0] == ERR_NO_SUCH_OBJECT: + raise DoesNotExist(f"Container does not exist: {base_dn}") + raise + + return Query(cls, ldb, result) + + @classmethod + def get(cls, ldb, **kwargs): + """Get one object, must always return one item. + + Either find object by dn=, or any combination of attributes via kwargs. + If there are more than one result, MultipleObjectsReturned is raised. + + :param ldb: Ldb connection + :param kwargs: Search criteria as keyword args + :returns: Model instance or None if not found + :raises: MultipleObjects returned if there are more than one results + """ + # If a DN is provided use that to get the object directly. + # Otherwise, build a search expression using kwargs provided. + dn = kwargs.get("dn") + + if dn: + # Handle LDAP error 32 LDAP_NO_SUCH_OBJECT, but raise for the rest. + # Return None if the User does not exist. + try: + res = ldb.search(dn, scope=SCOPE_BASE) + except LdbError as e: + if e.args[0] == ERR_NO_SUCH_OBJECT: + return None + else: + raise + + return cls.from_message(ldb, res[0]) + else: + return cls.query(ldb, **kwargs).get() + + @classmethod + def create(cls, ldb, **kwargs): + """Create object constructs object and calls save straight after. + + :param ldb: Ldb connection + :param kwargs: Fields to populate object from + :returns: object + """ + obj = cls(**kwargs) + obj.save(ldb) + return obj + + @classmethod + def get_or_create(cls, ldb, defaults=None, **kwargs): + """Retrieve object and if it doesn't exist create a new instance. + + :param ldb: Ldb connection + :param defaults: Attributes only used for create but not search + :param kwargs: Attributes used for searching existing object + :returns: (object, bool created) + """ + obj = cls.get(ldb, **kwargs) + if obj is None: + attrs = dict(kwargs) + if defaults is not None: + attrs.update(defaults) + return cls.create(ldb, **attrs), True + else: + return obj, False + + def save(self, ldb): + """Save model to Ldb database. + + The save operation will save all fields excluding fields that + return None when calling their `to_db_value` methods. + + The `to_db_value` method can either return a ldb Message object, + or None if the field is to be excluded. + + For updates, the existing object is fetched and only fields + that are changed are included in the update ldb Message. + + Also for updates, any fields that currently have a value, + but are to be set to None will be seen as a delete operation. + + After the save operation the object is refreshed from the server, + as often the server will populate some fields. + + :param ldb: Ldb connection + """ + if self.dn is None: + dn = self.get_base_dn(ldb) + dn.add_child(f"CN={self.cn or self.name}") + self.dn = dn + + message = Message(dn=self.dn) + for attr, field in self.fields.items(): + if attr != "dn" and not field.readonly: + value = getattr(self, attr) + try: + db_value = field.to_db_value(ldb, value, FLAG_MOD_ADD) + except ValueError as e: + raise FieldError(e, field=field) + + # Don't add empty fields. + if db_value is not None and len(db_value): + message.add(db_value) + + # Create object + ldb.add(message) + + # Fetching object refreshes any automatically populated fields. + res = ldb.search(dn, scope=SCOPE_BASE) + self._apply(ldb, res[0]) + else: + # Existing Message was stored to work out what fields changed. + existing_obj = self.from_message(ldb, self._message) + + # Only modify replace or modify fields that have changed. + # Any fields that are set to None or an empty list get unset. + message = Message(dn=self.dn) + for attr, field in self.fields.items(): + if attr != "dn" and not field.readonly: + value = getattr(self, attr) + old_value = getattr(existing_obj, attr) + + if value != old_value: + try: + db_value = field.to_db_value(ldb, value, + FLAG_MOD_REPLACE) + except ValueError as e: + raise FieldError(e, field=field) + + # When a field returns None or empty list, delete attr. + if db_value in (None, []): + db_value = MessageElement([], + FLAG_MOD_REPLACE, + field.name) + message.add(db_value) + + # Saving nothing only triggers an error. + if len(message): + ldb.modify(message) + + # Fetching object refreshes any automatically populated fields. + self.refresh(ldb) + + def delete(self, ldb): + """Delete item from Ldb database. + + If self.dn is None then the object has not yet been saved. + + :param ldb: Ldb connection + """ + if self.dn is None: + raise DeleteError("Cannot delete object that doesn't have a dn.") + + try: + ldb.delete(self.dn) + except LdbError as e: + raise DeleteError(f"Delete failed: {e}") + + def protect(self, ldb): + """Protect object from accidental deletion. + + :param ldb: Ldb connection + """ + utils = SDUtils(ldb) + + try: + utils.dacl_add_ace(self.dn, "(D;;DTSD;;;WD)") + except LdbError as e: + raise ProtectError(f"Failed to protect object: {e}") + + def unprotect(self, ldb): + """Unprotect object from accidental deletion. + + :param ldb: Ldb connection + """ + utils = SDUtils(ldb) + + try: + utils.dacl_delete_aces(self.dn, "(D;;DTSD;;;WD)") + except LdbError as e: + raise UnprotectError(f"Failed to unprotect object: {e}") diff --git a/python/samba/netcmd/domain/models/query.py b/python/samba/netcmd/domain/models/query.py new file mode 100644 index 0000000..9cdb650 --- /dev/null +++ b/python/samba/netcmd/domain/models/query.py @@ -0,0 +1,81 @@ +# Unix SMB/CIFS implementation. +# +# Query class for the ORM to the Ldb database. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import re + +from .exceptions import DoesNotExist, MultipleObjectsReturned + +RE_SPLIT_CAMELCASE = re.compile(r"[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))") + + +class Query: + """Simple Query class used by the `Model.query` method.""" + + def __init__(self, model, ldb, result): + self.model = model + self.ldb = ldb + self.result = result + self.count = result.count + self.name = " ".join(RE_SPLIT_CAMELCASE.findall(model.__name__)).lower() + + def __iter__(self): + """Loop over Query class yields Model instances.""" + for message in self.result: + yield self.model.from_message(self.ldb, message) + + def first(self): + """Returns the first item in the Query or None for no results.""" + if self.result.count: + return self.model.from_message(self.ldb, self.result[0]) + + def last(self): + """Returns the last item in the Query or None for no results.""" + if self.result.count: + return self.model.from_message(self.ldb, self.result[-1]) + + def get(self): + """Returns one item or None if no results were found. + + :returns: Model instance or None if not found. + :raises MultipleObjectsReturned: if more than one results were returned + """ + if self.count > 1: + raise MultipleObjectsReturned( + f"More than one {self.name} objects returned (got {self.count}).") + elif self.count: + return self.model.from_message(self.ldb, self.result[0]) + + def one(self): + """Must return EXACTLY one item or raise an exception. + + :returns: Model instance + :raises DoesNotExist: if no results were returned + :raises MultipleObjectsReturned: if more than one results were returned + """ + if self.count < 1: + raise DoesNotExist( + f"{self.name.capitalize()} matching query not found") + elif self.count > 1: + raise MultipleObjectsReturned( + f"More than one {self.name} objects returned (got {self.count}).") + else: + return self.model.from_message(self.ldb, self.result[0]) diff --git a/python/samba/netcmd/domain/models/schema.py b/python/samba/netcmd/domain/models/schema.py new file mode 100644 index 0000000..59ece05 --- /dev/null +++ b/python/samba/netcmd/domain/models/schema.py @@ -0,0 +1,124 @@ +# Unix SMB/CIFS implementation. +# +# Class and attribute schema models. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BinaryField, BooleanField, DnField, GUIDField,\ + IntegerField, StringField +from .model import Model + + +class ClassSchema(Model): + default_object_category = DnField("defaultObjectCategory") + governs_id = StringField("governsID") + schema_id_guid = GUIDField("schemaIDGUID") + subclass_of = StringField("subclassOf") + admin_description = StringField("adminDescription") + admin_display_name = StringField("adminDisplayName") + default_hiding_value = BooleanField("defaultHidingValue") + default_security_descriptor = BinaryField("defaultSecurityDescriptor") + ldap_display_name = StringField("lDAPDisplayName") + may_contain = StringField("mayContain", many=True) + poss_superiors = StringField("possSuperiors", many=True) + rdn_att_id = StringField("rDNAttID") + show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly") + system_only = BooleanField("systemOnly", readonly=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the ClassSchema model. + + This is the same as AttributeSchema, but the objectClass is different. + + :param ldb: Ldb connection + :return: Dn object of container + """ + return ldb.get_schema_basedn() + + @staticmethod + def get_object_class(): + return "classSchema" + + @classmethod + def lookup(cls, ldb, name): + """Helper function to lookup class or raise LookupError. + + :param ldb: Ldb connection + :param name: Class name + :raises: LookupError if not found + :raises: ValueError if name is not provided + """ + if not name: + raise ValueError("Class name is required.") + + attr = cls.get(ldb, ldap_display_name=name) + if attr is None: + raise LookupError(f"Could not locate {name} in class schema.") + + return attr + + +class AttributeSchema(Model): + attribute_id = StringField("attributeID") + attribute_syntax = StringField("attributeSyntax") + is_single_valued = BooleanField("isSingleValued") + ldap_display_name = StringField("lDAPDisplayName") + om_syntax = IntegerField("oMSyntax") + admin_description = StringField("adminDescription") + admin_display_name = StringField("adminDisplayName") + attribute_security_guid = GUIDField("attributeSecurityGUID") + schema_flags_ex = IntegerField("schemaFlagsEx") + search_flags = IntegerField("searchFlags") + show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly") + system_flags = IntegerField("systemFlags", readonly=True) + system_only = BooleanField("systemOnly", readonly=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the AttributeSchema model. + + This is the same as ClassSchema, but the objectClass is different. + + :param ldb: Ldb connection + :return: Dn object of container + """ + return ldb.get_schema_basedn() + + @staticmethod + def get_object_class(): + return "attributeSchema" + + @classmethod + def lookup(cls, ldb, name): + """Helper function to lookup attribute or raise LookupError. + + :param ldb: Ldb connection + :param name: Attribute name + :raises: LookupError if not found + :raises: ValueError if name is not provided + """ + if not name: + raise ValueError("Attribute name is required.") + + attr = cls.get(ldb, ldap_display_name=name) + if attr is None: + raise LookupError(f"Could not locate {name} in attribute schema.") + + return attr diff --git a/python/samba/netcmd/domain/models/site.py b/python/samba/netcmd/domain/models/site.py new file mode 100644 index 0000000..44643f3 --- /dev/null +++ b/python/samba/netcmd/domain/models/site.py @@ -0,0 +1,47 @@ +# Unix SMB/CIFS implementation. +# +# Site model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BooleanField, DnField, IntegerField +from .model import Model + + +class Site(Model): + show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly") + system_flags = IntegerField("systemFlags", readonly=True) + + # Backlinks + site_object_bl = DnField("siteObjectBL", readonly=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the Site model. + + :param ldb: Ldb connection + :return: Dn to use for new objects + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child("CN=Sites") + return base_dn + + @staticmethod + def get_object_class(): + return "site" diff --git a/python/samba/netcmd/domain/models/subnet.py b/python/samba/netcmd/domain/models/subnet.py new file mode 100644 index 0000000..bb249d4 --- /dev/null +++ b/python/samba/netcmd/domain/models/subnet.py @@ -0,0 +1,45 @@ +# Unix SMB/CIFS implementation. +# +# Subnet model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BooleanField, DnField, IntegerField +from .model import Model + + +class Subnet(Model): + show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly") + site_object = DnField("siteObject") + system_flags = IntegerField("systemFlags", readonly=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the Subnet model. + + :param ldb: Ldb connection + :return: Dn to use for new objects + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child("CN=Subnets,CN=Sites") + return base_dn + + @staticmethod + def get_object_class(): + return "subnet" diff --git a/python/samba/netcmd/domain/models/user.py b/python/samba/netcmd/domain/models/user.py new file mode 100644 index 0000000..7b0785a --- /dev/null +++ b/python/samba/netcmd/domain/models/user.py @@ -0,0 +1,75 @@ +# Unix SMB/CIFS implementation. +# +# User model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from ldb import Dn + +from samba.dsdb import DS_GUID_USERS_CONTAINER + +from .fields import DnField, SIDField, StringField +from .model import Model + + +class User(Model): + username = StringField("sAMAccountName") + assigned_policy = DnField("msDS-AssignedAuthNPolicy") + assigned_silo = DnField("msDS-AssignedAuthNPolicySilo") + object_sid = SIDField("objectSid") + + def __str__(self): + """Return username rather than cn for User model.""" + return self.username + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the User model. + + :param ldb: Ldb connection + :return: Dn to use for new objects + """ + return ldb.get_wellknown_dn(ldb.get_default_basedn(), + DS_GUID_USERS_CONTAINER) + + @classmethod + def get_search_dn(cls, ldb): + """Return Dn used for searching so Computers will also be found. + + :param ldb: Ldb connection + :return: Dn to use for searching + """ + return ldb.get_root_basedn() + + @staticmethod + def get_object_class(): + return "user" + + @classmethod + def find(cls, ldb, name): + """Helper function to find a user first by Dn then username. + + If the Dn can't be parsed, use sAMAccountName instead. + """ + try: + query = {"dn": Dn(ldb, name)} + except ValueError: + query = {"username": name} + + return cls.get(ldb, **query) diff --git a/python/samba/netcmd/domain/models/value_type.py b/python/samba/netcmd/domain/models/value_type.py new file mode 100644 index 0000000..00a4e07 --- /dev/null +++ b/python/samba/netcmd/domain/models/value_type.py @@ -0,0 +1,96 @@ +# Unix SMB/CIFS implementation. +# +# Claim value type model. +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde <rob@catalyst.net.nz> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +from .fields import BooleanField, DnField, IntegerField, StringField +from .model import Model + +# LDAP Syntax to Value Type CN lookup table. +# These are the lookups used by known AD attributes, add new ones as required. +SYNTAX_TO_VALUE_TYPE_CN = { + "2.5.5.1": "MS-DS-Text", # Object(DS-DN) + "2.5.5.2": "MS-DS-Text", # String(Object-Identifier) + "2.5.5.8": "MS-DS-YesNo", # Boolean + "2.5.5.9": "MS-DS-Number", # Integer + "2.5.5.12": "MS-DS-Text", # String(Unicode) + "2.5.5.15": "MS-DS-Text", # String(NT-Sec-Desc) + "2.5.5.16": "MS-DS-Number", # LargeInteger +} + + +class ValueType(Model): + description = StringField("description") + display_name = StringField("displayName") + claim_is_single_valued = BooleanField("msDS-ClaimIsSingleValued") + claim_is_value_space_restricted = BooleanField( + "msDS-ClaimIsValueSpaceRestricted") + claim_value_type = IntegerField("msDS-ClaimValueType") + is_possible_values_present = BooleanField("msDS-IsPossibleValuesPresent") + show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly") + + # Backlinks + value_type_reference_bl = DnField( + "msDS-ValueTypeReferenceBL", readonly=True) + + @staticmethod + def get_base_dn(ldb): + """Return the base DN for the ValueType model. + + :param ldb: Ldb connection + :return: Dn object of container + """ + base_dn = ldb.get_config_basedn() + base_dn.add_child("CN=Value Types,CN=Claims Configuration,CN=Services") + return base_dn + + @staticmethod + def get_object_class(): + return "msDS-ValueType" + + @classmethod + def lookup(cls, ldb, attribute): + """Helper function to get ValueType by attribute or raise LookupError. + + :param ldb: Ldb connection + :param attribute: AttributeSchema object + :raises: LookupError if not found + :raises: ValueError for unknown attribute syntax + """ + # If attribute is None. + if not attribute: + raise ValueError("Attribute is required for value type lookup.") + + # Unknown attribute syntax as it isn't in the lookup table. + syntax = attribute.attribute_syntax + cn = SYNTAX_TO_VALUE_TYPE_CN.get(syntax) + if not cn: + raise ValueError(f"Unable to process attribute syntax {syntax}") + + # This should always return something but should still be handled. + value_type = cls.get(ldb, cn=cn) + if value_type is None: + raise LookupError( + f"Could not find claim value type for {attribute}.") + + return value_type + + def __str__(self): + return str(self.display_name) |