diff options
Diffstat (limited to 'src/VBox/ValidationKit/testmanager/core/testgroup.py')
-rwxr-xr-x | src/VBox/ValidationKit/testmanager/core/testgroup.py | 771 |
1 files changed, 771 insertions, 0 deletions
diff --git a/src/VBox/ValidationKit/testmanager/core/testgroup.py b/src/VBox/ValidationKit/testmanager/core/testgroup.py new file mode 100755 index 00000000..9a648b0d --- /dev/null +++ b/src/VBox/ValidationKit/testmanager/core/testgroup.py @@ -0,0 +1,771 @@ +# -*- coding: utf-8 -*- +# $Id: testgroup.py $ + +""" +Test Manager - Test groups management. +""" + +__copyright__ = \ +""" +Copyright (C) 2012-2023 Oracle and/or its affiliates. + +This file is part of VirtualBox base platform packages, as +available from https://www.virtualbox.org. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation, in version 3 of the +License. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, see <https://www.gnu.org/licenses>. + +The contents of this file may alternatively be used under the terms +of the Common Development and Distribution License Version 1.0 +(CDDL), a copy of it is provided in the "COPYING.CDDL" file included +in the VirtualBox distribution, in which case the provisions of the +CDDL are applicable instead of those of the GPL. + +You may elect to license modified versions of this file under the +terms and conditions of either the GPL or the CDDL or both. + +SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0 +""" +__version__ = "$Revision: 155244 $" + + +# Standard python imports. +import unittest; + +# Validation Kit imports. +from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, ModelLogicBase, TMRowInUse, \ + TMTooManyRows, TMInvalidData, TMRowNotFound, TMRowAlreadyExists; +from testmanager.core.testcase import TestCaseData, TestCaseDataEx; + + +class TestGroupMemberData(ModelDataBase): + """Representation of a test group member database row.""" + + ksParam_idTestGroup = 'TestGroupMember_idTestGroup'; + ksParam_idTestCase = 'TestGroupMember_idTestCase'; + ksParam_tsEffective = 'TestGroupMember_tsEffective'; + ksParam_tsExpire = 'TestGroupMember_tsExpire'; + ksParam_uidAuthor = 'TestGroupMember_uidAuthor'; + ksParam_iSchedPriority = 'TestGroupMember_iSchedPriority'; + ksParam_aidTestCaseArgs = 'TestGroupMember_aidTestCaseArgs'; + + kasAllowNullAttributes = ['idTestGroup', 'idTestCase', 'tsEffective', 'tsExpire', 'uidAuthor', 'aidTestCaseArgs' ]; + kiMin_iSchedPriority = 0; + kiMax_iSchedPriority = 31; + + kcDbColumns = 7; + + def __init__(self): + ModelDataBase.__init__(self) + + # + # Initialize with defaults. + # See the database for explanations of each of these fields. + # + self.idTestGroup = None; + self.idTestCase = None; + self.tsEffective = None; + self.tsExpire = None; + self.uidAuthor = None; + self.iSchedPriority = 16; + self.aidTestCaseArgs = None; + + def initFromDbRow(self, aoRow): + """ + Reinitialize from a SELECT * FROM TestCaseGroupMembers. + Return self. Raises exception if no row. + """ + if aoRow is None: + raise TMRowNotFound('Test group member not found.') + + self.idTestGroup = aoRow[0]; + self.idTestCase = aoRow[1]; + self.tsEffective = aoRow[2]; + self.tsExpire = aoRow[3]; + self.uidAuthor = aoRow[4]; + self.iSchedPriority = aoRow[5]; + self.aidTestCaseArgs = aoRow[6]; + return self + + + def getAttributeParamNullValues(self, sAttr): + # Arrays default to [] as NULL currently. That doesn't work for us. + if sAttr == 'aidTestCaseArgs': + aoNilValues = [None, '-1']; + else: + aoNilValues = ModelDataBase.getAttributeParamNullValues(self, sAttr); + return aoNilValues; + + def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb): + if sAttr != 'aidTestCaseArgs': + return ModelDataBase._validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb); + + # -1 is a special value, which when present make the whole thing NULL (None). + (aidVariations, sError) = self.validateListOfInts(oValue, aoNilValues = aoNilValues, fAllowNull = fAllowNull, + iMin = -1, iMax = 0x7ffffffe); + if sError is None: + if aidVariations is None: + pass; + elif -1 in aidVariations: + aidVariations = None; + elif 0 in aidVariations: + sError = 'Invalid test case varation ID #0.'; + else: + aidVariations = sorted(aidVariations); + return (aidVariations, sError); + + + +class TestGroupMemberDataEx(TestGroupMemberData): + """Extended representation of a test group member.""" + + def __init__(self): + """Extend parent class""" + TestGroupMemberData.__init__(self) + self.oTestCase = None; # TestCaseDataEx. + + def initFromDbRowEx(self, aoRow, oDb, tsNow = None): + """ + Reinitialize from a SELECT * FROM TestGroupMembers, TestCases row. + Will query the necessary additional data from oDb using tsNow. + + Returns self. Raises exception if no row or database error. + """ + TestGroupMemberData.initFromDbRow(self, aoRow); + self.oTestCase = TestCaseDataEx(); + self.oTestCase.initFromDbRowEx(aoRow[TestGroupMemberData.kcDbColumns:], oDb, tsNow); + return self; + + def initFromParams(self, oDisp, fStrict = True): + self.oTestCase = None; + return TestGroupMemberData.initFromParams(self, oDisp, fStrict); + + def getDataAttributes(self): + asAttributes = TestGroupMemberData.getDataAttributes(self); + asAttributes.remove('oTestCase'); + return asAttributes; + + def _validateAndConvertWorker(self, asAllowNullAttributes, oDb, enmValidateFor = ModelDataBase.ksValidateFor_Other): + dErrors = TestGroupMemberData._validateAndConvertWorker(self, asAllowNullAttributes, oDb, enmValidateFor); + if self.ksParam_idTestCase not in dErrors: + self.oTestCase = TestCaseDataEx() + try: + self.oTestCase.initFromDbWithId(oDb, self.idTestCase); + except Exception as oXcpt: + self.oTestCase = TestCaseDataEx() + dErrors[self.ksParam_idTestCase] = str(oXcpt); + return dErrors; + + +class TestGroupMemberData2(TestCaseData): + """Special representation of a Test Group Member item""" + + def __init__(self): + """Extend parent class""" + TestCaseData.__init__(self) + self.idTestGroup = None + self.aidTestCaseArgs = [] + + def initFromDbRowEx(self, aoRow): + """ + Reinitialize from this query: + + SELECT TestCases.*, + TestGroupMembers.idTestGroup, + TestGroupMembers.aidTestCaseArgs + FROM TestCases, TestGroupMembers + WHERE TestCases.idTestCase = TestGroupMembers.idTestCase + + Represents complete test group member (test case) info. + Returns object of type TestGroupMemberData2. Raises exception if no row. + """ + TestCaseData.initFromDbRow(self, aoRow); + self.idTestGroup = aoRow[-2] + self.aidTestCaseArgs = aoRow[-1] + return self; + + +class TestGroupData(ModelDataBase): + """ + Test group data. + """ + + ksIdAttr = 'idTestGroup'; + + ksParam_idTestGroup = 'TestGroup_idTestGroup' + ksParam_tsEffective = 'TestGroup_tsEffective' + ksParam_tsExpire = 'TestGroup_tsExpire' + ksParam_uidAuthor = 'TestGroup_uidAuthor' + ksParam_sName = 'TestGroup_sName' + ksParam_sDescription = 'TestGroup_sDescription' + ksParam_sComment = 'TestGroup_sComment' + + kasAllowNullAttributes = ['idTestGroup', 'tsEffective', 'tsExpire', 'uidAuthor', 'sDescription', 'sComment' ]; + + kcDbColumns = 7; + + def __init__(self): + ModelDataBase.__init__(self); + + # + # Initialize with defaults. + # See the database for explanations of each of these fields. + # + self.idTestGroup = None + self.tsEffective = None + self.tsExpire = None + self.uidAuthor = None + self.sName = None + self.sDescription = None + self.sComment = None + + def initFromDbRow(self, aoRow): + """ + Reinitialize from a SELECT * FROM TestGroups row. + Returns object of type TestGroupData. Raises exception if no row. + """ + if aoRow is None: + raise TMRowNotFound('Test group not found.') + + self.idTestGroup = aoRow[0] + self.tsEffective = aoRow[1] + self.tsExpire = aoRow[2] + self.uidAuthor = aoRow[3] + self.sName = aoRow[4] + self.sDescription = aoRow[5] + self.sComment = aoRow[6] + return self + + def initFromDbWithId(self, oDb, idTestGroup, tsNow = None, sPeriodBack = None): + """ + Initialize the object from the database. + """ + oDb.execute(self.formatSimpleNowAndPeriodQuery(oDb, + 'SELECT *\n' + 'FROM TestGroups\n' + 'WHERE idTestGroup = %s\n' + , ( idTestGroup,), tsNow, sPeriodBack)); + aoRow = oDb.fetchOne() + if aoRow is None: + raise TMRowNotFound('idTestGroup=%s not found (tsNow=%s sPeriodBack=%s)' % (idTestGroup, tsNow, sPeriodBack,)); + return self.initFromDbRow(aoRow); + + +class TestGroupDataEx(TestGroupData): + """ + Extended test group data. + """ + + ksParam_aoMembers = 'TestGroupDataEx_aoMembers'; + kasAltArrayNull = [ 'aoMembers', ]; + + ## Helper parameter containing the comma separated list with the IDs of + # potential members found in the parameters. + ksParam_aidTestCases = 'TestGroupDataEx_aidTestCases'; + + + def __init__(self): + TestGroupData.__init__(self); + self.aoMembers = []; # TestGroupMemberDataEx. + + def _initExtraMembersFromDb(self, oDb, tsNow = None, sPeriodBack = None): + """ + Worker shared by the initFromDb* methods. + Returns self. Raises exception if no row or database error. + """ + self.aoMembers = []; + _ = sPeriodBack; ## @todo sPeriodBack + + if tsNow is None: + oDb.execute('SELECT TestGroupMembers.*, TestCases.*\n' + 'FROM TestGroupMembers\n' + 'LEFT OUTER JOIN TestCases ON (\n' + ' TestGroupMembers.idTestCase = TestCases.idTestCase\n' + ' AND TestCases.tsExpire = \'infinity\'::TIMESTAMP)\n' + 'WHERE TestGroupMembers.idTestGroup = %s\n' + ' AND TestGroupMembers.tsExpire = \'infinity\'::TIMESTAMP\n' + 'ORDER BY TestCases.sName, TestCases.idTestCase\n' + , (self.idTestGroup,)); + else: + oDb.execute('SELECT TestGroupMembers.*, TestCases.*\n' + 'FROM TestGroupMembers\n' + 'LEFT OUTER JOIN TestCases ON (\n' + ' TestGroupMembers.idTestCase = TestCases.idTestCase\n' + ' AND TestCases.tsExpire > %s\n' + ' AND TestCases.tsEffective <= %s)\n' + 'WHERE TestGroupMembers.idTestGroup = %s\n' + ' AND TestGroupMembers.tsExpire > %s\n' + ' AND TestGroupMembers.tsEffective <= %s\n' + 'ORDER BY TestCases.sName, TestCases.idTestCase\n' + , (tsNow, tsNow, self.idTestGroup, tsNow, tsNow)); + + for aoRow in oDb.fetchAll(): + self.aoMembers.append(TestGroupMemberDataEx().initFromDbRowEx(aoRow, oDb, tsNow)); + return self; + + def initFromDbRowEx(self, aoRow, oDb, tsNow = None, sPeriodBack = None): + """ + Reinitialize from a SELECT * FROM TestGroups row. Will query the + necessary additional data from oDb using tsNow. + Returns self. Raises exception if no row or database error. + """ + TestGroupData.initFromDbRow(self, aoRow); + return self._initExtraMembersFromDb(oDb, tsNow, sPeriodBack); + + def initFromDbWithId(self, oDb, idTestGroup, tsNow = None, sPeriodBack = None): + """ + Initialize the object from the database. + """ + TestGroupData.initFromDbWithId(self, oDb, idTestGroup, tsNow, sPeriodBack); + return self._initExtraMembersFromDb(oDb, tsNow, sPeriodBack); + + + def getAttributeParamNullValues(self, sAttr): + if sAttr != 'aoMembers': + return TestGroupData.getAttributeParamNullValues(self, sAttr); + return ['', [], None]; + + def convertParamToAttribute(self, sAttr, sParam, oValue, oDisp, fStrict): + if sAttr != 'aoMembers': + return TestGroupData.convertParamToAttribute(self, sAttr, sParam, oValue, oDisp, fStrict); + + aoNewValue = []; + aidSelected = oDisp.getListOfIntParams(sParam, iMin = 1, iMax = 0x7ffffffe, aiDefaults = []) + sIds = oDisp.getStringParam(self.ksParam_aidTestCases, sDefault = ''); + for idTestCase in sIds.split(','): + try: idTestCase = int(idTestCase); + except: pass; + oDispWrapper = self.DispWrapper(oDisp, '%s[%s][%%s]' % (TestGroupDataEx.ksParam_aoMembers, idTestCase,)) + oMember = TestGroupMemberDataEx().initFromParams(oDispWrapper, fStrict = False); + if idTestCase in aidSelected: + aoNewValue.append(oMember); + return aoNewValue; + + def _validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb): + if sAttr != 'aoMembers': + return TestGroupData._validateAndConvertAttribute(self, sAttr, sParam, oValue, aoNilValues, fAllowNull, oDb); + + asErrors = []; + aoNewMembers = []; + for oOldMember in oValue: + oNewMember = TestGroupMemberDataEx().initFromOther(oOldMember); + aoNewMembers.append(oNewMember); + + dErrors = oNewMember.validateAndConvert(oDb, ModelDataBase.ksValidateFor_Other); + if dErrors: + asErrors.append(str(dErrors)); + + if not asErrors: + for i, _ in enumerate(aoNewMembers): + idTestCase = aoNewMembers[i]; + for j in range(i + 1, len(aoNewMembers)): + if aoNewMembers[j].idTestCase == idTestCase: + asErrors.append('Duplicate testcase #%d!' % (idTestCase, )); + break; + + return (aoNewMembers, None if not asErrors else '<br>\n'.join(asErrors)); + + +class TestGroupLogic(ModelLogicBase): + """ + Test case management logic. + """ + + def __init__(self, oDb): + ModelLogicBase.__init__(self, oDb) + self.dCache = None; + + # + # Standard methods. + # + + def fetchForListing(self, iStart, cMaxRows, tsNow, aiSortColumns = None): + """ + Fetches test groups. + + Returns an array (list) of TestGroupDataEx items, empty list if none. + Raises exception on error. + """ + _ = aiSortColumns; + if tsNow is None: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire = \'infinity\'::TIMESTAMP\n' + 'ORDER BY sName ASC\n' + 'LIMIT %s OFFSET %s\n' + , (cMaxRows, iStart,)); + else: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire > %s\n' + ' AND tsEffective <= %s\n' + 'ORDER BY sName ASC\n' + 'LIMIT %s OFFSET %s\n' + , (tsNow, tsNow, cMaxRows, iStart,)); + + aoRet = []; + for aoRow in self._oDb.fetchAll(): + aoRet.append(TestGroupDataEx().initFromDbRowEx(aoRow, self._oDb, tsNow)); + return aoRet; + + def addEntry(self, oData, uidAuthor, fCommit = False): + """ + Adds a testgroup to the database. + """ + + # + # Validate inputs. + # + assert isinstance(oData, TestGroupDataEx); + dErrors = oData.validateAndConvert(self._oDb, oData.ksValidateFor_Add); + if dErrors: + raise TMInvalidData('addEntry invalid input: %s' % (dErrors,)); + self._assertUniq(oData, None); + + # + # Do the job. + # + self._oDb.execute('INSERT INTO TestGroups (uidAuthor, sName, sDescription, sComment)\n' + 'VALUES (%s, %s, %s, %s)\n' + 'RETURNING idTestGroup\n' + , ( uidAuthor, + oData.sName, + oData.sDescription, + oData.sComment )); + idTestGroup = self._oDb.fetchOne()[0]; + oData.idTestGroup = idTestGroup; + + for oMember in oData.aoMembers: + oMember.idTestGroup = idTestGroup; + self._insertTestGroupMember(uidAuthor, oMember) + + self._oDb.maybeCommit(fCommit); + return True; + + def editEntry(self, oData, uidAuthor, fCommit = False): + """ + Modifies a test group. + """ + + # + # Validate inputs and read in the old(/current) data. + # + assert isinstance(oData, TestGroupDataEx); + dErrors = oData.validateAndConvert(self._oDb, oData.ksValidateFor_Edit); + if dErrors: + raise TMInvalidData('editEntry invalid input: %s' % (dErrors,)); + self._assertUniq(oData, oData.idTestGroup); + + oOldData = TestGroupDataEx().initFromDbWithId(self._oDb, oData.idTestGroup); + + # + # Update the data that needs updating. + # + + if not oData.isEqualEx(oOldData, [ 'aoMembers', 'tsEffective', 'tsExpire', 'uidAuthor', ]): + self._historizeTestGroup(oData.idTestGroup); + self._oDb.execute('INSERT INTO TestGroups\n' + ' (uidAuthor, idTestGroup, sName, sDescription, sComment)\n' + 'VALUES (%s, %s, %s, %s, %s)\n' + , ( uidAuthor, + oData.idTestGroup, + oData.sName, + oData.sDescription, + oData.sComment )); + + # Create a lookup dictionary for old entries. + dOld = {}; + for oOld in oOldData.aoMembers: + dOld[oOld.idTestCase] = oOld; + assert len(dOld) == len(oOldData.aoMembers); + + # Add new members, updated existing ones. + dNew = {}; + for oNewMember in oData.aoMembers: + oNewMember.idTestGroup = oData.idTestGroup; + if oNewMember.idTestCase in dNew: + raise TMRowAlreadyExists('Duplicate test group member: idTestCase=%d (%s / %s)' + % (oNewMember.idTestCase, oNewMember, dNew[oNewMember.idTestCase],)); + dNew[oNewMember.idTestCase] = oNewMember; + + oOldMember = dOld.get(oNewMember.idTestCase, None); + if oOldMember is not None: + if oNewMember.isEqualEx(oOldMember, [ 'uidAuthor', 'tsEffective', 'tsExpire' ]): + continue; # Skip, nothing changed. + self._historizeTestGroupMember(oData.idTestGroup, oNewMember.idTestCase); + self._insertTestGroupMember(uidAuthor, oNewMember); + + # Expire members that have been removed. + sQuery = self._oDb.formatBindArgs('UPDATE TestGroupMembers\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , ( oData.idTestGroup, )); + if dNew: + sQuery += ' AND idTestCase NOT IN (%s)' % (', '.join([str(iKey) for iKey in dNew]),); + self._oDb.execute(sQuery); + + self._oDb.maybeCommit(fCommit); + return True; + + def removeEntry(self, uidAuthor, idTestGroup, fCascade = False, fCommit = False): + """ + Deletes a test group. + """ + _ = uidAuthor; ## @todo record uidAuthor. + + # + # Cascade. + # + if fCascade is not True: + self._oDb.execute('SELECT SchedGroups.idSchedGroup, SchedGroups.sName\n' + 'FROM SchedGroupMembers, SchedGroups\n' + 'WHERE SchedGroupMembers.idTestGroup = %s\n' + ' AND SchedGroupMembers.tsExpire = \'infinity\'::TIMESTAMP\n' + ' AND SchedGroups.idSchedGroup = SchedGroupMembers.idSchedGroup\n' + ' AND SchedGroups.tsExpire = \'infinity\'::TIMESTAMP\n' + , ( idTestGroup, )); + aoGroups = self._oDb.fetchAll(); + if aoGroups: + asGroups = ['%s (#%d)' % (sName, idSchedGroup) for idSchedGroup, sName in aoGroups]; + raise TMRowInUse('Test group #%d is member of one or more scheduling groups: %s' + % (idTestGroup, ', '.join(asGroups),)); + else: + self._oDb.execute('UPDATE SchedGroupMembers\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , ( idTestGroup, )); + + # + # Remove the group. + # + self._oDb.execute('UPDATE TestGroupMembers\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , (idTestGroup,)) + self._oDb.execute('UPDATE TestGroups\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , (idTestGroup,)) + + self._oDb.maybeCommit(fCommit) + return True; + + def cachedLookup(self, idTestGroup): + """ + Looks up the most recent TestGroupDataEx object for idTestGroup + via an object cache. + + Returns a shared TestGroupDataEx object. None if not found. + Raises exception on DB error. + """ + if self.dCache is None: + self.dCache = self._oDb.getCache('TestGroupDataEx'); + oEntry = self.dCache.get(idTestGroup, None); + if oEntry is None: + fNeedTsNow = False; + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , (idTestGroup, )); + if self._oDb.getRowCount() == 0: + # Maybe it was deleted, try get the last entry. + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE idTestGroup = %s\n' + 'ORDER BY tsExpire DESC\n' + 'LIMIT 1\n' + , (idTestGroup, )); + fNeedTsNow = True; + elif self._oDb.getRowCount() > 1: + raise self._oDb.integrityException('%s infinity rows for %s' % (self._oDb.getRowCount(), idTestGroup)); + + if self._oDb.getRowCount() == 1: + aaoRow = self._oDb.fetchOne(); + oEntry = TestGroupDataEx(); + tsNow = oEntry.initFromDbRow(aaoRow).tsEffective if fNeedTsNow else None; + oEntry.initFromDbRowEx(aaoRow, self._oDb, tsNow); + self.dCache[idTestGroup] = oEntry; + return oEntry; + + + # + # Other methods. + # + + def fetchOrderedByName(self, tsNow = None): + """ + Return list of objects of type TestGroupData ordered by name. + May raise exception on database error. + """ + if tsNow is None: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire = \'infinity\'::TIMESTAMP\n' + 'ORDER BY sName ASC\n'); + else: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire > %s\n' + ' AND tsEffective <= %s\n' + 'ORDER BY sName ASC\n' + , (tsNow, tsNow,)); + aoRet = [] + for _ in range(self._oDb.getRowCount()): + aoRet.append(TestGroupData().initFromDbRow(self._oDb.fetchOne())); + return aoRet; + + def getMembers(self, idTestGroup): + """ + Fetches all test case records from DB which are + belong to current Test Group. + Returns list of objects of type TestGroupMemberData2 (!). + """ + self._oDb.execute('SELECT TestCases.*,\n' + ' TestGroupMembers.idTestGroup,\n' + ' TestGroupMembers.aidTestCaseArgs\n' + 'FROM TestCases, TestGroupMembers\n' + 'WHERE TestCases.tsExpire = \'infinity\'::TIMESTAMP\n' + ' AND TestGroupMembers.tsExpire = \'infinity\'::TIMESTAMP\n' + ' AND TestGroupMembers.idTestCase = TestCases.idTestCase\n' + ' AND TestGroupMembers.idTestGroup = %s\n' + 'ORDER BY TestCases.idTestCase ASC;', + (idTestGroup,)) + + aaoRows = self._oDb.fetchAll() + aoRet = [] + for aoRow in aaoRows: + aoRet.append(TestGroupMemberData2().initFromDbRowEx(aoRow)) + + return aoRet + + def getAll(self, tsNow=None): + """Return list of objects of type TestGroupData""" + + if tsNow is None: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire = \'infinity\'::TIMESTAMP\n' + 'ORDER BY idTestGroup ASC;') + else: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire > %s\n' + ' AND tsEffective <= %s\n' + 'ORDER BY idTestGroup ASC;', + (tsNow, tsNow)) + + aaoRows = self._oDb.fetchAll() + aoRet = [] + for aoRow in aaoRows: + aoRet.append(TestGroupData().initFromDbRow(aoRow)) + + return aoRet + + def getById(self, idTestGroup, tsNow=None): + """Get Test Group data by its ID""" + + if tsNow is None: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire = \'infinity\'::timestamp\n' + ' AND idTestGroup = %s\n' + 'ORDER BY idTestGroup ASC;' + , (idTestGroup,)) + else: + self._oDb.execute('SELECT *\n' + 'FROM TestGroups\n' + 'WHERE tsExpire > %s\n' + ' AND tsEffective <= %s\n' + ' AND idTestGroup = %s\n' + 'ORDER BY idTestGroup ASC;' + , (tsNow, tsNow, idTestGroup)) + + aRows = self._oDb.fetchAll() + if len(aRows) not in (0, 1): + raise TMTooManyRows('Found more than one test groups with the same credentials. Database structure is corrupted.') + try: + return TestGroupData().initFromDbRow(aRows[0]) + except IndexError: + return None + + # + # Helpers. + # + + def _assertUniq(self, oData, idTestGroupIgnore): + """ Checks that the test group name is unique, raises exception if it isn't. """ + self._oDb.execute('SELECT idTestGroup\n' + 'FROM TestGroups\n' + 'WHERE sName = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + + ('' if idTestGroupIgnore is None else ' AND idTestGroup <> %d\n' % (idTestGroupIgnore,)) + , ( oData.sName, )) + if self._oDb.getRowCount() > 0: + raise TMRowAlreadyExists('A Test group with name "%s" already exist.' % (oData.sName,)); + return True; + + def _historizeTestGroup(self, idTestGroup): + """ Historize Test Group record. """ + self._oDb.execute('UPDATE TestGroups\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND tsExpire = \'infinity\'::TIMESTAMP\n' + , ( idTestGroup, )); + return True; + + def _historizeTestGroupMember(self, idTestGroup, idTestCase): + """ Historize Test Group Member record. """ + self._oDb.execute('UPDATE TestGroupMembers\n' + 'SET tsExpire = CURRENT_TIMESTAMP\n' + 'WHERE idTestGroup = %s\n' + ' AND idTestCase = %s\n' + ' AND tsExpire = \'infinity\'::timestamp\n' + , (idTestGroup, idTestCase,)); + return True; + + def _insertTestGroupMember(self, uidAuthor, oMember): + """ Inserts a test group member. """ + self._oDb.execute('INSERT INTO TestGroupMembers\n' + ' (uidAuthor, idTestGroup, idTestCase, iSchedPriority, aidTestCaseArgs)\n' + 'VALUES (%s, %s, %s, %s, %s)\n' + , ( uidAuthor, + oMember.idTestGroup, + oMember.idTestCase, + oMember.iSchedPriority, + oMember.aidTestCaseArgs, )); + return True; + + + +# +# Unit testing. +# + +# pylint: disable=missing-docstring +class TestGroupMemberDataTestCase(ModelDataBaseTestCase): + def setUp(self): + self.aoSamples = [TestGroupMemberData(),]; + +class TestGroupDataTestCase(ModelDataBaseTestCase): + def setUp(self): + self.aoSamples = [TestGroupData(),]; + +if __name__ == '__main__': + unittest.main(); + # not reached. + |