summaryrefslogtreecommitdiffstats
path: root/python/samba/netcmd/domain/models
diff options
context:
space:
mode:
Diffstat (limited to 'python/samba/netcmd/domain/models')
-rw-r--r--python/samba/netcmd/domain/models/__init__.py32
-rw-r--r--python/samba/netcmd/domain/models/auth_policy.py109
-rw-r--r--python/samba/netcmd/domain/models/auth_silo.py104
-rw-r--r--python/samba/netcmd/domain/models/claim_type.py58
-rw-r--r--python/samba/netcmd/domain/models/exceptions.py64
-rw-r--r--python/samba/netcmd/domain/models/fields.py507
-rw-r--r--python/samba/netcmd/domain/models/group.py42
-rw-r--r--python/samba/netcmd/domain/models/model.py426
-rw-r--r--python/samba/netcmd/domain/models/query.py81
-rw-r--r--python/samba/netcmd/domain/models/schema.py124
-rw-r--r--python/samba/netcmd/domain/models/site.py47
-rw-r--r--python/samba/netcmd/domain/models/subnet.py45
-rw-r--r--python/samba/netcmd/domain/models/user.py75
-rw-r--r--python/samba/netcmd/domain/models/value_type.py96
14 files changed, 1810 insertions, 0 deletions
diff --git a/python/samba/netcmd/domain/models/__init__.py b/python/samba/netcmd/domain/models/__init__.py
new file mode 100644
index 0000000..8a6b254
--- /dev/null
+++ b/python/samba/netcmd/domain/models/__init__.py
@@ -0,0 +1,32 @@
+# Unix SMB/CIFS implementation.
+#
+# Samba domain models.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .auth_policy import AuthenticationPolicy
+from .auth_silo import AuthenticationSilo
+from .claim_type import ClaimType
+from .group import Group
+from .model import MODELS
+from .schema import AttributeSchema, ClassSchema
+from .site import Site
+from .subnet import Subnet
+from .user import User
+from .value_type import ValueType
diff --git a/python/samba/netcmd/domain/models/auth_policy.py b/python/samba/netcmd/domain/models/auth_policy.py
new file mode 100644
index 0000000..c56966c
--- /dev/null
+++ b/python/samba/netcmd/domain/models/auth_policy.py
@@ -0,0 +1,109 @@
+# Unix SMB/CIFS implementation.
+#
+# Authentication policy model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from enum import IntEnum
+from ldb import Dn
+
+from .fields import (BooleanField, EnumField, IntegerField, SDDLField,
+ StringField)
+from .model import Model
+
+# Ticket-Granting-Ticket lifetimes.
+MIN_TGT_LIFETIME = 45
+MAX_TGT_LIFETIME = 2147483647
+
+
+class StrongNTLMPolicy(IntEnum):
+ DISABLED = 0
+ OPTIONAL = 1
+ REQUIRED = 2
+
+ @classmethod
+ def get_choices(cls):
+ return sorted([choice.capitalize() for choice in cls._member_names_])
+
+ @classmethod
+ def choices_str(cls):
+ return ", ".join(cls.get_choices())
+
+
+class AuthenticationPolicy(Model):
+ description = StringField("description")
+ enforced = BooleanField("msDS-AuthNPolicyEnforced")
+ strong_ntlm_policy = EnumField("msDS-StrongNTLMPolicy", StrongNTLMPolicy)
+ user_allow_ntlm_network_auth = BooleanField(
+ "msDS-UserAllowedNTLMNetworkAuthentication")
+ user_tgt_lifetime = IntegerField("msDS-UserTGTLifetime")
+ service_allow_ntlm_network_auth = BooleanField(
+ "msDS-ServiceAllowedNTLMNetworkAuthentication")
+ service_tgt_lifetime = IntegerField("msDS-ServiceTGTLifetime")
+ computer_tgt_lifetime = IntegerField("msDS-ComputerTGTLifetime")
+ user_allowed_to_authenticate_from = SDDLField(
+ "msDS-UserAllowedToAuthenticateFrom", allow_device_in_sddl=False)
+ user_allowed_to_authenticate_to = SDDLField(
+ "msDS-UserAllowedToAuthenticateTo")
+ service_allowed_to_authenticate_from = SDDLField(
+ "msDS-ServiceAllowedToAuthenticateFrom", allow_device_in_sddl=False)
+ service_allowed_to_authenticate_to = SDDLField(
+ "msDS-ServiceAllowedToAuthenticateTo")
+ computer_allowed_to_authenticate_to = SDDLField(
+ "msDS-ComputerAllowedToAuthenticateTo")
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the AuthenticationPolicy model.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child(
+ "CN=AuthN Policies,CN=AuthN Policy Configuration,CN=Services")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "msDS-AuthNPolicy"
+
+ @staticmethod
+ def lookup(ldb, name):
+ """Helper function to return auth policy or raise LookupError.
+
+ :param ldb: Ldb connection
+ :param name: Either DN or name of Authentication Policy
+ :raises: LookupError if not found
+ :raises: ValueError if name is not set
+ """
+ if not name:
+ raise ValueError("Attribute 'name' is required.")
+
+ try:
+ # It's possible name is already a Dn.
+ dn = name if isinstance(name, Dn) else Dn(ldb, name)
+ policy = AuthenticationPolicy.get(ldb, dn=dn)
+ except ValueError:
+ policy = AuthenticationPolicy.get(ldb, cn=name)
+
+ if policy is None:
+ raise LookupError(f"Authentication policy {name} not found.")
+
+ return policy
diff --git a/python/samba/netcmd/domain/models/auth_silo.py b/python/samba/netcmd/domain/models/auth_silo.py
new file mode 100644
index 0000000..9747671
--- /dev/null
+++ b/python/samba/netcmd/domain/models/auth_silo.py
@@ -0,0 +1,104 @@
+# Unix SMB/CIFS implementation.
+#
+# Authentication silo model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, LdbError, Message, MessageElement
+
+from samba.sd_utils import escaped_claim_id
+
+from .exceptions import GrantMemberError, RevokeMemberError
+from .fields import DnField, BooleanField, StringField
+from .model import Model
+
+
+class AuthenticationSilo(Model):
+ description = StringField("description")
+ enforced = BooleanField("msDS-AuthNPolicySiloEnforced")
+ user_authentication_policy = DnField("msDS-UserAuthNPolicy")
+ service_authentication_policy = DnField("msDS-ServiceAuthNPolicy")
+ computer_authentication_policy = DnField("msDS-ComputerAuthNPolicy")
+ members = DnField("msDS-AuthNPolicySiloMembers", many=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the AuthenticationSilo model.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child(
+ "CN=AuthN Silos,CN=AuthN Policy Configuration,CN=Services")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "msDS-AuthNPolicySilo"
+
+ def grant(self, ldb, member):
+ """Grant a member access to the Authentication Silo.
+
+ Rather than saving the silo object and writing the entire member
+ list out again, just add one member only.
+
+ :param ldb: Ldb connection
+ :param member: Member to grant access to silo
+ """
+ # Create a message with only an add member operation.
+ message = Message(dn=self.dn)
+ message.add(MessageElement(str(member.dn), FLAG_MOD_ADD,
+ "msDS-AuthNPolicySiloMembers"))
+
+ # Update authentication silo.
+ try:
+ ldb.modify(message)
+ except LdbError as e:
+ raise GrantMemberError(f"Failed to grant access to silo member: {e}")
+
+ # If the modify operation was successful refresh members field.
+ self.refresh(ldb, fields=["members"])
+
+ def revoke(self, ldb, member):
+ """Revoke a member from the Authentication Silo.
+
+ Rather than saving the silo object and writing the entire member
+ list out again, just remove one member only.
+
+ :param ldb: Ldb connection
+ :param member: Member to revoke from silo
+ """
+ # Create a message with only a remove member operation.
+ message = Message(dn=self.dn)
+ message.add(MessageElement(str(member.dn), FLAG_MOD_DELETE,
+ "msDS-AuthNPolicySiloMembers"))
+
+ # Update authentication silo.
+ try:
+ ldb.modify(message)
+ except LdbError as e:
+ raise RevokeMemberError(f"Failed to revoke silo member: {e}")
+
+ # If the modify operation was successful refresh members field.
+ self.refresh(ldb, fields=["members"])
+
+ def get_authentication_sddl(self):
+ return ('O:SYG:SYD:(XA;OICI;CR;;;WD;(@USER.ad://ext/'
+ f'AuthenticationSilo == "{escaped_claim_id(self.name)}"))')
diff --git a/python/samba/netcmd/domain/models/claim_type.py b/python/samba/netcmd/domain/models/claim_type.py
new file mode 100644
index 0000000..7e1c816
--- /dev/null
+++ b/python/samba/netcmd/domain/models/claim_type.py
@@ -0,0 +1,58 @@
+# Unix SMB/CIFS implementation.
+#
+# Claim type model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BooleanField, DnField, IntegerField,\
+ PossibleClaimValuesField, StringField
+from .model import Model
+
+
+class ClaimType(Model):
+ enabled = BooleanField("Enabled")
+ description = StringField("description")
+ display_name = StringField("displayName")
+ claim_attribute_source = DnField("msDS-ClaimAttributeSource")
+ claim_is_single_valued = BooleanField("msDS-ClaimIsSingleValued")
+ claim_is_value_space_restricted = BooleanField(
+ "msDS-ClaimIsValueSpaceRestricted")
+ claim_possible_values = PossibleClaimValuesField("msDS-ClaimPossibleValues")
+ claim_source_type = StringField("msDS-ClaimSourceType")
+ claim_type_applies_to_class = DnField(
+ "msDS-ClaimTypeAppliesToClass", many=True)
+ claim_value_type = IntegerField("msDS-ClaimValueType")
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the ClaimType model.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child("CN=Claim Types,CN=Claims Configuration,CN=Services")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "msDS-ClaimType"
+
+ def __str__(self):
+ return str(self.display_name)
diff --git a/python/samba/netcmd/domain/models/exceptions.py b/python/samba/netcmd/domain/models/exceptions.py
new file mode 100644
index 0000000..14ebd77
--- /dev/null
+++ b/python/samba/netcmd/domain/models/exceptions.py
@@ -0,0 +1,64 @@
+# Unix SMB/CIFS implementation.
+#
+# Model and ORM exceptions.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+class ModelError(Exception):
+ pass
+
+
+class FieldError(ModelError):
+ """A ModelError on a specific field."""
+
+ def __init__(self, *args, field=None):
+ self.field = field
+ super().__init__(*args)
+
+ def __str__(self):
+ message = super().__str__()
+ return f"{self.field.name}: {message}"
+
+
+class MultipleObjectsReturned(ModelError):
+ pass
+
+
+class DoesNotExist(ModelError):
+ pass
+
+
+class GrantMemberError(ModelError):
+ pass
+
+
+class RevokeMemberError(ModelError):
+ pass
+
+
+class ProtectError(ModelError):
+ pass
+
+
+class UnprotectError(ModelError):
+ pass
+
+
+class DeleteError(ModelError):
+ pass
diff --git a/python/samba/netcmd/domain/models/fields.py b/python/samba/netcmd/domain/models/fields.py
new file mode 100644
index 0000000..0b7e1eb
--- /dev/null
+++ b/python/samba/netcmd/domain/models/fields.py
@@ -0,0 +1,507 @@
+# Unix SMB/CIFS implementation.
+#
+# Model fields.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from enum import IntEnum
+
+import io
+from abc import ABCMeta, abstractmethod
+from datetime import datetime
+from xml.etree import ElementTree
+
+from ldb import Dn, MessageElement, string_to_time, timestring
+from samba.dcerpc import security
+from samba.dcerpc.misc import GUID
+from samba.ndr import ndr_pack, ndr_unpack
+
+
+class Field(metaclass=ABCMeta):
+ """Base class for all fields.
+
+ Each field will need to implement from_db_value and to_db_value.
+
+ A field must correctly support converting both single valued fields,
+ and list type fields.
+
+ The only thing many=True does is say the field "prefers" to be a list,
+ but really any field can be a list or single value.
+ """
+
+ def __init__(self, name, many=False, default=None, hidden=False,
+ readonly=False):
+ """Creates a new field, should be subclassed.
+
+ :param name: Ldb field name.
+ :param many: If true always convert field to a list when loaded.
+ :param default: Default value or callback method (obj is first argument)
+ :param hidden: If this is True, exclude the field when calling as_dict()
+ :param readonly: If true don't write this value when calling save.
+ """
+ self.name = name
+ self.many = many
+ self.hidden = hidden
+ self.readonly = readonly
+
+ # This ensures that fields with many=True are always lists.
+ # If this is inconsistent anywhere, it isn't so great to use.
+ if self.many and default is None:
+ self.default = []
+ else:
+ self.default = default
+
+ @abstractmethod
+ def from_db_value(self, ldb, value):
+ """Converts value read from the database to Python value.
+
+ :param ldb: Ldb connection
+ :param value: MessageElement value from the database
+ :returns: Parsed value as Python type
+ """
+ pass
+
+ @abstractmethod
+ def to_db_value(self, ldb, value, flags):
+ """Converts value to database value.
+
+ This should return a MessageElement or None, where None means
+ the field will be unset on the next save.
+
+ :param ldb: Ldb connection
+ :param value: Input value from Python field
+ :param flags: MessageElement flags
+ :returns: MessageElement or None
+ """
+ pass
+
+
+class IntegerField(Field):
+ """A simple integer field, can be an int or list of int."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to int or list of int."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [int(item) for item in value]
+ else:
+ return int(value[0])
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert int or list of int to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(item) for item in value], flags, self.name)
+ else:
+ return MessageElement(str(value), flags, self.name)
+
+
+class BinaryField(Field):
+ """Similar to StringField but using bytes instead of str.
+
+ This tends to be quite easy because a MessageElement already uses bytes.
+ """
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to bytes or list of bytes.
+
+ The values on the MessageElement should already be bytes so the
+ cast to bytes() is likely not needed in from_db_value.
+ """
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [bytes(item) for item in value]
+ else:
+ return bytes(value[0])
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert bytes or list of bytes to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [bytes(item) for item in value], flags, self.name)
+ else:
+ return MessageElement(bytes(value), flags, self.name)
+
+
+class StringField(Field):
+ """A simple string field, may contain str or list of str."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to str or list of str."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [str(item) for item in value]
+ else:
+ return str(value)
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert str or list of str to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(item) for item in value], flags, self.name)
+ else:
+ return MessageElement(str(value), flags, self.name)
+
+
+class EnumField(Field):
+ """A field based around Python's Enum type."""
+
+ def __init__(self, name, enum, many=False, default=None):
+ """Create a new EnumField for the given enum class."""
+ self.enum = enum
+ super().__init__(name, many, default)
+
+ def enum_from_value(self, value):
+ """Return Enum instance from value.
+
+ Has a special case for IntEnum as the constructor only accepts int.
+ """
+ if issubclass(self.enum, IntEnum):
+ return self.enum(int(str(value)))
+ else:
+ return self.enum(str(value))
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to enum or list of enum."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [self.enum_from_value(item) for item in value]
+ else:
+ return self.enum_from_value(value)
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert enum or list of enum to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(item.value) for item in value], flags, self.name)
+ else:
+ return MessageElement(str(value.value), flags, self.name)
+
+
+class DateTimeField(Field):
+ """A field for parsing ldb timestamps into Python datetime."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to datetime or list of datetime."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [datetime.fromtimestamp(string_to_time(str(item)))
+ for item in value]
+ else:
+ return datetime.fromtimestamp(string_to_time(str(value)))
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert datetime or list of datetime to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [timestring(int(datetime.timestamp(item))) for item in value],
+ flags, self.name)
+ else:
+ return MessageElement(timestring(int(datetime.timestamp(value))),
+ flags, self.name)
+
+
+class RelatedField(Field):
+ """A field that automatically fetches the related objects.
+
+ Use sparingly, can be a little slow. If in doubt just use DnField instead.
+ """
+
+ def __init__(self, name, model, many=False, default=None):
+ """Create a new RelatedField for the given model."""
+ self.model = model
+ super().__init__(name, many, default)
+
+ def from_db_value(self, ldb, value):
+ """Convert Message element to related object or list of objects.
+
+ Note that fetching related items is not using any sort of lazy
+ loading so use this field sparingly.
+ """
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [self.model.get(ldb, dn=Dn(ldb, str(item))) for item in value]
+ else:
+ return self.model.get(ldb, dn=Dn(ldb, str(value)))
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert related object or list of objects to MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(item.dn) for item in value], flags, self.name)
+ else:
+ return MessageElement(str(value.dn), flags, self.name)
+
+
+class DnField(Field):
+ """A Dn field parses the current field into a Dn object."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement to a Dn object or list of Dn objects."""
+ if value is None:
+ return
+ elif isinstance(value, Dn):
+ return value
+ elif len(value) > 1 or self.many:
+ return [Dn(ldb, str(item)) for item in value]
+ else:
+ return Dn(ldb, str(value))
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert Dn object or list of Dn objects into a MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(item) for item in value], flags, self.name)
+ else:
+ return MessageElement(str(value), flags, self.name)
+
+
+class GUIDField(Field):
+ """A GUID field decodes fields containing binary GUIDs."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement with a GUID into a str or list of str."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [str(ndr_unpack(GUID, item)) for item in value]
+ else:
+ return str(ndr_unpack(GUID, value[0]))
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert str with GUID into MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [ndr_pack(GUID(item)) for item in value], flags, self.name)
+ else:
+ return MessageElement(ndr_pack(GUID(value)), flags, self.name)
+
+
+class SIDField(Field):
+ """A SID field encodes and decodes SID data."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement with a GUID into a str or list of str."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [str(ndr_unpack(security.dom_sid, item)) for item in value]
+ else:
+ return str(ndr_unpack(security.dom_sid, value[0]))
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert str with GUID into MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [ndr_pack(security.dom_sid(item)) for item in value],
+ flags, self.name)
+ else:
+ return MessageElement(ndr_pack(security.dom_sid(value)),
+ flags, self.name)
+
+
+class SDDLField(Field):
+ """A SDDL field encodes and decodes SDDL data."""
+
+ def __init__(self,
+ name,
+ *,
+ many=False,
+ default=None,
+ hidden=False,
+ allow_device_in_sddl=True):
+ """Create a new SDDLField."""
+ self.allow_device_in_sddl = allow_device_in_sddl
+ super().__init__(name, many=many, default=default, hidden=hidden)
+
+ def from_db_value(self, ldb, value):
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [ndr_unpack(security.descriptor, item).as_sddl()
+ for item in value]
+ else:
+ return ndr_unpack(security.descriptor, value[0]).as_sddl()
+
+ def to_db_value(self, ldb, value, flags):
+ domain_sid = security.dom_sid(ldb.get_domain_sid())
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement([ndr_pack(security.descriptor.from_sddl(
+ item,
+ domain_sid,
+ allow_device_in_sddl=self.allow_device_in_sddl))
+ for item in value],
+ flags,
+ self.name)
+ else:
+ return MessageElement(
+ ndr_pack(security.descriptor.from_sddl(
+ value,
+ domain_sid,
+ allow_device_in_sddl=self.allow_device_in_sddl)),
+ flags,
+ self.name
+ )
+
+
+class BooleanField(Field):
+ """A simple boolean field, can be a bool or list of bool."""
+
+ def from_db_value(self, ldb, value):
+ """Convert MessageElement into a bool or list of bool."""
+ if value is None:
+ return
+ elif len(value) > 1 or self.many:
+ return [str(item) == "TRUE" for item in value]
+ else:
+ return str(value) == "TRUE"
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert bool or list of bool into a MessageElement."""
+ if value is None:
+ return
+ elif isinstance(value, list):
+ return MessageElement(
+ [str(bool(item)).upper() for item in value], flags, self.name)
+ else:
+ return MessageElement(str(bool(value)).upper(), flags, self.name)
+
+
+class PossibleClaimValuesField(Field):
+ """Field for parsing possible values XML for claim types.
+
+ This field will be represented by a list of dicts as follows:
+
+ [
+ {"ValueGUID": <GUID>},
+ {"ValueDisplayName: "Display name"},
+ {"ValueDescription: "Optional description or None for no description"},
+ {"Value": <Value>},
+ ]
+
+ Note that the GUID needs to be created client-side when adding entries,
+ leaving it as None then saving it doesn't generate the GUID.
+
+ The field itself just converts the XML to list and vice versa, it doesn't
+ automatically generate GUIDs for entries, this is entirely up to the caller.
+ """
+
+ # Namespaces for PossibleValues xml parsing.
+ NAMESPACE = {
+ "xsd": "http://www.w3.org/2001/XMLSchema",
+ "xsi": "http://www.w3.org/2001/XMLSchema-instance",
+ "": "http://schemas.microsoft.com/2010/08/ActiveDirectory/PossibleValues"
+ }
+
+ def from_db_value(self, ldb, value):
+ """Parse MessageElement with XML to list of dicts."""
+ if value is not None:
+ root = ElementTree.fromstring(str(value))
+ string_list = root.find("StringList", self.NAMESPACE)
+
+ values = []
+ for item in string_list.findall("Item", self.NAMESPACE):
+ values.append({
+ "ValueGUID": item.find("ValueGUID", self.NAMESPACE).text,
+ "ValueDisplayName": item.find("ValueDisplayName",
+ self.NAMESPACE).text,
+ "ValueDescription": item.find("ValueDescription",
+ self.NAMESPACE).text,
+ "Value": item.find("Value", self.NAMESPACE).text,
+ })
+
+ return values
+
+ def to_db_value(self, ldb, value, flags):
+ """Convert list of dicts back to XML as a MessageElement."""
+ if value is None:
+ return
+
+ # Possible values should always be a list of dict, but for consistency
+ # with other fields just wrap a single value into a list and continue.
+ if isinstance(value, list):
+ possible_values = value
+ else:
+ possible_values = [value]
+
+ # No point storing XML of an empty list.
+ # Return None, the field will be unset on the next save.
+ if len(possible_values) == 0:
+ return
+
+ # root node
+ root = ElementTree.Element("PossibleClaimValues")
+ for name, url in self.NAMESPACE.items():
+ if name == "":
+ root.set("xmlns", url)
+ else:
+ root.set(f"xmlns:{name}", url)
+
+ # StringList node
+ string_list = ElementTree.SubElement(root, "StringList")
+
+ # List of values
+ for item_dict in possible_values:
+ item = ElementTree.SubElement(string_list, "Item")
+ item_guid = ElementTree.SubElement(item, "ValueGUID")
+ item_guid.text = item_dict["ValueGUID"]
+ item_name = ElementTree.SubElement(item, "ValueDisplayName")
+ item_name.text = item_dict["ValueDisplayName"]
+ item_desc = ElementTree.SubElement(item, "ValueDescription")
+ item_desc.text = item_dict["ValueDescription"]
+ item_value = ElementTree.SubElement(item, "Value")
+ item_value.text = item_dict["Value"]
+
+ # NOTE: indent was only added in Python 3.9 so can't be used yet.
+ # ElementTree.indent(root, space="\t", level=0)
+
+ out = io.BytesIO()
+ ElementTree.ElementTree(root).write(out,
+ encoding="utf-16",
+ xml_declaration=True,
+ short_empty_elements=False)
+
+ # Back to str as that is what MessageElement needs.
+ return MessageElement(out.getvalue().decode("utf-16"), flags, self.name)
diff --git a/python/samba/netcmd/domain/models/group.py b/python/samba/netcmd/domain/models/group.py
new file mode 100644
index 0000000..9473127
--- /dev/null
+++ b/python/samba/netcmd/domain/models/group.py
@@ -0,0 +1,42 @@
+# Unix SMB/CIFS implementation.
+#
+# Group model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BooleanField, DnField, IntegerField, SIDField, StringField
+from .model import Model
+
+
+class Group(Model):
+ admin_count = IntegerField("adminCount")
+ description = StringField("description")
+ is_critical_system_object = BooleanField("isCriticalSystemObject",
+ default=False, readonly=True)
+ member = DnField("member", many=True)
+ object_sid = SIDField("objectSid")
+ system_flags = IntegerField("systemFlags")
+
+ @staticmethod
+ def get_object_class():
+ return "group"
+
+ def get_authentication_sddl(self):
+ return "O:SYG:SYD:(XA;OICI;CR;;;WD;(Member_of_any {SID(%s)}))" % (
+ self.object_sid)
diff --git a/python/samba/netcmd/domain/models/model.py b/python/samba/netcmd/domain/models/model.py
new file mode 100644
index 0000000..602c6ca
--- /dev/null
+++ b/python/samba/netcmd/domain/models/model.py
@@ -0,0 +1,426 @@
+# Unix SMB/CIFS implementation.
+#
+# Model and basic ORM for the Ldb database.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import inspect
+from abc import ABCMeta, abstractmethod
+
+from ldb import ERR_NO_SUCH_OBJECT, FLAG_MOD_ADD, FLAG_MOD_REPLACE, LdbError,\
+ Message, MessageElement, SCOPE_BASE, SCOPE_SUBTREE, binary_encode
+from samba.sd_utils import SDUtils
+
+from .exceptions import DeleteError, DoesNotExist, FieldError,\
+ ProtectError, UnprotectError
+from .fields import DateTimeField, DnField, Field, GUIDField, IntegerField,\
+ StringField
+from .query import Query
+
+# Keeps track of registered models.
+# This gets populated by the ModelMeta class.
+MODELS = {}
+
+
+class ModelMeta(ABCMeta):
+
+ def __new__(mcls, name, bases, namespace, **kwargs):
+ cls = super().__new__(mcls, name, bases, namespace, **kwargs)
+
+ if cls.__name__ != "Model":
+ cls.fields = dict(inspect.getmembers(cls, lambda f: isinstance(f, Field)))
+ cls.meta = mcls
+ MODELS[name] = cls
+
+ return cls
+
+
+class Model(metaclass=ModelMeta):
+ cn = StringField("cn")
+ distinguished_name = DnField("distinguishedName")
+ dn = DnField("dn")
+ ds_core_propagation_data = DateTimeField("dsCorePropagationData",
+ hidden=True)
+ instance_type = IntegerField("instanceType")
+ name = StringField("name")
+ object_category = DnField("objectCategory")
+ object_class = StringField("objectClass",
+ default=lambda obj: obj.get_object_class())
+ object_guid = GUIDField("objectGUID")
+ usn_changed = IntegerField("uSNChanged", hidden=True)
+ usn_created = IntegerField("uSNCreated", hidden=True)
+ when_changed = DateTimeField("whenChanged", hidden=True)
+ when_created = DateTimeField("whenCreated", hidden=True)
+
+ def __init__(self, **kwargs):
+ """Create a new model instance and optionally populate fields.
+
+ Does not save the object to the database, call .save() for that.
+
+ :param kwargs: Optional input fields to populate object with
+ """
+ # Used by the _apply method, holds the original ldb Message,
+ # which is used by save() to determine what fields changed.
+ self._message = None
+
+ for field_name, field in self.fields.items():
+ if field_name in kwargs:
+ default = kwargs[field_name]
+ elif callable(field.default):
+ default = field.default(self)
+ else:
+ default = field.default
+
+ setattr(self, field_name, default)
+
+ def __repr__(self):
+ """Return object representation for this model."""
+ return f"<{self.__class__.__name__}: {self}>"
+
+ def __str__(self):
+ """Stringify model instance to implement in each model."""
+ return str(self.cn)
+
+ def __eq__(self, other):
+ """Basic object equality check only really checks if the dn matches.
+
+ :param other: The other object to compare with
+ """
+ if other is None:
+ return False
+ else:
+ return self.dn == other.dn
+
+ def __json__(self):
+ """Automatically called by custom JSONEncoder class.
+
+ When turning an object into json any fields of type RelatedField
+ will also end up calling this method.
+ """
+ if self.dn is not None:
+ return str(self.dn)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the container of this model.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for new objects
+ """
+ return ldb.get_default_basedn()
+
+ @classmethod
+ def get_search_dn(cls, ldb):
+ """Return the DN used for querying.
+
+ By default, this just calls get_base_dn, but it is possible to
+ return a different Dn for querying.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for searching
+ """
+ return cls.get_base_dn(ldb)
+
+ @staticmethod
+ @abstractmethod
+ def get_object_class():
+ """Returns the objectClass for this model."""
+ pass
+
+ @classmethod
+ def from_message(cls, ldb, message):
+ """Create a new model instance from the Ldb Message object.
+
+ :param ldb: Ldb connection
+ :param message: Ldb Message object to create instance from
+ """
+ obj = cls()
+ obj._apply(ldb, message)
+ return obj
+
+ def _apply(self, ldb, message):
+ """Internal method to apply Ldb Message to current object.
+
+ :param ldb: Ldb connection
+ :param message: Ldb Message object to apply
+ """
+ # Store the ldb Message so that in save we can see what changed.
+ self._message = message
+
+ for attr, field in self.fields.items():
+ if field.name in message:
+ setattr(self, attr, field.from_db_value(ldb, message[field.name]))
+
+ def refresh(self, ldb, fields=None):
+ """Refresh object from database.
+
+ :param ldb: Ldb connection
+ :param fields: Optional list of field names to refresh
+ """
+ attrs = [self.fields[f].name for f in fields] if fields else None
+
+ # This shouldn't normally happen but in case the object refresh fails.
+ try:
+ res = ldb.search(self.dn, scope=SCOPE_BASE, attrs=attrs)
+ except LdbError as e:
+ if e.args[0] == ERR_NO_SUCH_OBJECT:
+ raise DoesNotExist(f"Refresh failed, object gone: {self.dn}")
+ raise
+
+ self._apply(ldb, res[0])
+
+ def as_dict(self, include_hidden=False):
+ """Returns a dict representation of the model.
+
+ :param include_hidden: Include fields with hidden=True when set
+ :returns: dict representation of model using Ldb field names as keys
+ """
+ obj_dict = {}
+
+ for attr, field in self.fields.items():
+ if not field.hidden or include_hidden:
+ value = getattr(self, attr)
+ if value is not None:
+ obj_dict[field.name] = value
+
+ return obj_dict
+
+ @classmethod
+ def build_expression(cls, **kwargs):
+ """Build LDAP search expression from kwargs.
+
+ :kwargs: fields to use for expression using model field names
+ """
+ # Take a copy, never modify the original if it can be avoided.
+ # Then always add the object_class to the search criteria.
+ criteria = dict(kwargs)
+ criteria["object_class"] = cls.get_object_class()
+
+ # Build search expression.
+ num_fields = len(criteria)
+ expression = "" if num_fields == 1 else "(&"
+
+ for field_name, value in criteria.items():
+ field = cls.fields.get(field_name)
+ if not field:
+ raise ValueError(f"Unknown field '{field_name}'")
+ expression += f"({field.name}={binary_encode(value)})"
+
+ if num_fields > 1:
+ expression += ")"
+
+ return expression
+
+ @classmethod
+ def query(cls, ldb, **kwargs):
+ """Returns a search query for this model.
+
+ :param ldb: Ldb connection
+ :param kwargs: Search criteria as keyword args
+ """
+ base_dn = cls.get_search_dn(ldb)
+
+ # If the container does not exist produce a friendly error message.
+ try:
+ result = ldb.search(base_dn,
+ scope=SCOPE_SUBTREE,
+ expression=cls.build_expression(**kwargs))
+ except LdbError as e:
+ if e.args[0] == ERR_NO_SUCH_OBJECT:
+ raise DoesNotExist(f"Container does not exist: {base_dn}")
+ raise
+
+ return Query(cls, ldb, result)
+
+ @classmethod
+ def get(cls, ldb, **kwargs):
+ """Get one object, must always return one item.
+
+ Either find object by dn=, or any combination of attributes via kwargs.
+ If there are more than one result, MultipleObjectsReturned is raised.
+
+ :param ldb: Ldb connection
+ :param kwargs: Search criteria as keyword args
+ :returns: Model instance or None if not found
+ :raises: MultipleObjects returned if there are more than one results
+ """
+ # If a DN is provided use that to get the object directly.
+ # Otherwise, build a search expression using kwargs provided.
+ dn = kwargs.get("dn")
+
+ if dn:
+ # Handle LDAP error 32 LDAP_NO_SUCH_OBJECT, but raise for the rest.
+ # Return None if the User does not exist.
+ try:
+ res = ldb.search(dn, scope=SCOPE_BASE)
+ except LdbError as e:
+ if e.args[0] == ERR_NO_SUCH_OBJECT:
+ return None
+ else:
+ raise
+
+ return cls.from_message(ldb, res[0])
+ else:
+ return cls.query(ldb, **kwargs).get()
+
+ @classmethod
+ def create(cls, ldb, **kwargs):
+ """Create object constructs object and calls save straight after.
+
+ :param ldb: Ldb connection
+ :param kwargs: Fields to populate object from
+ :returns: object
+ """
+ obj = cls(**kwargs)
+ obj.save(ldb)
+ return obj
+
+ @classmethod
+ def get_or_create(cls, ldb, defaults=None, **kwargs):
+ """Retrieve object and if it doesn't exist create a new instance.
+
+ :param ldb: Ldb connection
+ :param defaults: Attributes only used for create but not search
+ :param kwargs: Attributes used for searching existing object
+ :returns: (object, bool created)
+ """
+ obj = cls.get(ldb, **kwargs)
+ if obj is None:
+ attrs = dict(kwargs)
+ if defaults is not None:
+ attrs.update(defaults)
+ return cls.create(ldb, **attrs), True
+ else:
+ return obj, False
+
+ def save(self, ldb):
+ """Save model to Ldb database.
+
+ The save operation will save all fields excluding fields that
+ return None when calling their `to_db_value` methods.
+
+ The `to_db_value` method can either return a ldb Message object,
+ or None if the field is to be excluded.
+
+ For updates, the existing object is fetched and only fields
+ that are changed are included in the update ldb Message.
+
+ Also for updates, any fields that currently have a value,
+ but are to be set to None will be seen as a delete operation.
+
+ After the save operation the object is refreshed from the server,
+ as often the server will populate some fields.
+
+ :param ldb: Ldb connection
+ """
+ if self.dn is None:
+ dn = self.get_base_dn(ldb)
+ dn.add_child(f"CN={self.cn or self.name}")
+ self.dn = dn
+
+ message = Message(dn=self.dn)
+ for attr, field in self.fields.items():
+ if attr != "dn" and not field.readonly:
+ value = getattr(self, attr)
+ try:
+ db_value = field.to_db_value(ldb, value, FLAG_MOD_ADD)
+ except ValueError as e:
+ raise FieldError(e, field=field)
+
+ # Don't add empty fields.
+ if db_value is not None and len(db_value):
+ message.add(db_value)
+
+ # Create object
+ ldb.add(message)
+
+ # Fetching object refreshes any automatically populated fields.
+ res = ldb.search(dn, scope=SCOPE_BASE)
+ self._apply(ldb, res[0])
+ else:
+ # Existing Message was stored to work out what fields changed.
+ existing_obj = self.from_message(ldb, self._message)
+
+ # Only modify replace or modify fields that have changed.
+ # Any fields that are set to None or an empty list get unset.
+ message = Message(dn=self.dn)
+ for attr, field in self.fields.items():
+ if attr != "dn" and not field.readonly:
+ value = getattr(self, attr)
+ old_value = getattr(existing_obj, attr)
+
+ if value != old_value:
+ try:
+ db_value = field.to_db_value(ldb, value,
+ FLAG_MOD_REPLACE)
+ except ValueError as e:
+ raise FieldError(e, field=field)
+
+ # When a field returns None or empty list, delete attr.
+ if db_value in (None, []):
+ db_value = MessageElement([],
+ FLAG_MOD_REPLACE,
+ field.name)
+ message.add(db_value)
+
+ # Saving nothing only triggers an error.
+ if len(message):
+ ldb.modify(message)
+
+ # Fetching object refreshes any automatically populated fields.
+ self.refresh(ldb)
+
+ def delete(self, ldb):
+ """Delete item from Ldb database.
+
+ If self.dn is None then the object has not yet been saved.
+
+ :param ldb: Ldb connection
+ """
+ if self.dn is None:
+ raise DeleteError("Cannot delete object that doesn't have a dn.")
+
+ try:
+ ldb.delete(self.dn)
+ except LdbError as e:
+ raise DeleteError(f"Delete failed: {e}")
+
+ def protect(self, ldb):
+ """Protect object from accidental deletion.
+
+ :param ldb: Ldb connection
+ """
+ utils = SDUtils(ldb)
+
+ try:
+ utils.dacl_add_ace(self.dn, "(D;;DTSD;;;WD)")
+ except LdbError as e:
+ raise ProtectError(f"Failed to protect object: {e}")
+
+ def unprotect(self, ldb):
+ """Unprotect object from accidental deletion.
+
+ :param ldb: Ldb connection
+ """
+ utils = SDUtils(ldb)
+
+ try:
+ utils.dacl_delete_aces(self.dn, "(D;;DTSD;;;WD)")
+ except LdbError as e:
+ raise UnprotectError(f"Failed to unprotect object: {e}")
diff --git a/python/samba/netcmd/domain/models/query.py b/python/samba/netcmd/domain/models/query.py
new file mode 100644
index 0000000..9cdb650
--- /dev/null
+++ b/python/samba/netcmd/domain/models/query.py
@@ -0,0 +1,81 @@
+# Unix SMB/CIFS implementation.
+#
+# Query class for the ORM to the Ldb database.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import re
+
+from .exceptions import DoesNotExist, MultipleObjectsReturned
+
+RE_SPLIT_CAMELCASE = re.compile(r"[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))")
+
+
+class Query:
+ """Simple Query class used by the `Model.query` method."""
+
+ def __init__(self, model, ldb, result):
+ self.model = model
+ self.ldb = ldb
+ self.result = result
+ self.count = result.count
+ self.name = " ".join(RE_SPLIT_CAMELCASE.findall(model.__name__)).lower()
+
+ def __iter__(self):
+ """Loop over Query class yields Model instances."""
+ for message in self.result:
+ yield self.model.from_message(self.ldb, message)
+
+ def first(self):
+ """Returns the first item in the Query or None for no results."""
+ if self.result.count:
+ return self.model.from_message(self.ldb, self.result[0])
+
+ def last(self):
+ """Returns the last item in the Query or None for no results."""
+ if self.result.count:
+ return self.model.from_message(self.ldb, self.result[-1])
+
+ def get(self):
+ """Returns one item or None if no results were found.
+
+ :returns: Model instance or None if not found.
+ :raises MultipleObjectsReturned: if more than one results were returned
+ """
+ if self.count > 1:
+ raise MultipleObjectsReturned(
+ f"More than one {self.name} objects returned (got {self.count}).")
+ elif self.count:
+ return self.model.from_message(self.ldb, self.result[0])
+
+ def one(self):
+ """Must return EXACTLY one item or raise an exception.
+
+ :returns: Model instance
+ :raises DoesNotExist: if no results were returned
+ :raises MultipleObjectsReturned: if more than one results were returned
+ """
+ if self.count < 1:
+ raise DoesNotExist(
+ f"{self.name.capitalize()} matching query not found")
+ elif self.count > 1:
+ raise MultipleObjectsReturned(
+ f"More than one {self.name} objects returned (got {self.count}).")
+ else:
+ return self.model.from_message(self.ldb, self.result[0])
diff --git a/python/samba/netcmd/domain/models/schema.py b/python/samba/netcmd/domain/models/schema.py
new file mode 100644
index 0000000..59ece05
--- /dev/null
+++ b/python/samba/netcmd/domain/models/schema.py
@@ -0,0 +1,124 @@
+# Unix SMB/CIFS implementation.
+#
+# Class and attribute schema models.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BinaryField, BooleanField, DnField, GUIDField,\
+ IntegerField, StringField
+from .model import Model
+
+
+class ClassSchema(Model):
+ default_object_category = DnField("defaultObjectCategory")
+ governs_id = StringField("governsID")
+ schema_id_guid = GUIDField("schemaIDGUID")
+ subclass_of = StringField("subclassOf")
+ admin_description = StringField("adminDescription")
+ admin_display_name = StringField("adminDisplayName")
+ default_hiding_value = BooleanField("defaultHidingValue")
+ default_security_descriptor = BinaryField("defaultSecurityDescriptor")
+ ldap_display_name = StringField("lDAPDisplayName")
+ may_contain = StringField("mayContain", many=True)
+ poss_superiors = StringField("possSuperiors", many=True)
+ rdn_att_id = StringField("rDNAttID")
+ show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly")
+ system_only = BooleanField("systemOnly", readonly=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the ClassSchema model.
+
+ This is the same as AttributeSchema, but the objectClass is different.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ return ldb.get_schema_basedn()
+
+ @staticmethod
+ def get_object_class():
+ return "classSchema"
+
+ @classmethod
+ def lookup(cls, ldb, name):
+ """Helper function to lookup class or raise LookupError.
+
+ :param ldb: Ldb connection
+ :param name: Class name
+ :raises: LookupError if not found
+ :raises: ValueError if name is not provided
+ """
+ if not name:
+ raise ValueError("Class name is required.")
+
+ attr = cls.get(ldb, ldap_display_name=name)
+ if attr is None:
+ raise LookupError(f"Could not locate {name} in class schema.")
+
+ return attr
+
+
+class AttributeSchema(Model):
+ attribute_id = StringField("attributeID")
+ attribute_syntax = StringField("attributeSyntax")
+ is_single_valued = BooleanField("isSingleValued")
+ ldap_display_name = StringField("lDAPDisplayName")
+ om_syntax = IntegerField("oMSyntax")
+ admin_description = StringField("adminDescription")
+ admin_display_name = StringField("adminDisplayName")
+ attribute_security_guid = GUIDField("attributeSecurityGUID")
+ schema_flags_ex = IntegerField("schemaFlagsEx")
+ search_flags = IntegerField("searchFlags")
+ show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly")
+ system_flags = IntegerField("systemFlags", readonly=True)
+ system_only = BooleanField("systemOnly", readonly=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the AttributeSchema model.
+
+ This is the same as ClassSchema, but the objectClass is different.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ return ldb.get_schema_basedn()
+
+ @staticmethod
+ def get_object_class():
+ return "attributeSchema"
+
+ @classmethod
+ def lookup(cls, ldb, name):
+ """Helper function to lookup attribute or raise LookupError.
+
+ :param ldb: Ldb connection
+ :param name: Attribute name
+ :raises: LookupError if not found
+ :raises: ValueError if name is not provided
+ """
+ if not name:
+ raise ValueError("Attribute name is required.")
+
+ attr = cls.get(ldb, ldap_display_name=name)
+ if attr is None:
+ raise LookupError(f"Could not locate {name} in attribute schema.")
+
+ return attr
diff --git a/python/samba/netcmd/domain/models/site.py b/python/samba/netcmd/domain/models/site.py
new file mode 100644
index 0000000..44643f3
--- /dev/null
+++ b/python/samba/netcmd/domain/models/site.py
@@ -0,0 +1,47 @@
+# Unix SMB/CIFS implementation.
+#
+# Site model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BooleanField, DnField, IntegerField
+from .model import Model
+
+
+class Site(Model):
+ show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly")
+ system_flags = IntegerField("systemFlags", readonly=True)
+
+ # Backlinks
+ site_object_bl = DnField("siteObjectBL", readonly=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the Site model.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for new objects
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child("CN=Sites")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "site"
diff --git a/python/samba/netcmd/domain/models/subnet.py b/python/samba/netcmd/domain/models/subnet.py
new file mode 100644
index 0000000..bb249d4
--- /dev/null
+++ b/python/samba/netcmd/domain/models/subnet.py
@@ -0,0 +1,45 @@
+# Unix SMB/CIFS implementation.
+#
+# Subnet model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BooleanField, DnField, IntegerField
+from .model import Model
+
+
+class Subnet(Model):
+ show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly")
+ site_object = DnField("siteObject")
+ system_flags = IntegerField("systemFlags", readonly=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the Subnet model.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for new objects
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child("CN=Subnets,CN=Sites")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "subnet"
diff --git a/python/samba/netcmd/domain/models/user.py b/python/samba/netcmd/domain/models/user.py
new file mode 100644
index 0000000..7b0785a
--- /dev/null
+++ b/python/samba/netcmd/domain/models/user.py
@@ -0,0 +1,75 @@
+# Unix SMB/CIFS implementation.
+#
+# User model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from ldb import Dn
+
+from samba.dsdb import DS_GUID_USERS_CONTAINER
+
+from .fields import DnField, SIDField, StringField
+from .model import Model
+
+
+class User(Model):
+ username = StringField("sAMAccountName")
+ assigned_policy = DnField("msDS-AssignedAuthNPolicy")
+ assigned_silo = DnField("msDS-AssignedAuthNPolicySilo")
+ object_sid = SIDField("objectSid")
+
+ def __str__(self):
+ """Return username rather than cn for User model."""
+ return self.username
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the User model.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for new objects
+ """
+ return ldb.get_wellknown_dn(ldb.get_default_basedn(),
+ DS_GUID_USERS_CONTAINER)
+
+ @classmethod
+ def get_search_dn(cls, ldb):
+ """Return Dn used for searching so Computers will also be found.
+
+ :param ldb: Ldb connection
+ :return: Dn to use for searching
+ """
+ return ldb.get_root_basedn()
+
+ @staticmethod
+ def get_object_class():
+ return "user"
+
+ @classmethod
+ def find(cls, ldb, name):
+ """Helper function to find a user first by Dn then username.
+
+ If the Dn can't be parsed, use sAMAccountName instead.
+ """
+ try:
+ query = {"dn": Dn(ldb, name)}
+ except ValueError:
+ query = {"username": name}
+
+ return cls.get(ldb, **query)
diff --git a/python/samba/netcmd/domain/models/value_type.py b/python/samba/netcmd/domain/models/value_type.py
new file mode 100644
index 0000000..00a4e07
--- /dev/null
+++ b/python/samba/netcmd/domain/models/value_type.py
@@ -0,0 +1,96 @@
+# Unix SMB/CIFS implementation.
+#
+# Claim value type model.
+#
+# Copyright (C) Catalyst.Net Ltd. 2023
+#
+# Written by Rob van der Linde <rob@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from .fields import BooleanField, DnField, IntegerField, StringField
+from .model import Model
+
+# LDAP Syntax to Value Type CN lookup table.
+# These are the lookups used by known AD attributes, add new ones as required.
+SYNTAX_TO_VALUE_TYPE_CN = {
+ "2.5.5.1": "MS-DS-Text", # Object(DS-DN)
+ "2.5.5.2": "MS-DS-Text", # String(Object-Identifier)
+ "2.5.5.8": "MS-DS-YesNo", # Boolean
+ "2.5.5.9": "MS-DS-Number", # Integer
+ "2.5.5.12": "MS-DS-Text", # String(Unicode)
+ "2.5.5.15": "MS-DS-Text", # String(NT-Sec-Desc)
+ "2.5.5.16": "MS-DS-Number", # LargeInteger
+}
+
+
+class ValueType(Model):
+ description = StringField("description")
+ display_name = StringField("displayName")
+ claim_is_single_valued = BooleanField("msDS-ClaimIsSingleValued")
+ claim_is_value_space_restricted = BooleanField(
+ "msDS-ClaimIsValueSpaceRestricted")
+ claim_value_type = IntegerField("msDS-ClaimValueType")
+ is_possible_values_present = BooleanField("msDS-IsPossibleValuesPresent")
+ show_in_advanced_view_only = BooleanField("showInAdvancedViewOnly")
+
+ # Backlinks
+ value_type_reference_bl = DnField(
+ "msDS-ValueTypeReferenceBL", readonly=True)
+
+ @staticmethod
+ def get_base_dn(ldb):
+ """Return the base DN for the ValueType model.
+
+ :param ldb: Ldb connection
+ :return: Dn object of container
+ """
+ base_dn = ldb.get_config_basedn()
+ base_dn.add_child("CN=Value Types,CN=Claims Configuration,CN=Services")
+ return base_dn
+
+ @staticmethod
+ def get_object_class():
+ return "msDS-ValueType"
+
+ @classmethod
+ def lookup(cls, ldb, attribute):
+ """Helper function to get ValueType by attribute or raise LookupError.
+
+ :param ldb: Ldb connection
+ :param attribute: AttributeSchema object
+ :raises: LookupError if not found
+ :raises: ValueError for unknown attribute syntax
+ """
+ # If attribute is None.
+ if not attribute:
+ raise ValueError("Attribute is required for value type lookup.")
+
+ # Unknown attribute syntax as it isn't in the lookup table.
+ syntax = attribute.attribute_syntax
+ cn = SYNTAX_TO_VALUE_TYPE_CN.get(syntax)
+ if not cn:
+ raise ValueError(f"Unable to process attribute syntax {syntax}")
+
+ # This should always return something but should still be handled.
+ value_type = cls.get(ldb, cn=cn)
+ if value_type is None:
+ raise LookupError(
+ f"Could not find claim value type for {attribute}.")
+
+ return value_type
+
+ def __str__(self):
+ return str(self.display_name)