From f215e02bf85f68d3a6106c2a1f4f7f063f819064 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 11 Apr 2024 10:17:27 +0200 Subject: Adding upstream version 7.0.14-dfsg. Signed-off-by: Daniel Baumann --- src/VBox/ValidationKit/tests/usb/tdUsb1.py | 590 +++++++++++++++++++++++++++++ 1 file changed, 590 insertions(+) create mode 100755 src/VBox/ValidationKit/tests/usb/tdUsb1.py (limited to 'src/VBox/ValidationKit/tests/usb/tdUsb1.py') diff --git a/src/VBox/ValidationKit/tests/usb/tdUsb1.py b/src/VBox/ValidationKit/tests/usb/tdUsb1.py new file mode 100755 index 00000000..9dd20ebf --- /dev/null +++ b/src/VBox/ValidationKit/tests/usb/tdUsb1.py @@ -0,0 +1,590 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# $Id: tdUsb1.py $ + +""" +VirtualBox Validation Kit - USB testcase and benchmark. +""" + +__copyright__ = \ +""" +Copyright (C) 2014-2023 Oracle and/or its affiliates. + +This file is part of VirtualBox base platform packages, as +available from https://www.virtualbox.org. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation, in version 3 of the +License. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, see . + +The contents of this file may alternatively be used under the terms +of the Common Development and Distribution License Version 1.0 +(CDDL), a copy of it is provided in the "COPYING.CDDL" file included +in the VirtualBox distribution, in which case the provisions of the +CDDL are applicable instead of those of the GPL. + +You may elect to license modified versions of this file under the +terms and conditions of either the GPL or the CDDL or both. + +SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0 +""" +__version__ = "$Revision: 155244 $" + + +# Standard Python imports. +import os; +import sys; +import socket; + +# Only the main script needs to modify the path. +try: __file__ +except: __file__ = sys.argv[0]; +g_ksValidationKitDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))); +sys.path.append(g_ksValidationKitDir); + +# Validation Kit imports. +from testdriver import reporter; +from testdriver import base; +from testdriver import vbox; +from testdriver import vboxcon; + +# USB gadget control import +import usbgadget; + +# Python 3 hacks: +if sys.version_info[0] >= 3: + xrange = range; # pylint: disable=redefined-builtin,invalid-name + + +class tdUsbBenchmark(vbox.TestDriver): # pylint: disable=too-many-instance-attributes + """ + USB benchmark. + """ + + # The available test devices + # + # The first key is the hostname of the host the test is running on. + # It contains a new dictionary with the attached gadgets based on the + # USB speed we want to test (Low, Full, High, Super). + # The parameters consist of the hostname of the gadget in the network + # and the hardware type. + kdGadgetParams = { + 'adaris': { + 'Low': ('usbtest.de.oracle.com', None), + 'Full': ('usbtest.de.oracle.com', None), + 'High': ('usbtest.de.oracle.com', None), + 'Super': ('usbtest.de.oracle.com', None) + }, + }; + + # Mappings of USB controllers to supported USB device speeds. + kdUsbSpeedMappings = { + 'OHCI': ['Low', 'Full'], + 'EHCI': ['High'], + 'XHCI': ['Low', 'Full', 'High', 'Super'] + }; + + # Tests currently disabled because they fail, need investigation. + kdUsbTestsDisabled = { + 'Low': [24], + 'Full': [24], + 'High': [24], + 'Super': [24] + }; + + def __init__(self): + vbox.TestDriver.__init__(self); + self.asRsrcs = None; + self.asTestVMsDef = ['tst-arch']; + self.asTestVMs = self.asTestVMsDef; + self.asSkipVMs = []; + self.asVirtModesDef = ['hwvirt', 'hwvirt-np', 'raw']; + self.asVirtModes = self.asVirtModesDef; + self.acCpusDef = [1, 2,]; + self.acCpus = self.acCpusDef; + self.asUsbCtrlsDef = ['OHCI', 'EHCI', 'XHCI']; + self.asUsbCtrls = self.asUsbCtrlsDef; + self.asUsbSpeedDef = ['Low', 'Full', 'High', 'Super']; + self.asUsbSpeed = self.asUsbSpeedDef; + self.asUsbTestsDef = ['Compliance', 'Reattach']; + self.asUsbTests = self.asUsbTestsDef; + self.cUsbReattachCyclesDef = 100; + self.cUsbReattachCycles = self.cUsbReattachCyclesDef; + self.sHostname = socket.gethostname().lower(); + self.sGadgetHostnameDef = 'usbtest.de.oracle.com'; + self.uGadgetPortDef = None; + self.sUsbCapturePathDef = self.sScratchPath; + self.sUsbCapturePath = self.sUsbCapturePathDef; + self.fUsbCapture = False; + + # + # Overridden methods. + # + def showUsage(self): + rc = vbox.TestDriver.showUsage(self); + reporter.log(''); + reporter.log('tdUsb1 Options:'); + reporter.log(' --virt-modes '); + reporter.log(' Test the specified VMs in the given order. Use this to change'); + reporter.log(' the execution order or limit the choice of VMs'); + reporter.log(' Default: %s (all)' % (':'.join(self.asTestVMsDef))); + reporter.log(' --skip-vms '); + reporter.log(' Skip the specified VMs when testing.'); + reporter.log(' --usb-ctrls '); + reporter.log(' Default: %s' % (self.cUsbReattachCyclesDef)); + reporter.log(' --hostname: '); + reporter.log(' Default: %s' % (self.sHostname)); + reporter.log(' --default-gadget-host '); + reporter.log(' Default: %s' % (self.sGadgetHostnameDef)); + reporter.log(' --default-gadget-port '); + reporter.log(' Default: %s' % (6042)); + reporter.log(' --usb-capture-path '); + reporter.log(' Default: %s' % (self.sUsbCapturePathDef)); + reporter.log(' --usb-capture'); + reporter.log(' Whether to capture the USB traffic for each test'); + return rc; + + def parseOption(self, asArgs, iArg): # pylint: disable=too-many-branches,too-many-statements + if asArgs[iArg] == '--virt-modes': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--virt-modes" takes a colon separated list of modes'); + self.asVirtModes = asArgs[iArg].split(':'); + for s in self.asVirtModes: + if s not in self.asVirtModesDef: + raise base.InvalidOption('The "--virt-modes" value "%s" is not valid; valid values are: %s' \ + % (s, ' '.join(self.asVirtModesDef))); + elif asArgs[iArg] == '--cpu-counts': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--cpu-counts" takes a colon separated list of cpu counts'); + self.acCpus = []; + for s in asArgs[iArg].split(':'): + try: c = int(s); + except: raise base.InvalidOption('The "--cpu-counts" value "%s" is not an integer' % (s,)); + if c <= 0: raise base.InvalidOption('The "--cpu-counts" value "%s" is zero or negative' % (s,)); + self.acCpus.append(c); + elif asArgs[iArg] == '--test-vms': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--test-vms" takes colon separated list'); + self.asTestVMs = asArgs[iArg].split(':'); + for s in self.asTestVMs: + if s not in self.asTestVMsDef: + raise base.InvalidOption('The "--test-vms" value "%s" is not valid; valid values are: %s' \ + % (s, ' '.join(self.asTestVMsDef))); + elif asArgs[iArg] == '--skip-vms': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--skip-vms" takes colon separated list'); + self.asSkipVMs = asArgs[iArg].split(':'); + for s in self.asSkipVMs: + if s not in self.asTestVMsDef: + reporter.log('warning: The "--test-vms" value "%s" does not specify any of our test VMs.' % (s)); + elif asArgs[iArg] == '--usb-ctrls': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-ctrls" takes a colon separated list of USB controllers'); + self.asUsbCtrls = asArgs[iArg].split(':'); + for s in self.asUsbCtrls: + if s not in self.asUsbCtrlsDef: + reporter.log('warning: The "--usb-ctrls" value "%s" is not a valid USB controller.' % (s)); + elif asArgs[iArg] == '--usb-speed': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-speed" takes a colon separated list of USB speeds'); + self.asUsbSpeed = asArgs[iArg].split(':'); + for s in self.asUsbSpeed: + if s not in self.asUsbSpeedDef: + reporter.log('warning: The "--usb-speed" value "%s" is not a valid USB speed.' % (s)); + elif asArgs[iArg] == '--usb-tests': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-tests" takes a colon separated list of USB tests'); + self.asUsbTests = asArgs[iArg].split(':'); + for s in self.asUsbTests: + if s not in self.asUsbTestsDef: + reporter.log('warning: The "--usb-tests" value "%s" is not a valid USB test.' % (s)); + elif asArgs[iArg] == '--usb-reattach-cycles': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-reattach-cycles" takes cycle count'); + try: self.cUsbReattachCycles = int(asArgs[iArg]); + except: raise base.InvalidOption('The "--usb-reattach-cycles" value "%s" is not an integer' \ + % (asArgs[iArg],)); + if self.cUsbReattachCycles <= 0: + raise base.InvalidOption('The "--usb-reattach-cycles" value "%s" is zero or negative.' \ + % (self.cUsbReattachCycles,)); + elif asArgs[iArg] == '--hostname': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--hostname" takes a hostname'); + self.sHostname = asArgs[iArg]; + elif asArgs[iArg] == '--default-gadget-host': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--default-gadget-host" takes a hostname'); + self.sGadgetHostnameDef = asArgs[iArg]; + elif asArgs[iArg] == '--default-gadget-port': + iArg += 1; + if iArg >= len(asArgs): raise base.InvalidOption('The "--default-gadget-port" takes port number'); + try: self.uGadgetPortDef = int(asArgs[iArg]); + except: raise base.InvalidOption('The "--default-gadget-port" value "%s" is not an integer' \ + % (asArgs[iArg],)); + if self.uGadgetPortDef <= 0: + raise base.InvalidOption('The "--default-gadget-port" value "%s" is zero or negative.' \ + % (self.uGadgetPortDef,)); + elif asArgs[iArg] == '--usb-capture-path': + if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-capture-path" takes a path argument'); + self.sUsbCapturePath = asArgs[iArg]; + elif asArgs[iArg] == '--usb-capture': + self.fUsbCapture = True; + else: + return vbox.TestDriver.parseOption(self, asArgs, iArg); + return iArg + 1; + + def completeOptions(self): + # Remove skipped VMs from the test list. + for sVM in self.asSkipVMs: + try: self.asTestVMs.remove(sVM); + except: pass; + + return vbox.TestDriver.completeOptions(self); + + def getResourceSet(self): + # Construct the resource list the first time it's queried. + if self.asRsrcs is None: + self.asRsrcs = []; + + if 'tst-arch' in self.asTestVMs: + self.asRsrcs.append('4.2/usb/tst-arch.vdi'); + + return self.asRsrcs; + + def actionConfig(self): + + # Some stupid trickery to guess the location of the iso. ## fixme - testsuite unzip ++ + sVBoxValidationKit_iso = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxValidationKit.iso')); + if not os.path.isfile(sVBoxValidationKit_iso): + sVBoxValidationKit_iso = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxTestSuite.iso')); + if not os.path.isfile(sVBoxValidationKit_iso): + sVBoxValidationKit_iso = '/mnt/ramdisk/vbox/svn/trunk/validationkit/VBoxValidationKit.iso'; + if not os.path.isfile(sVBoxValidationKit_iso): + sVBoxValidationKit_iso = '/mnt/ramdisk/vbox/svn/trunk/testsuite/VBoxTestSuite.iso'; + if not os.path.isfile(sVBoxValidationKit_iso): + sCur = os.getcwd(); + for i in range(0, 10): + sVBoxValidationKit_iso = os.path.join(sCur, 'validationkit/VBoxValidationKit.iso'); + if os.path.isfile(sVBoxValidationKit_iso): + break; + sVBoxValidationKit_iso = os.path.join(sCur, 'testsuite/VBoxTestSuite.iso'); + if os.path.isfile(sVBoxValidationKit_iso): + break; + sCur = os.path.abspath(os.path.join(sCur, '..')); + if i is None: pass; # shut up pychecker/pylint. + if not os.path.isfile(sVBoxValidationKit_iso): + sVBoxValidationKit_iso = '/home/bird/validationkit/VBoxValidationKit.iso'; + if not os.path.isfile(sVBoxValidationKit_iso): + sVBoxValidationKit_iso = '/home/bird/testsuite/VBoxTestSuite.iso'; + + # Make sure vboxapi has been imported so we can use the constants. + if not self.importVBoxApi(): + return False; + + # + # Configure the VMs we're going to use. + # + + # Linux VMs + if 'tst-arch' in self.asTestVMs: + oVM = self.createTestVM('tst-arch', 1, '4.2/usb/tst-arch.vdi', sKind = 'ArchLinux_64', fIoApic = True, \ + eNic0AttachType = vboxcon.NetworkAttachmentType_NAT, \ + sDvdImage = sVBoxValidationKit_iso); + if oVM is None: + return False; + + return True; + + def actionExecute(self): + """ + Execute the testcase. + """ + fRc = self.testUsb(); + return fRc; + + def getGadgetParams(self, sHostname, sSpeed): + """ + Returns the gadget hostname and port from the + given hostname the test is running on and device speed we want to test. + """ + kdGadgetsConfigured = self.kdGadgetParams.get(sHostname); + if kdGadgetsConfigured is not None: + return kdGadgetsConfigured.get(sSpeed); + + return (self.sGadgetHostnameDef, self.uGadgetPortDef); + + def getCaptureFilePath(self, sUsbCtrl, sSpeed): + """ + Returns capture filename from the given data. + """ + + return '%s%s%s-%s.pcap' % (self.sUsbCapturePath, os.sep, sUsbCtrl, sSpeed); + + def attachUsbDeviceToVm(self, oSession, sVendorId, sProductId, iBusId, + sCaptureFile = None): + """ + Attaches the given USB device to the VM either via a filter + or directly if capturing the USB traffic is enabled. + + Returns True on success, False on failure. + """ + fRc = False; + if sCaptureFile is None: + fRc = oSession.addUsbDeviceFilter('Compliance device', sVendorId = sVendorId, sProductId = sProductId, \ + sPort = format(iBusId, 'x')); + else: + # Search for the correct device in the USB device list waiting for some time + # to let it appear. + iVendorId = int(sVendorId, 16); + iProductId = int(sProductId, 16); + + # Try a few times to give VBoxSVC a chance to detect the new device. + for _ in xrange(5): + fFound = False; + aoUsbDevs = self.oVBoxMgr.getArray(self.oVBox.host, 'USBDevices'); + for oUsbDev in aoUsbDevs: + if oUsbDev.vendorId == iVendorId \ + and oUsbDev.productId == iProductId \ + and oUsbDev.port == iBusId: + fFound = True; + fRc = oSession.attachUsbDevice(oUsbDev.id, sCaptureFile); + break; + + if fFound: + break; + + # Wait a moment until the next try. + self.sleep(1); + + if fRc: + # Wait a moment to let the USB device appear + self.sleep(9); + + return fRc; + + # + # Test execution helpers. + # + def testUsbCompliance(self, oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile = None): + """ + Test VirtualBoxs USB stack in a VM. + """ + # Get configured USB test devices from hostname we are running on + sGadgetHost, uGadgetPort = self.getGadgetParams(self.sHostname, sSpeed); + + oUsbGadget = usbgadget.UsbGadget(); + reporter.log('Connecting to UTS: ' + sGadgetHost); + fRc = oUsbGadget.connectTo(30 * 1000, sGadgetHost, uPort = uGadgetPort, fTryConnect = True); + if fRc is True: + reporter.log('Connect succeeded'); + self.oVBox.host.addUSBDeviceSource('USBIP', sGadgetHost, sGadgetHost + (':%s' % oUsbGadget.getUsbIpPort()), [], []); + + fSuperSpeed = False; + if sSpeed == 'Super': + fSuperSpeed = True; + + # Create test device gadget and a filter to attach the device automatically. + fRc = oUsbGadget.impersonate(usbgadget.g_ksGadgetImpersonationTest, fSuperSpeed); + if fRc is True: + iBusId, _ = oUsbGadget.getGadgetBusAndDevId(); + fRc = self.attachUsbDeviceToVm(oSession, '0525', 'a4a0', iBusId, sCaptureFile); + if fRc is True: + tupCmdLine = ('UsbTest', ); + # Exclude a few tests which hang and cause a timeout, need investigation. + lstTestsExclude = self.kdUsbTestsDisabled.get(sSpeed); + for iTestExclude in lstTestsExclude: + tupCmdLine = tupCmdLine + ('--exclude', str(iTestExclude)); + + fRc = self.txsRunTest(oTxsSession, 'UsbTest', 3600 * 1000, \ + '${CDROM}/${OS/ARCH}/UsbTest${EXESUFF}', tupCmdLine); + if not fRc: + reporter.testFailure('Running USB test utility failed'); + else: + reporter.testFailure('Failed to attach USB device to VM'); + oUsbGadget.disconnectFrom(); + else: + reporter.testFailure('Failed to impersonate test device'); + + self.oVBox.host.removeUSBDeviceSource(sGadgetHost); + else: + reporter.log('warning: Failed to connect to USB gadget'); + fRc = None + + _ = sUsbCtrl; + return fRc; + + def testUsbReattach(self, oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile = None): # pylint: disable=unused-argument + """ + Tests that rapid connect/disconnect cycles work. + """ + # Get configured USB test devices from hostname we are running on + sGadgetHost, uGadgetPort = self.getGadgetParams(self.sHostname, sSpeed); + + oUsbGadget = usbgadget.UsbGadget(); + reporter.log('Connecting to UTS: ' + sGadgetHost); + fRc = oUsbGadget.connectTo(30 * 1000, sGadgetHost, uPort = uGadgetPort, fTryConnect = True); + if fRc is True: + self.oVBox.host.addUSBDeviceSource('USBIP', sGadgetHost, sGadgetHost + (':%s' % oUsbGadget.getUsbIpPort()), [], []); + + fSuperSpeed = False; + if sSpeed == 'Super': + fSuperSpeed = True; + + # Create test device gadget and a filter to attach the device automatically. + fRc = oUsbGadget.impersonate(usbgadget.g_ksGadgetImpersonationTest, fSuperSpeed); + if fRc is True: + iBusId, _ = oUsbGadget.getGadgetBusAndDevId(); + fRc = self.attachUsbDeviceToVm(oSession, '0525', 'a4a0', iBusId, sCaptureFile); + if fRc is True: + + # Wait a moment to let the USB device appear + self.sleep(3); + + # Do a rapid disconnect reconnect cycle. Wait a second before disconnecting + # again or it will happen so fast that the VM can't attach the new device. + # @todo: Get rid of the constant wait and use an event to get notified when + # the device was attached. + for iCycle in xrange (0, self.cUsbReattachCycles): + fRc = oUsbGadget.disconnectUsb(); + fRc = fRc and oUsbGadget.connectUsb(); + if not fRc: + reporter.testFailure('Reattach cycle %s failed on the gadget device' % (iCycle)); + break; + self.sleep(1); + + else: + reporter.testFailure('Failed to create USB device filter'); + + oUsbGadget.disconnectFrom(); + else: + reporter.testFailure('Failed to impersonate test device'); + else: + reporter.log('warning: Failed to connect to USB gadget'); + fRc = None + + return fRc; + + def testUsbOneCfg(self, sVmName, sUsbCtrl, sSpeed, sUsbTest): + """ + Runs the specified VM thru one specified test. + + Returns a success indicator on the general test execution. This is not + the actual test result. + """ + oVM = self.getVmByName(sVmName); + + # Reconfigure the VM + fRc = True; + oSession = self.openSession(oVM); + if oSession is not None: + fRc = fRc and oSession.enableVirtEx(True); + fRc = fRc and oSession.enableNestedPaging(True); + + # Make sure controllers are disabled initially. + fRc = fRc and oSession.enableUsbOhci(False); + fRc = fRc and oSession.enableUsbEhci(False); + fRc = fRc and oSession.enableUsbXhci(False); + + if sUsbCtrl == 'OHCI': + fRc = fRc and oSession.enableUsbOhci(True); + elif sUsbCtrl == 'EHCI': + fRc = fRc and oSession.enableUsbEhci(True); + elif sUsbCtrl == 'XHCI': + fRc = fRc and oSession.enableUsbXhci(True); + fRc = fRc and oSession.saveSettings(); + fRc = oSession.close() and fRc and True; # pychecker hack. + oSession = None; + else: + fRc = False; + + # Start up. + if fRc is True: + self.logVmInfo(oVM); + oSession, oTxsSession = self.startVmAndConnectToTxsViaTcp(sVmName, fCdWait = False, fNatForwardingForTxs = False); + if oSession is not None: + self.addTask(oTxsSession); + + # Fudge factor - Allow the guest to finish starting up. + self.sleep(5); + + sCaptureFile = None; + if self.fUsbCapture: + sCaptureFile = self.getCaptureFilePath(sUsbCtrl, sSpeed); + + if sUsbTest == 'Compliance': + fRc = self.testUsbCompliance(oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile); + elif sUsbTest == 'Reattach': + fRc = self.testUsbReattach(oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile); + + # cleanup. + self.removeTask(oTxsSession); + self.terminateVmBySession(oSession) + + # Add the traffic dump if it exists and the test failed + if reporter.testErrorCount() > 0 \ + and sCaptureFile is not None \ + and os.path.exists(sCaptureFile): + reporter.addLogFile(sCaptureFile, 'misc/other', 'USB traffic dump'); + else: + fRc = False; + return fRc; + + def testUsbForOneVM(self, sVmName): + """ + Runs one VM thru the various configurations. + """ + fRc = False; + reporter.testStart(sVmName); + for sUsbCtrl in self.asUsbCtrls: + reporter.testStart(sUsbCtrl) + for sUsbSpeed in self.asUsbSpeed: + asSupportedSpeeds = self.kdUsbSpeedMappings.get(sUsbCtrl); + if sUsbSpeed in asSupportedSpeeds: + reporter.testStart(sUsbSpeed) + for sUsbTest in self.asUsbTests: + reporter.testStart(sUsbTest) + fRc = self.testUsbOneCfg(sVmName, sUsbCtrl, sUsbSpeed, sUsbTest); + reporter.testDone(); + reporter.testDone(); + reporter.testDone(); + reporter.testDone(); + return fRc; + + def testUsb(self): + """ + Executes USB test. + """ + + reporter.log("Running on host: " + self.sHostname); + + # Loop thru the test VMs. + for sVM in self.asTestVMs: + # run test on the VM. + fRc = self.testUsbForOneVM(sVM); + + return fRc; + + + +if __name__ == '__main__': + sys.exit(tdUsbBenchmark().main(sys.argv)); + -- cgit v1.2.3