diff options
Diffstat (limited to 'src/VBox/ValidationKit/testmanager/core/base.py')
-rwxr-xr-x | src/VBox/ValidationKit/testmanager/core/base.py | 1514 |
1 files changed, 1514 insertions, 0 deletions
diff --git a/src/VBox/ValidationKit/testmanager/core/base.py b/src/VBox/ValidationKit/testmanager/core/base.py new file mode 100755 index 00000000..eac3d921 --- /dev/null +++ b/src/VBox/ValidationKit/testmanager/core/base.py @@ -0,0 +1,1514 @@ +# -*- coding: utf-8 -*- +# $Id: base.py $ +# pylint: disable=too-many-lines + +""" +Test Manager Core - Base Class(es). +""" + +__copyright__ = \ +""" +Copyright (C) 2012-2023 Oracle and/or its affiliates. + +This file is part of VirtualBox base platform packages, as +available from https://www.virtualbox.org. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation, in version 3 of the +License. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, see <https://www.gnu.org/licenses>. + +The contents of this file may alternatively be used under the terms +of the Common Development and Distribution License Version 1.0 +(CDDL), a copy of it is provided in the "COPYING.CDDL" file included +in the VirtualBox distribution, in which case the provisions of the +CDDL are applicable instead of those of the GPL. + +You may elect to license modified versions of this file under the +terms and conditions of either the GPL or the CDDL or both. + +SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0 +""" +__version__ = "$Revision: 155244 $" + + +# Standard python imports. +import copy; +import datetime; +import json; +import re; +import socket; +import sys; +import uuid; +import unittest; + +# Validation Kit imports. +from common import utils; + +# Python 3 hacks: +if sys.version_info[0] >= 3: + long = int # pylint: disable=redefined-builtin,invalid-name + + +class TMExceptionBase(Exception): + """ + For exceptions raised by any TestManager component. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMTooManyRows(TMExceptionBase): + """ + Too many rows in the result. + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMRowNotFound(TMExceptionBase): + """ + Database row not found. + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMRowAlreadyExists(TMExceptionBase): + """ + Database row already exists (typically raised by addEntry). + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMInvalidData(TMExceptionBase): + """ + Data validation failed. + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMRowInUse(TMExceptionBase): + """ + Database row is in use and cannot be deleted. + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class TMInFligthCollision(TMExceptionBase): + """ + Database update failed because someone else had already made changes to + the data there. + Used by ModelLogicBase decendants. + """ + pass; # pylint: disable=unnecessary-pass + + +class ModelBase(object): # pylint: disable=too-few-public-methods + """ + Something all classes in the logical model inherits from. + + Not sure if 'logical model' is the right term here. + Will see if it has any purpose later on... + """ + + def __init__(self): + pass; + + +class ModelDataBase(ModelBase): # pylint: disable=too-few-public-methods + """ + Something all classes in the data classes in the logical model inherits from. + """ + + ## Child classes can use this to list array attributes which should use + # an empty array ([]) instead of None as database NULL value. + kasAltArrayNull = []; + + ## validate + ## @{ + ksValidateFor_Add = 'add'; + ksValidateFor_AddForeignId = 'add-foreign-id'; + ksValidateFor_Edit = 'edit'; + ksValidateFor_Other = 'other'; + ## @} + + + ## List of internal attributes which should be ignored by + ## getDataAttributes and related machinery + kasInternalAttributes = []; + + def __init__(self): + ModelBase.__init__(self); + + + # + # Standard methods implemented by combining python magic and hungarian prefixes. + # + + def getDataAttributes(self): + """ + Returns a list of data attributes. + """ + asRet = []; + asAttrs = dir(self); + for sAttr in asAttrs: + if sAttr[0] == '_' or sAttr[0] == 'k': + continue; + if sAttr in self.kasInternalAttributes: + continue; + oValue = getattr(self, sAttr); + if callable(oValue): + continue; + asRet.append(sAttr); + return asRet; + + def initFromOther(self, oOther): + """ + Initialize this object with the values from another instance (child + class instance is accepted). + + This serves as a kind of copy constructor. + + Returns self. May raise exception if the type of other object differs + or is damaged. + """ + for sAttr in self.getDataAttributes(): + setattr(self, sAttr, getattr(oOther, sAttr)); + return self; + + @staticmethod + def getHungarianPrefix(sName): + """ + Returns the hungarian prefix of the given name. + """ + for i, _ in enumerate(sName): + if sName[i] not in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', + 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']: + assert re.search('^[A-Z][a-zA-Z0-9]*$', sName[i:]) is not None; + return sName[:i]; + return sName; + + def getAttributeParamNullValues(self, sAttr): + """ + Returns a list of parameter NULL values, with the preferred one being + the first element. + + Child classes can override this to handle one or more attributes specially. + """ + sPrefix = self.getHungarianPrefix(sAttr); + if sPrefix in ['id', 'uid', 'i', 'off', 'pct']: + return [-1, '', '-1',]; + if sPrefix in ['l', 'c',]: + return [long(-1), '', '-1',]; + if sPrefix == 'f': + return ['',]; + if sPrefix in ['enm', 'ip', 's', 'ts', 'uuid']: + return ['',]; + if sPrefix in ['ai', 'aid', 'al', 'as']: + return [[], '', None]; ## @todo ?? + if sPrefix == 'bm': + return ['', [],]; ## @todo bitmaps. + raise TMExceptionBase('Unable to classify "%s" (prefix %s)' % (sAttr, sPrefix)); + + def isAttributeNull(self, sAttr, oValue): + """ + Checks if the specified attribute value indicates NULL. + Return True/False. + + Note! This isn't entirely kosher actually. + """ + if oValue is None: + return True; + aoNilValues = self.getAttributeParamNullValues(sAttr); + return oValue in aoNilValues; + + def _convertAttributeFromParamNull(self, sAttr, oValue): + """ + Converts an attribute from parameter NULL to database NULL value. + Returns the new attribute value. + """ + aoNullValues = self.getAttributeParamNullValues(sAttr); + if oValue in aoNullValues: + oValue = None if sAttr not in self.kasAltArrayNull else []; + # + # Perform deep conversion on ModelDataBase object and lists of them. + # + elif isinstance(oValue, list) and oValue and isinstance(oValue[0], ModelDataBase): + oValue = copy.copy(oValue); + for i, _ in enumerate(oValue): + assert isinstance(oValue[i], ModelDataBase); + oValue[i] = copy.copy(oValue[i]); + oValue[i].convertFromParamNull(); + + elif isinstance(oValue, ModelDataBase): + oValue = copy.copy(oValue); + oValue.convertFromParamNull(); + + return oValue; + + def convertFromParamNull(self): + """ + Converts from parameter NULL values to database NULL values (None). + Returns self. + """ + for sAttr in self.getDataAttributes(): + oValue = getattr(self, sAttr); + oNewValue = self._convertAttributeFromParamNull(sAttr, oValue); + if oValue != oNewValue: + setattr(self, sAttr, oNewValue); + return self; + + def _convertAttributeToParamNull(self, sAttr, oValue): + """ + Converts an attribute from database NULL to a sepcial value we can pass + thru parameter list. + Returns the new attribute value. + """ + if oValue is None: + oValue = self.getAttributeParamNullValues(sAttr)[0]; + # + # Perform deep conversion on ModelDataBase object and lists of them. + # + elif isinstance(oValue, list) and oValue and isinstance(oValue[0], ModelDataBase): + oValue = copy.copy(oValue); + for i, _ in enumerate(oValue): + assert isinstance(oValue[i], ModelDataBase); + oValue[i] = copy.copy(oValue[i]); + oValue[i].convertToParamNull(); + + elif isinstance(oValue, ModelDataBase): + oValue = copy.copy(oValue); + oValue.convertToParamNull(); + + return oValue; + + def convertToParamNull(self): + """ + Converts from database NULL values (None) to special values we can + pass thru parameters list. + Returns self. + """ + for sAttr in self.getDataAttributes(): + oValue = getattr(self, sAttr); + oNewValue = self._convertAttributeToParamNull(sAttr, oValue); + if oValue != oNewValue: + setattr(self, sAttr, oNewValue); + return self; + + def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb): + """ + Validates and convert one attribute. + Returns the converted value. + + Child classes can override this to handle one or more attributes specially. + Note! oDb can be None. + """ + sPrefix = self.getHungarianPrefix(sAttr); + + if sPrefix in ['id', 'uid']: + (oNewValue, sError) = self.validateInt( oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + elif sPrefix in ['i', 'off', 'pct']: + (oNewValue, sError) = self.validateInt( oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + iMin = getattr(self, 'kiMin_' + sAttr, 0), + iMax = getattr(self, 'kiMax_' + sAttr, 0x7ffffffe)); + elif sPrefix in ['l', 'c']: + (oNewValue, sError) = self.validateLong(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + lMin = getattr(self, 'klMin_' + sAttr, 0), + lMax = getattr(self, 'klMax_' + sAttr, None)); + elif sPrefix == 'f': + if not oValue and not fAllowNull: oValue = '0'; # HACK ALERT! Checkboxes are only added when checked. + (oNewValue, sError) = self.validateBool(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + elif sPrefix == 'ts': + (oNewValue, sError) = self.validateTs( oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + elif sPrefix == 'ip': + (oNewValue, sError) = self.validateIp( oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + elif sPrefix == 'uuid': + (oNewValue, sError) = self.validateUuid(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + elif sPrefix == 'enm': + (oNewValue, sError) = self.validateWord(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + asValid = getattr(self, 'kasValidValues_' + sAttr)); # The list is required. + elif sPrefix == 's': + (oNewValue, sError) = self.validateStr( oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + cchMin = getattr(self, 'kcchMin_' + sAttr, 0), + cchMax = getattr(self, 'kcchMax_' + sAttr, 4096), + fAllowUnicodeSymbols = getattr(self, 'kfAllowUnicode_' + sAttr, False) ); + ## @todo al. + elif sPrefix == 'aid': + (oNewValue, sError) = self.validateListOfInts(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + iMin = 1, iMax = 0x7ffffffe); + elif sPrefix == 'as': + (oNewValue, sError) = self.validateListOfStr(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + asValidValues = getattr(self, 'kasValidValues_' + sAttr, None), + cchMin = getattr(self, 'kcchMin_' + sAttr, 0 if fAllowNull else 1), + cchMax = getattr(self, 'kcchMax_' + sAttr, 4096)); + + elif sPrefix == 'bm': + ## @todo figure out bitfields. + (oNewValue, sError) = self.validateListOfStr(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull); + else: + raise TMExceptionBase('Unable to classify "%s" (prefix %s)' % (sAttr, sPrefix)); + + _ = sParam; _ = oDb; + return (oNewValue, sError); + + def _validateAndConvertWorker(self, asAllowNullAttributes, oDb, enmValidateFor = ksValidateFor_Other): + """ + Worker for implementing validateAndConvert(). + """ + dErrors = {}; + for sAttr in self.getDataAttributes(): + oValue = getattr(self, sAttr); + sParam = getattr(self, 'ksParam_' + sAttr); + aoNilValues = self.getAttributeParamNullValues(sAttr); + aoNilValues.append(None); + + (oNewValue, sError) = self._validateAndConvertAttribute(sAttr, sParam, oValue, aoNilValues, + sAttr in asAllowNullAttributes, oDb); + if oValue != oNewValue: + setattr(self, sAttr, oNewValue); + if sError is not None: + dErrors[sParam] = sError; + + # Check the NULL requirements of the primary ID(s) for the 'add' and 'edit' actions. + if enmValidateFor in (ModelDataBase.ksValidateFor_Add, + ModelDataBase.ksValidateFor_AddForeignId, + ModelDataBase.ksValidateFor_Edit,): + fMustBeNull = enmValidateFor == ModelDataBase.ksValidateFor_Add; + sAttr = getattr(self, 'ksIdAttr', None); + if sAttr is not None: + oValue = getattr(self, sAttr); + if self.isAttributeNull(sAttr, oValue) != fMustBeNull: + sParam = getattr(self, 'ksParam_' + sAttr); + sErrMsg = 'Must be NULL!' if fMustBeNull else 'Must not be NULL!' + if sParam in dErrors: + dErrors[sParam] += ' ' + sErrMsg; + else: + dErrors[sParam] = sErrMsg; + + return dErrors; + + def validateAndConvert(self, oDb, enmValidateFor = ksValidateFor_Other): + """ + Validates the input and converts valid fields to their right type. + Returns a dictionary with per field reports, only invalid fields will + be returned, so an empty dictionary means that the data is valid. + + The dictionary keys are ksParam_*. + + Child classes can override _validateAndConvertAttribute to handle + selected fields specially. There are also a few class variables that + can be used to advice the validation: kcchMin_sAttr, kcchMax_sAttr, + kiMin_iAttr, kiMax_iAttr, klMin_lAttr, klMax_lAttr, + kasValidValues_enmAttr, and kasAllowNullAttributes. + """ + return self._validateAndConvertWorker(getattr(self, 'kasAllowNullAttributes', []), oDb, + enmValidateFor = enmValidateFor); + + def validateAndConvertEx(self, asAllowNullAttributes, oDb, enmValidateFor = ksValidateFor_Other): + """ + Same as validateAndConvert but with custom allow-null list. + """ + return self._validateAndConvertWorker(asAllowNullAttributes, oDb, enmValidateFor = enmValidateFor); + + def convertParamToAttribute(self, sAttr, sParam, oValue, oDisp, fStrict): + """ + Calculate the attribute value when initialized from a parameter. + + Returns the new value, with parameter NULL values. Raises exception on + invalid parameter value. + + Child classes can override to do special parameter conversion jobs. + """ + sPrefix = self.getHungarianPrefix(sAttr); + asValidValues = getattr(self, 'kasValidValues_' + sAttr, None); + fAllowNull = sAttr in getattr(self, 'kasAllowNullAttributes', []); + if fStrict: + if sPrefix == 'f': + # HACK ALERT! Checkboxes are only present when checked, so we always have to provide a default. + oNewValue = oDisp.getStringParam(sParam, asValidValues, '0'); + elif sPrefix[0] == 'a': + # HACK ALERT! Lists are not present if empty. + oNewValue = oDisp.getListOfStrParams(sParam, []); + else: + oNewValue = oDisp.getStringParam(sParam, asValidValues, None, fAllowNull = fAllowNull); + else: + if sPrefix[0] == 'a': + oNewValue = oDisp.getListOfStrParams(sParam, []); + else: + assert oValue is not None, 'sAttr=%s' % (sAttr,); + oNewValue = oDisp.getStringParam(sParam, asValidValues, oValue, fAllowNull = fAllowNull); + return oNewValue; + + def initFromParams(self, oDisp, fStrict = True): + """ + Initialize the object from parameters. + The input is not validated at all, except that all parameters must be + present when fStrict is True. + + Returns self. Raises exception on invalid parameter value. + + Note! The returned object has parameter NULL values, not database ones! + """ + + self.convertToParamNull() + for sAttr in self.getDataAttributes(): + oValue = getattr(self, sAttr); + oNewValue = self.convertParamToAttribute(sAttr, getattr(self, 'ksParam_' + sAttr), oValue, oDisp, fStrict); + if oNewValue != oValue: + setattr(self, sAttr, oNewValue); + return self; + + def areAttributeValuesEqual(self, sAttr, sPrefix, oValue1, oValue2): + """ + Called to compare two attribute values and python thinks differs. + + Returns True/False. + + Child classes can override this to do special compares of things like arrays. + """ + # Just in case someone uses it directly. + if oValue1 == oValue2: + return True; + + # + # Timestamps can be both string (param) and object (db) + # depending on the data source. Compare string values to make + # sure we're doing the right thing here. + # + if sPrefix == 'ts': + return str(oValue1) == str(oValue2); + + # + # Some generic code handling ModelDataBase children. + # + if isinstance(oValue1, list) and isinstance(oValue2, list): + if len(oValue1) == len(oValue2): + for i, _ in enumerate(oValue1): + if not isinstance(oValue1[i], ModelDataBase) \ + or type(oValue1) is not type(oValue2): + return False; + if not oValue1[i].isEqual(oValue2[i]): + return False; + return True; + + elif isinstance(oValue1, ModelDataBase) \ + and type(oValue1) is type(oValue2): + return oValue1[i].isEqual(oValue2[i]); + + _ = sAttr; + return False; + + def isEqual(self, oOther): + """ Compares two instances. """ + for sAttr in self.getDataAttributes(): + if getattr(self, sAttr) != getattr(oOther, sAttr): + # Delegate the final decision to an overridable method. + if not self.areAttributeValuesEqual(sAttr, self.getHungarianPrefix(sAttr), + getattr(self, sAttr), getattr(oOther, sAttr)): + return False; + return True; + + def isEqualEx(self, oOther, asExcludeAttrs): + """ Compares two instances, omitting the given attributes. """ + for sAttr in self.getDataAttributes(): + if sAttr not in asExcludeAttrs \ + and getattr(self, sAttr) != getattr(oOther, sAttr): + # Delegate the final decision to an overridable method. + if not self.areAttributeValuesEqual(sAttr, self.getHungarianPrefix(sAttr), + getattr(self, sAttr), getattr(oOther, sAttr)): + return False; + return True; + + def reinitToNull(self): + """ + Reinitializes the object to (database) NULL values. + Returns self. + """ + for sAttr in self.getDataAttributes(): + setattr(self, sAttr, None); + return self; + + def toString(self): + """ + Stringifies the object. + Returns string representation. + """ + + sMembers = ''; + for sAttr in self.getDataAttributes(): + oValue = getattr(self, sAttr); + sMembers += ', %s=%s' % (sAttr, oValue); + + oClass = type(self); + if sMembers == '': + return '<%s>' % (oClass.__name__); + return '<%s: %s>' % (oClass.__name__, sMembers[2:]); + + def __str__(self): + return self.toString(); + + + + # + # New validation helpers. + # + # These all return (oValue, sError), where sError is None when the value + # is valid and an error message when not. On success and in case of + # range errors, oValue is converted into the requested type. + # + + @staticmethod + def validateInt(sValue, iMin = 0, iMax = 0x7ffffffe, aoNilValues = tuple([-1, None, '']), fAllowNull = True): + """ Validates an integer field. """ + if sValue in aoNilValues: + if fAllowNull: + return (None if sValue is None else aoNilValues[0], None); + return (sValue, 'Mandatory.'); + + try: + if utils.isString(sValue): + iValue = int(sValue, 0); + else: + iValue = int(sValue); + except: + return (sValue, 'Not an integer'); + + if iValue in aoNilValues: + return (aoNilValues[0], None if fAllowNull else 'Mandatory.'); + + if iValue < iMin: + return (iValue, 'Value too small (min %d)' % (iMin,)); + if iValue > iMax: + return (iValue, 'Value too high (max %d)' % (iMax,)); + return (iValue, None); + + @staticmethod + def validateLong(sValue, lMin = 0, lMax = None, aoNilValues = tuple([long(-1), None, '']), fAllowNull = True): + """ Validates an long integer field. """ + if sValue in aoNilValues: + if fAllowNull: + return (None if sValue is None else aoNilValues[0], None); + return (sValue, 'Mandatory.'); + try: + if utils.isString(sValue): + lValue = long(sValue, 0); + else: + lValue = long(sValue); + except: + return (sValue, 'Not a long integer'); + + if lValue in aoNilValues: + return (aoNilValues[0], None if fAllowNull else 'Mandatory.'); + + if lMin is not None and lValue < lMin: + return (lValue, 'Value too small (min %d)' % (lMin,)); + if lMax is not None and lValue > lMax: + return (lValue, 'Value too high (max %d)' % (lMax,)); + return (lValue, None); + + kdTimestampRegex = { + len('2012-10-08 01:54:06'): r'(\d{4})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d)$', + len('2012-10-08 01:54:06.00'): r'(\d{4})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d).\d{2}$', + len('2012-10-08 01:54:06.000'): r'(\d{4})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d).\d{3}$', + len('999999-12-31 00:00:00.00'): r'(\d{6})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d).\d{2}$', + len('9999-12-31 23:59:59.999999'): r'(\d{4})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d).\d{6}$', + len('9999-12-31T23:59:59.999999999'): r'(\d{4})-([01]\d)-([0123]\d)[ Tt]([012]\d):[0-5]\d:([0-6]\d).\d{9}$', + }; + + @staticmethod + def validateTs(sValue, aoNilValues = tuple([None, '']), fAllowNull = True, fRelative = False): + """ Validates a timestamp field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + if not utils.isString(sValue): + return (sValue, None); + + # Validate and strip off the timezone stuff. + if sValue[-1] in 'Zz': + sStripped = sValue[:-1]; + sValue = sStripped + 'Z'; + elif len(sValue) >= 19 + 3: + oRes = re.match(r'^.*[+-](\d\d):(\d\d)$', sValue); + if oRes is not None: + if int(oRes.group(1)) > 12 or int(oRes.group(2)) >= 60: + return (sValue, 'Invalid timezone offset.'); + sStripped = sValue[:-6]; + else: + sStripped = sValue; + else: + sStripped = sValue; + + # Used the stripped value length to find regular expression for validating and parsing the timestamp. + sError = None; + sRegExp = ModelDataBase.kdTimestampRegex.get(len(sStripped), None); + if sRegExp: + oRes = re.match(sRegExp, sStripped); + if oRes is not None: + iYear = int(oRes.group(1)); + if iYear % 4 == 0 and (iYear % 100 != 0 or iYear % 400 == 0): + acDaysOfMonth = [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; + else: + acDaysOfMonth = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; + iMonth = int(oRes.group(2)); + iDay = int(oRes.group(3)); + iHour = int(oRes.group(4)); + iSec = int(oRes.group(5)); + if iMonth > 12 or (iMonth <= 0 and not fRelative): + sError = 'Invalid timestamp month.'; + elif iDay > acDaysOfMonth[iMonth - 1]: + sError = 'Invalid timestamp day-of-month (%02d has %d days).' % (iMonth, acDaysOfMonth[iMonth - 1]); + elif iHour > 23: + sError = 'Invalid timestamp hour.' + elif iSec >= 61: + sError = 'Invalid timestamp second.' + elif iSec >= 60: + sError = 'Invalid timestamp: no leap seconds, please.' + else: + sError = 'Invalid timestamp (validation regexp: %s).' % (sRegExp,); + else: + sError = 'Invalid timestamp length.'; + return (sValue, sError); + + @staticmethod + def validateIp(sValue, aoNilValues = tuple([None, '']), fAllowNull = True): + """ Validates an IP address field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + if sValue == '::1': + return (sValue, None); + + try: + socket.inet_pton(socket.AF_INET, sValue); # pylint: disable=no-member + except: + try: + socket.inet_pton(socket.AF_INET6, sValue); # pylint: disable=no-member + except: + return (sValue, 'Not a valid IP address.'); + + return (sValue, None); + + @staticmethod + def validateBool(sValue, aoNilValues = tuple([None, '']), fAllowNull = True): + """ Validates a boolean field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + if sValue in ('True', 'true', '1', True): + return (True, None); + if sValue in ('False', 'false', '0', False): + return (False, None); + return (sValue, 'Invalid boolean value.'); + + @staticmethod + def validateUuid(sValue, aoNilValues = tuple([None, '']), fAllowNull = True): + """ Validates an UUID field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + try: + sValue = str(uuid.UUID(sValue)); + except: + return (sValue, 'Invalid UUID value.'); + return (sValue, None); + + @staticmethod + def validateWord(sValue, cchMin = 1, cchMax = 64, asValid = None, aoNilValues = tuple([None, '']), fAllowNull = True): + """ Validates a word field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + if re.search('[^a-zA-Z0-9_-]', sValue) is not None: + sError = 'Single word ([a-zA-Z0-9_-]), please.'; + elif cchMin is not None and len(sValue) < cchMin: + sError = 'Too short, min %s chars' % (cchMin,); + elif cchMax is not None and len(sValue) > cchMax: + sError = 'Too long, max %s chars' % (cchMax,); + elif asValid is not None and sValue not in asValid: + sError = 'Invalid value "%s", must be one of: %s' % (sValue, asValid); + else: + sError = None; + return (sValue, sError); + + @staticmethod + def validateStr(sValue, cchMin = 0, cchMax = 4096, aoNilValues = tuple([None, '']), fAllowNull = True, + fAllowUnicodeSymbols = False): + """ Validates a string field. """ + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + if cchMin is not None and len(sValue) < cchMin: + sError = 'Too short, min %s chars' % (cchMin,); + elif cchMax is not None and len(sValue) > cchMax: + sError = 'Too long, max %s chars' % (cchMax,); + elif fAllowUnicodeSymbols is False and utils.hasNonAsciiCharacters(sValue): + sError = 'Non-ascii characters not allowed' + else: + sError = None; + return (sValue, sError); + + @staticmethod + def validateEmail(sValue, aoNilValues = tuple([None, '']), fAllowNull = True): + """ Validates a email field.""" + if sValue in aoNilValues: + return (sValue, None if fAllowNull else 'Mandatory.'); + + if re.match(r'.+@.+\..+', sValue) is None: + return (sValue,'Invalid e-mail format.'); + return (sValue, None); + + @staticmethod + def validateListOfSomething(asValues, aoNilValues = tuple([[], None]), fAllowNull = True): + """ Validate a list of some uniform values. Returns a copy of the list (if list it is). """ + if asValues in aoNilValues or (not asValues and not fAllowNull): + return (asValues, None if fAllowNull else 'Mandatory.') + + if not isinstance(asValues, list): + return (asValues, 'Invalid data type (%s).' % (type(asValues),)); + + asValues = list(asValues); # copy the list. + if asValues: + oType = type(asValues[0]); + for i in range(1, len(asValues)): + if type(asValues[i]) is not oType: # pylint: disable=unidiomatic-typecheck + return (asValues, 'Invalid entry data type ([0]=%s vs [%d]=%s).' % (oType, i, type(asValues[i])) ); + + return (asValues, None); + + @staticmethod + def validateListOfStr(asValues, cchMin = None, cchMax = None, asValidValues = None, + aoNilValues = tuple([[], None]), fAllowNull = True): + """ Validates a list of text items.""" + (asValues, sError) = ModelDataBase.validateListOfSomething(asValues, aoNilValues, fAllowNull); + + if sError is None and asValues not in aoNilValues and asValues: + if not utils.isString(asValues[0]): + return (asValues, 'Invalid item data type.'); + + if not fAllowNull and cchMin is None: + cchMin = 1; + + for sValue in asValues: + if asValidValues is not None and sValue not in asValidValues: + sThisErr = 'Invalid value "%s".' % (sValue,); + elif cchMin is not None and len(sValue) < cchMin: + sThisErr = 'Value "%s" is too short, min length is %u chars.' % (sValue, cchMin); + elif cchMax is not None and len(sValue) > cchMax: + sThisErr = 'Value "%s" is too long, max length is %u chars.' % (sValue, cchMax); + else: + continue; + + if sError is None: + sError = sThisErr; + else: + sError += ' ' + sThisErr; + + return (asValues, sError); + + @staticmethod + def validateListOfInts(asValues, iMin = 0, iMax = 0x7ffffffe, aoNilValues = tuple([[], None]), fAllowNull = True): + """ Validates a list of integer items.""" + (asValues, sError) = ModelDataBase.validateListOfSomething(asValues, aoNilValues, fAllowNull); + + if sError is None and asValues not in aoNilValues and asValues: + for i, _ in enumerate(asValues): + sValue = asValues[i]; + + sThisErr = ''; + try: + iValue = int(sValue); + except: + sThisErr = 'Invalid integer value "%s".' % (sValue,); + else: + asValues[i] = iValue; + if iValue < iMin: + sThisErr = 'Value %d is too small (min %d)' % (iValue, iMin,); + elif iValue > iMax: + sThisErr = 'Value %d is too high (max %d)' % (iValue, iMax,); + else: + continue; + + if sError is None: + sError = sThisErr; + else: + sError += ' ' + sThisErr; + + return (asValues, sError); + + + + # + # Old validation helpers. + # + + @staticmethod + def _validateInt(dErrors, sName, sValue, iMin = 0, iMax = 0x7ffffffe, aoNilValues = tuple([-1, None, ''])): + """ Validates an integer field. """ + (sValue, sError) = ModelDataBase.validateInt(sValue, iMin, iMax, aoNilValues, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateIntNN(dErrors, sName, sValue, iMin = 0, iMax = 0x7ffffffe, aoNilValues = tuple([-1, None, ''])): + """ Validates an integer field, not null. """ + (sValue, sError) = ModelDataBase.validateInt(sValue, iMin, iMax, aoNilValues, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateLong(dErrors, sName, sValue, lMin = 0, lMax = None, aoNilValues = tuple([long(-1), None, ''])): + """ Validates an long integer field. """ + (sValue, sError) = ModelDataBase.validateLong(sValue, lMin, lMax, aoNilValues, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateLongNN(dErrors, sName, sValue, lMin = 0, lMax = None, aoNilValues = tuple([long(-1), None, ''])): + """ Validates an long integer field, not null. """ + (sValue, sError) = ModelDataBase.validateLong(sValue, lMin, lMax, aoNilValues, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateTs(dErrors, sName, sValue): + """ Validates a timestamp field. """ + (sValue, sError) = ModelDataBase.validateTs(sValue, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateTsNN(dErrors, sName, sValue): + """ Validates a timestamp field, not null. """ + (sValue, sError) = ModelDataBase.validateTs(sValue, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateIp(dErrors, sName, sValue): + """ Validates an IP address field. """ + (sValue, sError) = ModelDataBase.validateIp(sValue, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateIpNN(dErrors, sName, sValue): + """ Validates an IP address field, not null. """ + (sValue, sError) = ModelDataBase.validateIp(sValue, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateBool(dErrors, sName, sValue): + """ Validates a boolean field. """ + (sValue, sError) = ModelDataBase.validateBool(sValue, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateBoolNN(dErrors, sName, sValue): + """ Validates a boolean field, not null. """ + (sValue, sError) = ModelDataBase.validateBool(sValue, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateUuid(dErrors, sName, sValue): + """ Validates an UUID field. """ + (sValue, sError) = ModelDataBase.validateUuid(sValue, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateUuidNN(dErrors, sName, sValue): + """ Validates an UUID field, not null. """ + (sValue, sError) = ModelDataBase.validateUuid(sValue, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateWord(dErrors, sName, sValue, cchMin = 1, cchMax = 64, asValid = None): + """ Validates a word field. """ + (sValue, sError) = ModelDataBase.validateWord(sValue, cchMin, cchMax, asValid, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateWordNN(dErrors, sName, sValue, cchMin = 1, cchMax = 64, asValid = None): + """ Validates a boolean field, not null. """ + (sValue, sError) = ModelDataBase.validateWord(sValue, cchMin, cchMax, asValid, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateStr(dErrors, sName, sValue, cchMin = 0, cchMax = 4096): + """ Validates a string field. """ + (sValue, sError) = ModelDataBase.validateStr(sValue, cchMin, cchMax, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateStrNN(dErrors, sName, sValue, cchMin = 0, cchMax = 4096): + """ Validates a string field, not null. """ + (sValue, sError) = ModelDataBase.validateStr(sValue, cchMin, cchMax, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateEmail(dErrors, sName, sValue): + """ Validates a email field.""" + (sValue, sError) = ModelDataBase.validateEmail(sValue, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateEmailNN(dErrors, sName, sValue): + """ Validates a email field.""" + (sValue, sError) = ModelDataBase.validateEmail(sValue, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateListOfStr(dErrors, sName, asValues, asValidValues = None): + """ Validates a list of text items.""" + (sValue, sError) = ModelDataBase.validateListOfStr(asValues, asValidValues = asValidValues, fAllowNull = True); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + @staticmethod + def _validateListOfStrNN(dErrors, sName, asValues, asValidValues = None): + """ Validates a list of text items, not null and len >= 1.""" + (sValue, sError) = ModelDataBase.validateListOfStr(asValues, asValidValues = asValidValues, fAllowNull = False); + if sError is not None: + dErrors[sName] = sError; + return sValue; + + # + # Various helpers. + # + + @staticmethod + def formatSimpleNowAndPeriod(oDb, tsNow = None, sPeriodBack = None, + sTablePrefix = '', sExpCol = 'tsExpire', sEffCol = 'tsEffective'): + """ + Formats a set of tsNow and sPeriodBack arguments for a standard testmanager + table. + + If sPeriodBack is given, the query is effective for the period + (tsNow - sPeriodBack) thru (tsNow). + + If tsNow isn't given, it defaults to current time. + + Returns the final portion of a WHERE query (start with AND) and maybe an + ORDER BY and LIMIT bit if sPeriodBack is given. + """ + if tsNow is not None: + if sPeriodBack is not None: + sRet = oDb.formatBindArgs(' AND ' + sTablePrefix + sExpCol + ' > (%s::timestamp - %s::interval)\n' + ' AND tsEffective <= %s\n' + 'ORDER BY ' + sTablePrefix + sExpCol + ' DESC\n' + 'LIMIT 1\n' + , ( tsNow, sPeriodBack, tsNow)); + else: + sRet = oDb.formatBindArgs(' AND ' + sTablePrefix + sExpCol + ' > %s\n' + ' AND ' + sTablePrefix + sEffCol + ' <= %s\n' + , ( tsNow, tsNow, )); + else: + if sPeriodBack is not None: + sRet = oDb.formatBindArgs(' AND ' + sTablePrefix + sExpCol + ' > (CURRENT_TIMESTAMP - %s::interval)\n' + ' AND ' + sTablePrefix + sEffCol + ' <= CURRENT_TIMESTAMP\n' + 'ORDER BY ' + sTablePrefix + sExpCol + ' DESC\n' + 'LIMIT 1\n' + , ( sPeriodBack, )); + else: + sRet = ' AND ' + sTablePrefix + sExpCol + ' = \'infinity\'::timestamp\n'; + return sRet; + + @staticmethod + def formatSimpleNowAndPeriodQuery(oDb, sQuery, aBindArgs, tsNow = None, sPeriodBack = None, + sTablePrefix = '', sExpCol = 'tsExpire', sEffCol = 'tsEffective'): + """ + Formats a simple query for a standard testmanager table with optional + tsNow and sPeriodBack arguments. + + The sQuery and sBindArgs are passed along to oDb.formatBindArgs to form + the first part of the query. Must end with an open WHERE statement as + we'll be adding the time part starting with 'AND something...'. + + See formatSimpleNowAndPeriod for tsNow and sPeriodBack description. + + Returns the final portion of a WHERE query (start with AND) and maybe an + ORDER BY and LIMIT bit if sPeriodBack is given. + + """ + return oDb.formatBindArgs(sQuery, aBindArgs) \ + + ModelDataBase.formatSimpleNowAndPeriod(oDb, tsNow, sPeriodBack, sTablePrefix, sExpCol, sEffCol); + + + # + # JSON + # + + @staticmethod + def stringToJson(sString): + """ Converts a string to a JSON value string. """ + if not utils.isString(sString): + sString = utils.toUnicode(sString); + if not utils.isString(sString): + sString = str(sString); + return json.dumps(sString); + + @staticmethod + def dictToJson(dDict, dOptions = None): + """ Converts a dictionary to a JSON string. """ + sJson = u'{ '; + for i, oKey in enumerate(dDict): + if i > 0: + sJson += ', '; + sJson += '%s: %s' % (ModelDataBase.stringToJson(oKey), + ModelDataBase.genericToJson(dDict[oKey], dOptions)); + return sJson + ' }'; + + @staticmethod + def listToJson(aoList, dOptions = None): + """ Converts list of something to a JSON string. """ + sJson = u'[ '; + for i, oValue in enumerate(aoList): + if i > 0: + sJson += u', '; + sJson += ModelDataBase.genericToJson(oValue, dOptions); + return sJson + u' ]'; + + @staticmethod + def datetimeToJson(oDateTime): + """ Converts a datetime instance to a JSON string. """ + return '"%s"' % (oDateTime,); + + + @staticmethod + def genericToJson(oValue, dOptions = None): + """ Converts a generic object to a JSON string. """ + if isinstance(oValue, ModelDataBase): + return oValue.toJson(); + if isinstance(oValue, dict): + return ModelDataBase.dictToJson(oValue, dOptions); + if isinstance(oValue, (list, tuple, set, frozenset)): + return ModelDataBase.listToJson(oValue, dOptions); + if isinstance(oValue, datetime.datetime): + return ModelDataBase.datetimeToJson(oValue) + return json.dumps(oValue); + + def attribValueToJson(self, sAttr, oValue, dOptions = None): + """ + Converts the attribute value to JSON. + Returns JSON (string). + """ + _ = sAttr; + return self.genericToJson(oValue, dOptions); + + def toJson(self, dOptions = None): + """ + Converts the object to JSON. + Returns JSON (string). + """ + sJson = u'{ '; + for iAttr, sAttr in enumerate(self.getDataAttributes()): + oValue = getattr(self, sAttr); + if iAttr > 0: + sJson += ', '; + sJson += u'"%s": ' % (sAttr,); + sJson += self.attribValueToJson(sAttr, oValue, dOptions); + return sJson + u' }'; + + + # + # Sub-classes. + # + + class DispWrapper(object): + """Proxy object.""" + def __init__(self, oDisp, sAttrFmt): + self.oDisp = oDisp; + self.sAttrFmt = sAttrFmt; + def getStringParam(self, sName, asValidValues = None, sDefault = None, fAllowNull = False): + """See WuiDispatcherBase.getStringParam.""" + return self.oDisp.getStringParam(self.sAttrFmt % (sName,), asValidValues, sDefault, fAllowNull = fAllowNull); + def getListOfStrParams(self, sName, asDefaults = None): + """See WuiDispatcherBase.getListOfStrParams.""" + return self.oDisp.getListOfStrParams(self.sAttrFmt % (sName,), asDefaults); + def getListOfIntParams(self, sName, iMin = None, iMax = None, aiDefaults = None): + """See WuiDispatcherBase.getListOfIntParams.""" + return self.oDisp.getListOfIntParams(self.sAttrFmt % (sName,), iMin, iMax, aiDefaults); + + + + +# pylint: disable=no-member,missing-docstring,too-few-public-methods +class ModelDataBaseTestCase(unittest.TestCase): + """ + Base testcase for ModelDataBase decendants. + Derive from this and override setUp. + """ + + def setUp(self): + """ + Override this! Don't call super! + The subclasses are expected to set aoSamples to an array of instance + samples. The first entry must be a default object, the subsequent ones + are optional and their contents freely choosen. + """ + self.aoSamples = [ModelDataBase(),]; + + def testEquality(self): + for oSample in self.aoSamples: + self.assertEqual(oSample.isEqual(copy.copy(oSample)), True); + self.assertIsNotNone(oSample.isEqual(self.aoSamples[0])); + + def testNullConversion(self): + if not self.aoSamples[0].getDataAttributes(): + return; + for oSample in self.aoSamples: + oCopy = copy.copy(oSample); + self.assertEqual(oCopy.convertToParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oSample), False); + self.assertEqual(oCopy.convertFromParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oSample), True, '\ngot : %s\nexpected: %s' % (oCopy, oSample,)); + + oCopy = copy.copy(oSample); + self.assertEqual(oCopy.convertToParamNull(), oCopy); + oCopy2 = copy.copy(oCopy); + self.assertEqual(oCopy.convertToParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oCopy2), True); + self.assertEqual(oCopy.convertToParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oCopy2), True); + + oCopy = copy.copy(oSample); + self.assertEqual(oCopy.convertFromParamNull(), oCopy); + oCopy2 = copy.copy(oCopy); + self.assertEqual(oCopy.convertFromParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oCopy2), True); + self.assertEqual(oCopy.convertFromParamNull(), oCopy); + self.assertEqual(oCopy.isEqual(oCopy2), True); + + def testReinitToNull(self): + oFirst = copy.copy(self.aoSamples[0]); + self.assertEqual(oFirst.reinitToNull(), oFirst); + for oSample in self.aoSamples: + oCopy = copy.copy(oSample); + self.assertEqual(oCopy.reinitToNull(), oCopy); + self.assertEqual(oCopy.isEqual(oFirst), True); + + def testValidateAndConvert(self): + for oSample in self.aoSamples: + oCopy = copy.copy(oSample); + oCopy.convertToParamNull(); + dError1 = oCopy.validateAndConvert(None); + + oCopy2 = copy.copy(oCopy); + self.assertEqual(oCopy.validateAndConvert(None), dError1); + self.assertEqual(oCopy.isEqual(oCopy2), True); + + def testInitFromParams(self): + class DummyDisp(object): + def getStringParam(self, sName, asValidValues = None, sDefault = None, fAllowNull = False): + _ = sName; _ = asValidValues; _ = fAllowNull; + return sDefault; + def getListOfStrParams(self, sName, asDefaults = None): + _ = sName; + return asDefaults; + def getListOfIntParams(self, sName, iMin = None, iMax = None, aiDefaults = None): + _ = sName; _ = iMin; _ = iMax; + return aiDefaults; + + for oSample in self.aoSamples: + oCopy = copy.copy(oSample); + self.assertEqual(oCopy.initFromParams(DummyDisp(), fStrict = False), oCopy); + + def testToString(self): + for oSample in self.aoSamples: + self.assertIsNotNone(oSample.toString()); + + +class FilterCriterionValueAndDescription(object): + """ + A filter criterion value and its description. + """ + + def __init__(self, oValue, sDesc, cTimes = None, sHover = None, fIrrelevant = False): + self.oValue = oValue; ##< Typically the ID of something in the database. + self.sDesc = sDesc; ##< What to display. + self.cTimes = cTimes; ##< Number of times the value occurs in the result set. None if not given. + self.sHover = sHover; ##< Optional hover/title string. + self.fIrrelevant = fIrrelevant; ##< Irrelevant filter option, only present because it's selected + self.aoSubs = []; ##< References to FilterCriterion.oSub.aoPossible. + + +class FilterCriterion(object): + """ + A filter criterion. + """ + + ## @name The state. + ## @{ + ksState_NotSelected = 'not-selected'; + ksState_Selected = 'selected'; + ## @} + + ## @name The kind of filtering. + ## @{ + ## 'Element of' by default, 'not an element of' when fInverted is False. + ksKind_ElementOfOrNot = 'element-of-or-not'; + ## The criterion is a special one and cannot be inverted. + ksKind_Special = 'special'; + ## @} + + ## @name The value type. + ## @{ + ksType_UInt = 'uint'; ##< unsigned integer value. + ksType_UIntNil = 'uint-nil'; ##< unsigned integer value, with nil. + ksType_String = 'string'; ##< string value. + ksType_Ranges = 'ranges'; ##< List of (unsigned) integer ranges. + ## @} + + def __init__(self, sName, sVarNm = None, sType = ksType_UInt, # pylint: disable=too-many-arguments + sState = ksState_NotSelected, sKind = ksKind_ElementOfOrNot, + sTable = None, sColumn = None, asTables = None, oSub = None): + assert len(sVarNm) == 2; # required by wuimain.py for filtering. + self.sName = sName; + self.sState = sState; + self.sType = sType; + self.sKind = sKind; + self.sVarNm = sVarNm; + self.aoSelected = []; ##< User input from sVarNm. Single value, type according to sType. + self.sInvVarNm = 'i' + sVarNm if sKind == self.ksKind_ElementOfOrNot else None; + self.fInverted = False; ##< User input from sInvVarNm. Inverts the operation (-> not an element of). + self.aoPossible = []; ##< type: list[FilterCriterionValueAndDescription] + assert (sTable is None and asTables is None) or ((sTable is not None) != (asTables is not None)), \ + '%s %s' % (sTable, asTables); + self.asTables = [sTable,] if sTable is not None else asTables; + assert sColumn is None or len(self.asTables) == 1, '%s %s' % (self.asTables, sColumn); + self.sColumn = sColumn; ##< Normally only applicable if one table. + self.fExpanded = None; ##< Tristate (None, False, True) + self.oSub = oSub; ##< type: FilterCriterion + + +class ModelFilterBase(ModelBase): + """ + Base class for filters. + + Filters are used to narrow down data that is displayed in a list or + report. This class differs a little from ModelDataBase in that it is not + tied to a database table, but one or more database queries that are + typically rather complicated. + + The filter object has two roles: + + 1. It is used by a ModelLogicBase descendant to store the available + filtering options for data begin displayed. + + 2. It decodes and stores the filtering options submitted by the user so + a ModeLogicBase descendant can use it to construct WHERE statements. + + The ModelFilterBase class is related to the ModelDataBase class in that it + decodes user parameters and stores data, however it is not a descendant. + + Note! In order to reduce URL lengths, we use very very brief parameter + names for the filters. + """ + + def __init__(self): + ModelBase.__init__(self); + self.aCriteria = [] # type: list[FilterCriterion] + + def _initFromParamsWorker(self, oDisp, oCriterion): # (,FilterCriterion) + """ Worker for initFromParams. """ + if oCriterion.sType == FilterCriterion.ksType_UInt: + oCriterion.aoSelected = oDisp.getListOfIntParams(oCriterion.sVarNm, iMin = 0, aiDefaults = []); + elif oCriterion.sType == FilterCriterion.ksType_UIntNil: + oCriterion.aoSelected = oDisp.getListOfIntParams(oCriterion.sVarNm, iMin = -1, aiDefaults = []); + elif oCriterion.sType == FilterCriterion.ksType_String: + oCriterion.aoSelected = oDisp.getListOfStrParams(oCriterion.sVarNm, asDefaults = []); + if len(oCriterion.aoSelected) > 100: + raise TMExceptionBase('Variable %s has %u value, max allowed is 100!' + % (oCriterion.sVarNm, len(oCriterion.aoSelected))); + for sValue in oCriterion.aoSelected: + if len(sValue) > 64 \ + or '\'' in sValue \ + or sValue[-1] == '\\': + raise TMExceptionBase('Variable %s has an illegal value "%s"!' % (oCriterion.sVarNm, sValue)); + elif oCriterion.sType == FilterCriterion.ksType_Ranges: + def convertRangeNumber(sValue): + """ Helper """ + sValue = sValue.strip(); + if sValue and sValue not in ('inf', 'Inf', 'INf', 'INF', 'InF', 'iNf', 'iNF', 'inF',): + try: return int(sValue); + except: pass; + return None; + + for sRange in oDisp.getStringParam(oCriterion.sVarNm, sDefault = '').split(','): + sRange = sRange.strip(); + if sRange and sRange != '-' and any(ch.isdigit() for ch in sRange): + asValues = sRange.split('-'); + if len(asValues) == 1: + asValues = [asValues[0], asValues[0]]; + elif len(asValues) > 2: + asValues = [asValues[0], asValues[-1]]; + tTuple = (convertRangeNumber(asValues[0]), convertRangeNumber(asValues[1])); + if tTuple[0] is not None and tTuple[1] is not None and tTuple[0] > tTuple[1]: + tTuple = (tTuple[1], tTuple[0]); + oCriterion.aoSelected.append(tTuple); + else: + assert False; + if oCriterion.aoSelected: + oCriterion.sState = FilterCriterion.ksState_Selected; + else: + oCriterion.sState = FilterCriterion.ksState_NotSelected; + + if oCriterion.sKind == FilterCriterion.ksKind_ElementOfOrNot: + oCriterion.fInverted = oDisp.getBoolParam(oCriterion.sInvVarNm, fDefault = False); + + if oCriterion.oSub is not None: + self._initFromParamsWorker(oDisp, oCriterion.oSub); + return; + + def initFromParams(self, oDisp): # type: (WuiDispatcherBase) -> self + """ + Initialize the object from parameters. + + Returns self. Raises exception on invalid parameter value. + """ + + for oCriterion in self.aCriteria: + self._initFromParamsWorker(oDisp, oCriterion); + return self; + + def strainParameters(self, dParams, aAdditionalParams = None): + """ Filters just the parameters relevant to this filter, returning a copy. """ + + # Collect the parameter names. + dWanted = {}; + for oCrit in self.aCriteria: + dWanted[oCrit.sVarNm] = 1; + if oCrit.sInvVarNm: + dWanted[oCrit.sInvVarNm] = 1; + + # Add additional stuff. + if aAdditionalParams: + for sParam in aAdditionalParams: + dWanted[sParam] = 1; + + # To the straining. + dRet = {}; + for sKey in dParams: + if sKey in dWanted: + dRet[sKey] = dParams[sKey]; + return dRet; + + +class ModelLogicBase(ModelBase): # pylint: disable=too-few-public-methods + """ + Something all classes in the logic classes the logical model inherits from. + """ + + def __init__(self, oDb): + ModelBase.__init__(self); + + # + # Note! Do not create a connection here if None, we need to DB share + # connection with all other logic objects so we can perform half + # complex transactions involving several logic objects. + # + self._oDb = oDb; + + def getDbConnection(self): + """ + Gets the database connection. + This should only be used for instantiating other ModelLogicBase children. + """ + return self._oDb; + + def _dbRowsToModelDataList(self, oModelDataType, aaoRows = None): + """ + Helper for conerting a simple fetch into a list of ModelDataType python objects. + + If aaoRows is None, we'll fetchAll from the database ourselves. + + The oModelDataType must be a class derived from ModelDataBase and implement + the initFormDbRow method. + + Returns a list of oModelDataType instances. + """ + assert issubclass(oModelDataType, ModelDataBase); + aoRet = []; + if aaoRows is None: + aaoRows = self._oDb.fetchAll(); + for aoRow in aaoRows: + aoRet.append(oModelDataType().initFromDbRow(aoRow)); + return aoRet; + + + +class AttributeChangeEntry(object): # pylint: disable=too-few-public-methods + """ + Data class representing the changes made to one attribute. + """ + + def __init__(self, sAttr, oNewRaw, oOldRaw, sNewText, sOldText): + self.sAttr = sAttr; + self.oNewRaw = oNewRaw; + self.oOldRaw = oOldRaw; + self.sNewText = sNewText; + self.sOldText = sOldText; + +class AttributeChangeEntryPre(AttributeChangeEntry): # pylint: disable=too-few-public-methods + """ + AttributeChangeEntry for preformatted values. + """ + + def __init__(self, sAttr, oNewRaw, oOldRaw, sNewText, sOldText): + AttributeChangeEntry.__init__(self, sAttr, oNewRaw, oOldRaw, sNewText, sOldText); + +class ChangeLogEntry(object): # pylint: disable=too-few-public-methods + """ + A change log entry returned by the fetchChangeLog method typically + implemented by ModelLogicBase child classes. + """ + + def __init__(self, uidAuthor, sAuthor, tsEffective, tsExpire, oNewRaw, oOldRaw, aoChanges): + self.uidAuthor = uidAuthor; + self.sAuthor = sAuthor; + self.tsEffective = tsEffective; + self.tsExpire = tsExpire; + self.oNewRaw = oNewRaw; + self.oOldRaw = oOldRaw; # Note! NULL for the last entry. + self.aoChanges = aoChanges; + |