1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
|
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Contains a helper function for deploying and executing a packaged
executable on a Target."""
from __future__ import print_function
import common
import hashlib
import logging
import multiprocessing
import os
import re
import select
import subprocess
import sys
import threading
import uuid
from symbolizer import BuildIdsPaths, RunSymbolizer, SymbolizerFilter
# Path of the 'far' tool, as resolved by
# common.GetHostToolPathFromPlatform().
FAR = common.GetHostToolPathFromPlatform('far')

# Amount of time to wait for the termination of the system log output thread.
_JOIN_TIMEOUT_SECS = 5
def _AttachKernelLogReader(target):
"""Attaches a kernel log reader as a long-running SSH task."""
logging.info('Attaching kernel logger.')
return target.RunCommandPiped(['dlog', '-f'],
stdin=open(os.devnull, 'r'),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
class SystemLogReader(object):
  """Pipes the Fuchsia system log through the symbolizer into a file.

  Usable as a context manager: leaving the `with` block kills the helper
  processes and closes the log file."""

  def __init__(self):
    # All handles remain None until Start() is called.
    self._listener_proc = None
    self._symbolizer_proc = None
    self._system_log = None

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Kills the symbolizer and listener processes, then closes the file."""
    for proc in (self._symbolizer_proc, self._listener_proc):
      if proc:
        proc.kill()

    log_file = self._system_log
    if log_file:
      log_file.close()

  def Start(self, target, package_paths, system_log_file):
    """Launches 'log_listener' on |target| and symbolizes its output.

    target: Object providing RunCommandPiped().
    package_paths: Passed to BuildIdsPaths() to locate symbol information.
    system_log_file: Path of the file that receives the symbolized log."""
    logging.debug('Writing fuchsia system log to %s' % system_log_file)

    listener = target.RunCommandPiped(['log_listener'],
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT)
    self._listener_proc = listener

    # Line-buffered so that the log file is kept up to date as lines arrive.
    log_file = open(system_log_file, 'w', buffering=1)
    self._system_log = log_file

    self._symbolizer_proc = RunSymbolizer(listener.stdout, log_file,
                                          BuildIdsPaths(package_paths))
class MergedInputStream(object):
  """Merges a number of input streams into a UNIX pipe on a dedicated thread.

  Terminates when the file descriptor of the primary stream (the first in
  the sequence) is closed. The write end of the pipe is closed when merging
  stops, so readers of the merged stream observe end-of-file."""

  def __init__(self, streams):
    """streams: Non-empty sequence of file objects providing fileno() and
    readline(). The first entry is the primary stream."""
    assert len(streams) > 0
    self._streams = streams
    self._output_stream = None
    self._thread = None

  def Start(self):
    """Starts the merging thread and returns a pipe to the merged output
    stream, opened for reading in text mode."""
    read_pipe, write_pipe = os.pipe()

    # Line buffering (buffering=1) is only honoured for text-mode files, so
    # the binary write end uses the default buffering and _ForwardLine()
    # flushes explicitly after every line.
    self._output_stream = os.fdopen(write_pipe, 'wb')
    self._thread = threading.Thread(target=self._Run)
    self._thread.start()

    return os.fdopen(read_pipe, 'r')

  def _ForwardLine(self, streams_by_fd, fileno):
    """Copies one line from the stream registered under |fileno| to the pipe.

    Returns True if a line was forwarded. On end-of-file the stream is removed
    from |streams_by_fd| and False is returned."""
    line = streams_by_fd[fileno].readline()
    if not line:
      del streams_by_fd[fileno]
      return False

    self._output_stream.write(line)
    self._output_stream.flush()
    return True

  def _Run(self):
    """Thread body: forwards lines until the primary stream is closed, drains
    whatever is immediately available elsewhere, then closes the pipe."""
    streams_by_fd = {s.fileno(): s for s in self._streams}
    primary_fd = self._streams[0].fileno()

    try:
      # The lifetime of the MergedInputStream is bound to the lifetime of
      # |primary_fd|. Compare against None explicitly, since 0 is a valid
      # file descriptor.
      while primary_fd is not None:
        # Block until data is read or an exception occurs.
        watched = list(streams_by_fd)
        rlist, _, xlist = select.select(watched, [], watched)

        for fileno in xlist:
          streams_by_fd.pop(fileno, None)
          if fileno == primary_fd:
            primary_fd = None

        for fileno in rlist:
          if fileno not in streams_by_fd:
            # Already dropped above because it was also reported in |xlist|.
            continue
          forwarded = self._ForwardLine(streams_by_fd, fileno)
          if not forwarded and fileno == primary_fd:
            primary_fd = None

      # Flush the streams by executing nonblocking reads from the input file
      # descriptors until no more data is available, or all the streams are
      # closed.
      while streams_by_fd:
        rlist, _, _ = select.select(list(streams_by_fd), [], [], 0)
        if not rlist:
          break
        for fileno in rlist:
          self._ForwardLine(streams_by_fd, fileno)
    finally:
      # Close the write end so that readers see EOF as soon as merging stops,
      # rather than whenever this object happens to be garbage collected.
      self._output_stream.close()
