1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# test tools for the pyxpcom bindings
from xpcom import _xpcom
import unittest
# export a "getmemusage()" function that returns a useful "bytes used" count
# for the current process. Growth in this when doing the same thing over and
# over implies a leak.
try:
import win32api
import win32pdh
import win32pdhutil
have_pdh = 1
except ImportError:
have_pdh = 0
# XXX - win32pdh is slow, particularly finding our current process.
# A better way would be good.
# Our win32pdh specific functions - they can be at the top-level on all
# platforms, but will only actually be called if the modules are available.
def FindMyCounter():
pid_me = win32api.GetCurrentProcessId()
object = "Process"
items, instances = win32pdh.EnumObjectItems(None,None,object, -1)
for instance in instances:
# We use 2 counters - "ID Process" and "Working Set"
counter = "ID Process"
format = win32pdh.PDH_FMT_LONG
hq = win32pdh.OpenQuery()
path = win32pdh.MakeCounterPath( (None,object,instance, None, -1,"ID Process") )
hc1 = win32pdh.AddCounter(hq, path)
path = win32pdh.MakeCounterPath( (None,object,instance, None, -1,"Working Set") )
hc2 = win32pdh.AddCounter(hq, path)
win32pdh.CollectQueryData(hq)
type, pid = win32pdh.GetFormattedCounterValue(hc1, format)
if pid==pid_me:
win32pdh.RemoveCounter(hc1) # not needed any more
return hq, hc2
# Not mine - close the query and try again
win32pdh.RemoveCounter(hc1)
win32pdh.RemoveCounter(hc2)
win32pdh.CloseQuery(hq)
else:
raise RuntimeError, "Can't find myself!?"
def CloseCounter(hq, hc):
win32pdh.RemoveCounter(hc)
win32pdh.CloseQuery(hq)
def GetCounterValue(hq, hc):
win32pdh.CollectQueryData(hq)
format = win32pdh.PDH_FMT_LONG
type, val = win32pdh.GetFormattedCounterValue(hc, format)
return val
g_pdh_data = None
# The pdh function that does the work
def pdh_getmemusage():
global g_pdh_data
if g_pdh_data is None:
hq, hc = FindMyCounter()
g_pdh_data = hq, hc
hq, hc = g_pdh_data
return GetCounterValue(hq, hc)
# The public bit
if have_pdh:
getmemusage = pdh_getmemusage
else:
def getmemusage():
return 0
# Test runner utilities, including some support for builtin leak tests.
class TestLoader(unittest.TestLoader):
def loadTestsFromTestCase(self, testCaseClass):
"""Return a suite of all tests cases contained in testCaseClass"""
leak_tests = []
for name in self.getTestCaseNames(testCaseClass):
real_test = testCaseClass(name)
leak_test = self._getTestWrapper(real_test)
leak_tests.append(leak_test)
return self.suiteClass(leak_tests)
def _getTestWrapper(self, test):
# later! see pywin32's win32/test/util.py
return test
def loadTestsFromModule(self, mod):
if hasattr(mod, "suite"):
ret = mod.suite()
else:
ret = unittest.TestLoader.loadTestsFromModule(self, mod)
assert ret.countTestCases() > 0, "No tests in %r" % (mod,)
return ret
def loadTestsFromName(self, name, module=None):
test = unittest.TestLoader.loadTestsFromName(self, name, module)
if isinstance(test, unittest.TestSuite):
pass # hmmm? print "Don't wrap suites yet!", test._tests
elif isinstance(test, unittest.TestCase):
test = self._getTestWrapper(test)
else:
print "XXX - what is", test
return test
# A base class our tests should derive from (well, one day it will be)
TestCase = unittest.TestCase
def suite_from_functions(*funcs):
suite = unittest.TestSuite()
for func in funcs:
suite.addTest(unittest.FunctionTestCase(func))
return suite
def testmain(*args, **kw):
new_kw = kw.copy()
if not new_kw.has_key('testLoader'):
new_kw['testLoader'] = TestLoader()
try:
unittest.main(*args, **new_kw)
finally:
_xpcom.NS_ShutdownXPCOM()
ni = _xpcom._GetInterfaceCount()
ng = _xpcom._GetGatewayCount()
if ni or ng:
print "********* WARNING - Leaving with %d/%d objects alive" % (ni,ng)
|