1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
# File names that the helpers below join onto the build directory, plus the
# locations of the config fragments and QEMU configs used by this tool.

# Kernel configuration file inside the build directory.
KCONFIG_PATH = '.config'
# kunitconfig inside the build directory; also the file name looked up when a
# directory is given as a kunitconfig path.
KUNITCONFIG_PATH = '.kunitconfig'
# Copy of the kunitconfig written after a successful configuration, compared
# against on later runs to detect changes.
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
# Copied to <build_dir>/.kunitconfig when that file does not exist yet.
# NOTE(review): relative path — presumably resolved against the kernel source
# root as the current working directory; confirm against the caller.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
# Not referenced in the visible part of this file; presumably used by callers.
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
# Kconfig fragment the UML operations merge the kunitconfig into.
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
# Log file inside the build directory that receives the kernel output.
OUTFILE_PATH = 'test.log'
# Absolute path of the directory containing this file.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
# Directory searched for default per-arch QEMU configs (<arch>.py).
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
	"""Raised when configuring the Linux kernel fails."""
36
37
class BuildError(Exception):
	"""Raised when building the Linux kernel fails."""
40
41
class LinuxSourceTreeOperations:
	"""An abstraction over command line operations performed on a source tree.

	Wraps the `make` invocations used to clean, configure and build the
	kernel. Subclasses override make_arch_config() and start() to supply
	their own base Kconfig and their own way of launching the built kernel.
	"""

	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
		"""Stores the settings used by make_olddefconfig() and make().

		Args:
			linux_arch: passed to make as ARCH=<linux_arch>.
			cross_compile: passed to make as CROSS_COMPILE=<cross_compile>
				when it is truthy.
		"""
		self._linux_arch = linux_arch
		self._cross_compile = cross_compile

	def make_mrproper(self) -> None:
		"""Runs `make mrproper`.

		Raises:
			ConfigError: if make cannot be executed or exits with a
				non-zero status (the message is make's combined output).
		"""
		try:
			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			# Tolerate undecodable bytes so the real failure is still reported.
			raise ConfigError(e.output.decode(errors='backslashreplace')) from e

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Returns the Kconfig to build with; the base class returns it unchanged."""
		return base_kunitconfig

	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Runs `make olddefconfig` for build_dir and prints the command used.

		Args:
			build_dir: passed to make as O=<build_dir>.
			make_options: extra arguments appended to the make command line.

		Raises:
			ConfigError: if make cannot be executed or exits with a
				non-zero status (the message is make's combined output).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if make_options:
			command.extend(make_options)
		print('Populating config with:\n$', ' '.join(command))
		try:
			subprocess.check_output(command, stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode(errors='backslashreplace')) from e

	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
		"""Builds the `all` and `compile_commands.json` targets.

		make's stdout is discarded; its stderr is captured and printed if
		the build succeeds but still produced output.

		Args:
			jobs: passed to make as --jobs=<jobs>.
			build_dir: passed to make as O=<build_dir>.
			make_options: extra arguments appended to the make command line.

		Raises:
			BuildError: if make cannot be executed or exits with a
				non-zero status (the message is make's stderr).
		"""
		command = ['make', 'all', 'compile_commands.json', 'ARCH=' + self._linux_arch,
			   'O=' + build_dir, '--jobs=' + str(jobs)]
		# NOTE(review): make_options and CROSS_COMPILE are appended in the
		# opposite order from make_olddefconfig(); kept as-is.
		if make_options:
			command.extend(make_options)
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		print('Building with:\n$', ' '.join(command))
		try:
			proc = subprocess.Popen(command,
						stderr=subprocess.PIPE,
						stdout=subprocess.DEVNULL)
		except OSError as e:
			# Popen() itself only fails with OSError; a failing build is
			# detected through the return code below.
			raise BuildError('Could not call execute make: ' + str(e)) from e
		_, stderr = proc.communicate()
		if proc.returncode != 0:
			raise BuildError(stderr.decode(errors='backslashreplace'))
		if stderr:  # likely only due to build warnings
			print(stderr.decode(errors='backslashreplace'))

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Launches the built kernel; must be overridden by subclasses.

		Raises:
			NotImplementedError: always (a subclass of RuntimeError, which
				is what this method raised historically).
		"""
		raise NotImplementedError('not implemented!')
98
99
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
	"""Source tree operations for kernels that are run under QEMU."""

	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
		"""Copies the needed settings out of qemu_arch_params.

		' kunit_shutdown=reboot' is appended to the configured kernel
		command line.
		"""
		arch_params = qemu_arch_params
		super().__init__(linux_arch=arch_params.linux_arch,
				 cross_compile=cross_compile)
		self._kconfig = arch_params.kconfig
		self._qemu_arch = arch_params.qemu_arch
		self._kernel_path = arch_params.kernel_path
		base_cmdline = arch_params.kernel_command_line
		self._kernel_command_line = base_cmdline + ' kunit_shutdown=reboot'
		self._extra_qemu_params = arch_params.extra_qemu_params
		self._serial = arch_params.serial

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Parses the arch's Kconfig string and merges base_kunitconfig into it."""
		arch_kconfig = kunit_config.parse_from_string(self._kconfig)
		arch_kconfig.merge_in_entries(base_kunitconfig)
		return arch_kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Prints and launches the qemu-system-<arch> command for the built kernel.

		Args:
			params: kernel parameters, joined with the stored kernel
				command line into the -append argument.
			build_dir: directory the configured kernel path is relative to.

		Returns:
			The Popen object, with stdin/stdout piped, stderr folded into
			stdout, and text mode using errors='backslashreplace'.
		"""
		kernel_image = os.path.join(build_dir, self._kernel_path)
		qemu_binary = 'qemu-system-' + self._qemu_arch
		append_arg = ' '.join(params + [self._kernel_command_line])
		fixed_args = [
			qemu_binary,
			'-nodefaults',
			'-m', '1024',
			'-kernel', kernel_image,
			'-append', append_arg,
			'-no-reboot',
			'-nographic',
			'-serial', self._serial,
		]
		argv = fixed_args + self._extra_qemu_params
		# Note: shlex.join() does what we want, but requires python 3.8+.
		printable = ' '.join(map(shlex.quote, argv))
		print('Running tests with:\n$', printable)
		return subprocess.Popen(
			argv,
			stdin=subprocess.PIPE,
			stdout=subprocess.PIPE,
			stderr=subprocess.STDOUT,
			text=True,
			errors='backslashreplace')
134
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
	"""Source tree operations for UML kernels (ARCH=um)."""

	def __init__(self, cross_compile: Optional[str]=None):
		"""Fixes the architecture to 'um'; cross_compile is passed through."""
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
		"""Parses UML_KCONFIG_PATH and merges base_kunitconfig into it."""
		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
		kconfig.merge_in_entries(base_kunitconfig)
		return kconfig

	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
		"""Runs the Linux UML binary. Must be named 'linux'.

		Args:
			params: kernel parameters; 'mem=1G', 'console=tty' and
				'kunit_shutdown=halt' are added after them. The list
				itself is not modified.
			build_dir: directory containing the 'linux' binary.

		Returns:
			The Popen object, with stdin/stdout piped, stderr folded into
			stdout, and text mode using errors='backslashreplace'.
		"""
		linux_bin = os.path.join(build_dir, 'linux')
		# Build a new list instead of extending `params`, so a caller that
		# reuses its list across runs does not accumulate duplicates.
		uml_params = [*params, 'mem=1G', 'console=tty', 'kunit_shutdown=halt']
		print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
		return subprocess.Popen([linux_bin] + uml_params,
					   stdin=subprocess.PIPE,
					   stdout=subprocess.PIPE,
					   stderr=subprocess.STDOUT,
					   text=True, errors='backslashreplace')
156
def get_kconfig_path(build_dir: str) -> str:
	"""Returns the path of the kernel .config file inside build_dir."""
	kconfig_file = os.path.join(build_dir, KCONFIG_PATH)
	return kconfig_file
159
def get_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the .kunitconfig file inside build_dir."""
	kunitconfig_file = os.path.join(build_dir, KUNITCONFIG_PATH)
	return kunitconfig_file
162
def get_old_kunitconfig_path(build_dir: str) -> str:
	"""Returns the path of the last-used kunitconfig copy inside build_dir."""
	last_used_file = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
	return last_used_file
165
def get_parsed_kunitconfig(build_dir: str,
			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
	"""Loads the kunitconfig to use for build_dir.

	With no kunitconfig_paths, <build_dir>/.kunitconfig is parsed; it is
	first created as a copy of DEFAULT_KUNITCONFIG_PATH if it is missing.
	Otherwise every given path is parsed and merged in order; a directory
	stands for the .kunitconfig file inside it.

	Raises:
		ConfigError: if a given path does not exist, or if a file has
			options that conflict with the ones merged so far.
	"""
	if kunitconfig_paths:
		combined = kunit_config.Kconfig()
		for candidate in kunitconfig_paths:
			config_file = candidate
			if os.path.isdir(candidate):
				config_file = os.path.join(candidate, KUNITCONFIG_PATH)
			if not os.path.exists(config_file):
				raise ConfigError(f'Specified kunitconfig ({config_file}) does not exist')

			fragment = kunit_config.parse_file(config_file)
			conflicts = combined.conflicting_options(fragment)
			if conflicts:
				details = '\n\n'.join(f'{a}\n  vs from {config_file}\n{b}' for a, b in conflicts)
				raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
			combined.merge_in_entries(fragment)
		return combined

	# No explicit paths: fall back to the build directory's own kunitconfig.
	default_file = get_kunitconfig_path(build_dir)
	if not os.path.exists(default_file):
		shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_file)
	return kunit_config.parse_file(default_file)
189
def get_outfile_path(build_dir: str) -> str:
	"""Returns the path of the kernel output log inside build_dir."""
	log_file = os.path.join(build_dir, OUTFILE_PATH)
	return log_file
192
def _default_qemu_config_path(arch: str) -> str:
	"""Returns the path of the bundled QEMU config for arch.

	Raises:
		ConfigError: if QEMU_CONFIGS_DIR has no <arch>.py file; the message
			lists the archs that do have one.
	"""
	candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
	if not os.path.isfile(candidate):
		# Strip the '.py' suffix to turn file names back into arch names.
		known_archs = sorted(name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py'))
		raise ConfigError(arch + ' is not a valid arch, options are ' + str(known_archs))
	return candidate
200
def _get_qemu_ops(config_path: str,
		  extra_qemu_args: Optional[List[str]],
		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
	"""Loads a QEMU config module and builds the operations object for it.

	Args:
		config_path: path of a Python file defining QEMU_ARCH.
		extra_qemu_args: appended to the config's extra_qemu_params, if any.
		cross_compile: passed through to LinuxSourceTreeOperationsQemu.

	Returns:
		A (linux_arch, operations) tuple taken from the loaded QEMU_ARCH.

	Raises:
		ValueError: if config_path cannot be loaded as a Python module or
			the module does not define QEMU_ARCH.
	"""
	# The module name/path has very little to do with where the actual file
	# exists (I learned this through experimentation and could not find it
	# anywhere in the Python documentation).
	#
	# Basically, we completely ignore the actual file location of the config
	# we are loading and just tell Python that the module lives in the
	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
	# exists as a file.
	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
	spec = importlib.util.spec_from_file_location(module_path, config_path)
	if spec is None:
		# config_path comes from the user, so report this as a normal error
		# rather than relying on an assert (which -O would strip).
		raise ValueError('could not load qemu_config as a Python module: ' + config_path)
	config = importlib.util.module_from_spec(spec)
	# See https://github.com/python/typeshed/pull/2626 for context.
	assert isinstance(spec.loader, importlib.abc.Loader)
	spec.loader.exec_module(config)

	if not hasattr(config, 'QEMU_ARCH'):
		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
	params: qemu_config.QemuArchParams = config.QEMU_ARCH
	if extra_qemu_args:
		params.extra_qemu_params.extend(extra_qemu_args)
	return params.linux_arch, LinuxSourceTreeOperationsQemu(
			params, cross_compile=cross_compile)
227
class LinuxSourceTree:
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(
	      self,
	      build_dir: str,
	      kunitconfig_paths: Optional[List[str]]=None,
	      kconfig_add: Optional[List[str]]=None,
	      arch: Optional[str]=None,
	      cross_compile: Optional[str]=None,
	      qemu_config_path: Optional[str]=None,
	      extra_qemu_args: Optional[List[str]]=None) -> None:
		"""Selects the per-arch operations and loads the kunitconfig.

		Installs signal_handler as the SIGINT handler as a side effect.

		Args:
			build_dir: build directory whose .kunitconfig is used when
				kunitconfig_paths is empty.
			kunitconfig_paths: kunitconfig files (or directories holding a
				.kunitconfig) to merge.
			kconfig_add: Kconfig lines merged on top of the kunitconfig.
			arch: architecture name; None means 'um'. Not consulted when
				qemu_config_path is given.
			cross_compile: passed through to the operations object.
			qemu_config_path: QEMU config file to load; takes precedence
				over arch.
			extra_qemu_args: extra QEMU arguments for QEMU-based runs.
		"""
		signal.signal(signal.SIGINT, self.signal_handler)
		if qemu_config_path:
			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
		else:
			self._arch = 'um' if arch is None else arch
			if self._arch == 'um':
				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
			else:
				qemu_config_path = _default_qemu_config_path(self._arch)
				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
		if kconfig_add:
			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
			self._kconfig.merge_in_entries(kconfig)

	def arch(self) -> str:
		"""Returns the architecture name selected in __init__."""
		return self._arch

	def clean(self) -> bool:
		"""Runs `make mrproper`; logs the error and returns False on failure."""
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def validate_config(self, build_dir: str) -> bool:
		"""Checks that the .config in build_dir contains the requested options.

		Returns:
			True if the stored kunitconfig is a subset of the .config;
			otherwise logs the missing entries and returns False.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		validated_kconfig = kunit_config.parse_file(kconfig_path)
		if self._kconfig.is_subset_of(validated_kconfig):
			return True
		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
		# Sort so the message does not depend on set iteration order.
		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
			  'This is probably due to unsatisfied dependencies.\n' \
			  'Missing: ' + ', '.join(sorted(str(e) for e in missing))
		if self._arch == 'um':
			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
				   'on a different architecture with something like "--arch=x86_64".'
		logging.error(message)
		return False

	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Writes the .config for build_dir and runs `make olddefconfig`.

		On success the kunitconfig that was used is also saved in build_dir
		so later runs can detect changes.

		Returns:
			False if configuring fails or the resulting .config does not
			contain the requested options, True otherwise.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir and not os.path.exists(build_dir):
			os.mkdir(build_dir)
		try:
			self._kconfig = self._ops.make_arch_config(self._kconfig)
			self._kconfig.write_to_file(kconfig_path)
			self._ops.make_olddefconfig(build_dir, make_options)
		except ConfigError as e:
			logging.error(e)
			return False
		if not self.validate_config(build_dir):
			return False

		old_path = get_old_kunitconfig_path(build_dir)
		if os.path.exists(old_path):
			os.remove(old_path)  # write_to_file appends to the file
		self._kconfig.write_to_file(old_path)
		return True

	def _kunitconfig_changed(self, build_dir: str) -> bool:
		"""Returns True if no saved kunitconfig exists or it differs from the current one."""
		old_path = get_old_kunitconfig_path(build_dir)
		if not os.path.exists(old_path):
			return True

		old_kconfig = kunit_config.parse_file(old_path)
		return old_kconfig != self._kconfig

	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Creates or regenerates the .config only when that is needed.

		The existing .config is kept when the kunitconfig is a subset of it
		and the kunitconfig matches the one saved by the last build_config().

		Returns:
			True if the .config is usable, False if (re)generating it failed.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if not os.path.exists(kconfig_path):
			print('Generating .config ...')
			return self.build_config(build_dir, make_options)

		existing_kconfig = kunit_config.parse_file(kconfig_path)
		self._kconfig = self._ops.make_arch_config(self._kconfig)

		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
			return True
		print('Regenerating .config ...')
		os.remove(kconfig_path)
		return self.build_config(build_dir, make_options)

	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
		"""Runs `make olddefconfig` and the build, then validates the .config.

		Returns:
			False if configuring or building fails (the error is logged),
			otherwise the result of validate_config().
		"""
		try:
			self._ops.make_olddefconfig(build_dir, make_options)
			self._ops.make(jobs, build_dir, make_options)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
		"""Starts the kernel and yields its output line by line.

		Every line is also written to the log file inside build_dir.

		Args:
			args: kernel parameters to pass; the list is not modified.
			build_dir: directory holding the built kernel and the log file.
			filter_glob: if set, passed as kunit.filter_glob=<filter_glob>.
			filter: if set, passed as kunit.filter="<filter>".
			filter_action: if set, passed as kunit.filter_action=<filter_action>.
			timeout: seconds to wait before terminating the kernel process;
				None waits indefinitely.
		"""
		# Work on a copy: callers may reuse the same list for several runs,
		# and appending in place would pile up duplicate parameters.
		args = list(args) if args else []
		if filter_glob:
			args.append('kunit.filter_glob=' + filter_glob)
		if filter:
			args.append('kunit.filter="' + filter + '"')
		if filter_action:
			args.append('kunit.filter_action=' + filter_action)
		args.append('kunit.enable=1')

		process = self._ops.start(args, build_dir)
		assert process.stdout is not None  # tell mypy it's set

		# Enforce the timeout in a background thread.
		def _wait_proc() -> None:
			try:
				process.wait(timeout=timeout)
			except Exception as e:
				print(e)
				process.terminate()
				process.wait()
		waiter = threading.Thread(target=_wait_proc)
		waiter.start()

		output = None
		try:
			output = open(get_outfile_path(build_dir), 'w')
			# Tee the output to the file and to our caller in real time.
			for line in process.stdout:
				output.write(line)
				yield line
		# This runs even if our caller doesn't consume every line.
		finally:
			if output is None:
				# The log file could not be opened: stop the kernel so that
				# neither it nor the waiter thread is left running.
				process.terminate()
			else:
				# Flush any leftover output to the file
				output.write(process.stdout.read())
				output.close()
			process.stdout.close()

			waiter.join()
			subprocess.call(['stty', 'sane'])

	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
		"""SIGINT handler: logs the interruption and runs `stty sane`."""
		logging.error('Build interruption occurred. Cleaning console.')
		subprocess.call(['stty', 'sane'])
379