def _GetComponentUri(package_name):
return 'fuchsia-pkg://fuchsia.com/%s#meta/%s.cmx' % (package_name,
package_name)
class RunTestPackageArgs:
  """Options bundle consumed by RunTestPackage().

  code_coverage: When set, the package is launched through 'runtests' and the
      output is written under /tmp on the device.
  system_logging: When set, a system log reader is attached to the target.
  test_realm_label: Realm name handed to run-test-component. Required if a
      filter file is to be set, or a results summary file is to be fetched
      after the test suite has run.
  use_run_test_component: When True the package is run hermetically via
      'run-test-component' instead of 'run'.
  """

  def __init__(self):
    # Every option starts out disabled / unset.
    self.code_coverage = False
    self.system_logging = False
    self.test_realm_label = None
    self.use_run_test_component = False

  @staticmethod
  def FromCommonArgs(args):
    """Builds an instance from |args|, copying its code_coverage and
    include_system_logs attributes. Other options keep their defaults."""
    result = RunTestPackageArgs()
    result.code_coverage = args.code_coverage
    result.system_logging = args.include_system_logs
    return result
def _DrainStreamToStdout(stream, quit_event):
"""Outputs the contents of |stream| until |quit_event| is set."""
while not quit_event.is_set():
rlist, _, _ = select.select([stream], [], [], 0.1)
if rlist:
line = rlist[0].readline()
if not line:
return
print(line.rstrip())
def _BuildRunCommand(package_name, package_args, args):
  """Returns the argument list that launches |package_name| on the device,
  selecting the runner according to |args| and appending |package_args|."""
  component_uri = _GetComponentUri(package_name)

  # TODO(crbug.com/1156768): Deprecate runtests.
  if args.code_coverage:
    # runtests requires specifying an output directory and a double dash
    # before the argument list.
    command = ['runtests', '-o', '/tmp', component_uri]
    if args.test_realm_label:
      command += ['--realm-label', args.test_realm_label]
    command += ['--']
  elif args.use_run_test_component:
    command = ['run-test-component']
    if args.test_realm_label:
      command += ['--realm-label=%s' % args.test_realm_label]
    command.append(component_uri)
  else:
    command = ['run', component_uri]

  command.extend(package_args)
  return command


def RunTestPackage(output_dir, target, package_paths, package_name,
                   package_args, args):
  """Installs the Fuchsia packages at |package_paths| on the target,
  executes |package_name| with |package_args|, and symbolizes its output.

  output_dir: The path containing the build output files.
  target: The deployment Target object that will run the package.
  package_paths: The paths to the .far packages to be installed.
  package_name: The name of the primary package to run.
  package_args: The arguments which will be passed to the Fuchsia process.
  args: RunTestPackageArgs instance configuring how the package will be run.

  Returns the exit code of the remote package process."""
  system_logger = (_AttachKernelLogReader(target)
                   if args.system_logging else None)
  # Opened just before the package is launched; closed in the finally block
  # so that the handle is not leaked.
  devnull = None
  try:
    if system_logger:
      # Spin up a thread to asynchronously dump the system log to stdout
      # for easier diagnoses of early, pre-execution failures.
      log_output_quit_event = threading.Event()
      log_output_thread = threading.Thread(
          target=_DrainStreamToStdout,
          args=(system_logger.stdout, log_output_quit_event))
      log_output_thread.daemon = True
      log_output_thread.start()

    # NOTE(review): the package repository is kept open for the whole run;
    # confirm that this matches the intended scope of the context manager.
    with target.GetPkgRepo():
      target.InstallPackage(package_paths)

      if system_logger:
        # From here on the kernel log is consumed through the merged stream
        # below, so stop the early-output thread first.
        log_output_quit_event.set()
        log_output_thread.join(timeout=_JOIN_TIMEOUT_SECS)

      logging.info('Running application.')
      command = _BuildRunCommand(package_name, package_args, args)

      devnull = open(os.devnull, 'r')
      process = target.RunCommandPiped(command,
                                       stdin=devnull,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT)

      if system_logger:
        output_stream = MergedInputStream(
            [process.stdout, system_logger.stdout]).Start()
      else:
        output_stream = process.stdout

      # Run the log data through the symbolizer process.
      output_stream = SymbolizerFilter(output_stream,
                                       BuildIdsPaths(package_paths))

      for next_line in output_stream:
        # TODO(crbug/1198733): Switch to having stream encode to utf-8 directly
        # once we drop Python 2 support.
        print(next_line.encode('utf-8').rstrip())

      process.wait()
      if process.returncode == 0:
        logging.info('Process exited normally with status code 0.')
      else:
        # The test runner returns an error status code if *any* tests fail,
        # so we should proceed anyway.
        logging.warning('Process exited with status code %d.',
                        process.returncode)

  finally:
    if devnull:
      devnull.close()
    if system_logger:
      logging.info('Terminating kernel log reader.')
      log_output_quit_event.set()
      # Bounded wait: the thread may be blocked reading the logger's output,
      # which only ends once the logger has been killed below.
      log_output_thread.join(timeout=_JOIN_TIMEOUT_SECS)
      system_logger.kill()

  return process.returncode
|