1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# Run a perf script command multiple times in parallel, using perf script
5# options --cpu and --time so that each job processes a different chunk
6# of the data.
7#
8# Copyright (c) 2024, Intel Corporation.
9
10import subprocess
11import argparse
12import pathlib
13import shlex
14import time
15import copy
16import sys
17import os
18import re
19
glb_prog_name = "parallel-perf.py"	# program name used in help text and messages
glb_min_interval = 10.0			# default value of the --min-interval option
glb_min_samples = 64			# default value of the --min_size option
23
class Verbosity():
	"""Message levels derived from the --quiet, --verbose and --debug options.

	--debug implies --verbose, and --verbose overrides --quiet. self_test
	is always enabled.
	"""

	def __init__(self, quiet=False, verbose=False, debug=False):
		self.debug     = debug
		self.verbose   = True if debug else verbose
		# Normal messages are suppressed only when quiet and not verbose
		self.normal    = bool(self.verbose) or not quiet
		self.self_test = True
37
# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():
	"""A single job: one command, optionally piped through a consumer command.

	The command line is recorded in <output_dir>/cmd.txt. Final standard
	output goes to <output_dir>/out.txt, and standard error of both processes
	goes to <output_dir>/err.txt, which is removed when it is left empty.
	"""

	def __init__(self, cmd, pipe_to, output_dir="."):
		self.popen = None		# producer process, once started
		self.consumer = None		# consumer process, only when pipe_to is set
		self.cmd = cmd			# command argument list
		self.pipe_to = pipe_to		# consumer command string (split with shlex), or None
		self.output_dir = output_dir
		self.cmdout_name = f"{output_dir}/cmd.txt"
		self.stdout_name = f"{output_dir}/out.txt"
		self.stderr_name = f"{output_dir}/err.txt"

	def Command(self):
		"""Return the command as a single shell-quoted string."""
		# Quote each argument so the string can be pasted back into a shell
		return " ".join(shlex.quote(x) for x in self.cmd)

	def Stdout(self):
		"""Open (truncating) the standard output file for writing."""
		return open(self.stdout_name, "w")

	def Stderr(self):
		"""Open (truncating) the standard error file for writing."""
		return open(self.stderr_name, "w")

	def CreateOutputDir(self):
		pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

	def Start(self):
		"""Start the job. Does nothing if it has already been started."""
		if self.popen:
			return
		self.CreateOutputDir()
		with open(self.cmdout_name, "w") as f:
			f.write(self.Command())
			f.write("\n")
		# The child processes get their own copies of these descriptors, so
		# the parent's copies are closed as soon as the processes are started.
		with self.Stdout() as stdout, self.Stderr() as stderr:
			if self.pipe_to:
				self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
				args = shlex.split(self.pipe_to)
				self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
				# Only the consumer should hold the read end of the pipe, so
				# the producer sees a broken pipe if the consumer exits early
				self.popen.stdout.close()
			else:
				self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

	def RemoveEmptyErrFile(self):
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) == 0:
				os.unlink(self.stderr_name)

	def Errors(self):
		"""Return a list of error messages (a non-empty err.txt is an error)."""
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) != 0:
				return [ f"Non-empty error file {self.stderr_name}" ]
		return []

	def TidyUp(self):
		self.RemoveEmptyErrFile()

	def RawPollWait(self, p, wait):
		if wait:
			return p.wait()
		return p.poll()

	def Poll(self, wait=False):
		"""Return the job's exit status, or None if not started / not finished.

		With a consumer, the job has finished only when both processes have.
		A non-zero producer status is reported even if the consumer succeeded.
		"""
		if not self.popen:
			return None
		result = self.RawPollWait(self.popen, wait)
		if self.consumer:
			res = result
			result = self.RawPollWait(self.consumer, wait)
			if result is not None and res is None:
				# Consumer exited but the producer has not: kill the producer
				# and report "not finished" until it has been reaped
				self.popen.kill()
				result = None
			elif result == 0 and res is not None and res != 0:
				# Consumer succeeded but the producer failed
				result = res
		if result is not None:
			self.TidyUp()
		return result

	def Wait(self):
		return self.Poll(wait=True)

	def Kill(self):
		"""Send SIGKILL to the job's process(es), if started."""
		if not self.popen:
			return
		self.popen.kill()
		if self.consumer:
			self.consumer.kill()
124
def KillWork(worklist, verbosity):
	"""Kill every job in worklist, then wait for each one to exit.

	All jobs are signalled before any is waited for. verbosity is accepted
	for a uniform call signature and is not used.
	"""
	for job in worklist:
		job.Kill()
	for job in worklist:
		job.Wait()
130
def NumberOfCPUs():
	"""Return the number of CPUs currently online (sysconf SC_NPROCESSORS_ONLN)."""
	online_cpus = os.sysconf("SC_NPROCESSORS_ONLN")
	return online_cpus
133
def NanoSecsToSecsStr(x):
	"""Format a nanosecond count as seconds with exactly 9 decimal places.

	None, which stands for an open range end, formats as "".
	"""
	if x is None:
		return ""
	# Zero-pad to at least 10 digits so there is always one integer digit
	digits = str(x).rjust(10, "0")
	secs, nsecs = digits[:-9], digits[-9:]
	return f"{secs}.{nsecs}"
141
def InsertOptionAfter(cmd, option, after):
	"""Insert option into the list cmd, just after the first occurrence of after.

	If after is not present, option is appended instead. cmd is modified in
	place.
	"""
	try:
		pos = cmd.index(after)
	except ValueError:
		# Only "not found" falls back to appending; other errors propagate
		cmd.append(option)
	else:
		cmd.insert(pos + 1, option)
148
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
	"""Create a Work item for every (CPU, time range) combination.

	cpus is a list of CPU numbers, or [-1] for no per-CPU split. In that case
	no --cpu option is added and no cpu-N directory is used.
	time_ranges_by_cpu is either one list of [start, end] ranges per CPU, or a
	single list used for every CPU. A range equal to [None, None] gets no
	--time option and no time-range-N directory.
	"""
	cpu_dir_fmt = "cpu-%." + str(len(str(cpus[-1]))) + "u"
	shared_ranges = len(time_ranges_by_cpu) <= 1
	worklist = []
	for cpu_idx, cpu in enumerate(cpus):
		if cpu < 0:
			cpu_dir = output_dir
			cpu_option = None
		else:
			cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
			cpu_option = f"--cpu={cpu}"

		if shared_ranges:
			time_ranges = time_ranges_by_cpu[0]
			tr_base = "time-range"
		else:
			time_ranges = time_ranges_by_cpu[cpu_idx]
			tr_base = f"time-range-{cpu_idx}"
		tr_dir_fmt = tr_base + "-%." + str(len(str(len(time_ranges)))) + "u"

		tr_nr = 0
		for r in time_ranges:
			if r == [None, None]:
				time_option = None
				work_output_dir = cpu_dir
			else:
				time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
				work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % tr_nr)
				tr_nr += 1
			work_cmd = list(cmd)
			# Both options go directly after "script", so --cpu ends up first
			if time_option is not None:
				InsertOptionAfter(work_cmd, time_option, "script")
			if cpu_option is not None:
				InsertOptionAfter(work_cmd, cpu_option, "script")
			worklist.append(Work(work_cmd, pipe_to, work_output_dir))
	return worklist
191
def DoRunWork(worklist, nr_jobs, verbosity):
	"""Run the jobs in worklist, keeping at most nr_jobs running at once.

	Running jobs are polled every 0.1 seconds, and a progress line is printed
	unless quiet. If any job exits with a non-zero status, the other running
	jobs are killed, no more are started, and False is returned. Otherwise
	True is returned once every job has finished. Jobs that left a non-empty
	err.txt are listed, but that does not change the return value.
	"""
	nr_to_do = len(worklist)
	not_started = list(worklist)
	running = []
	done = []
	chg = False	# set when the progress line needs reprinting
	while True:
		nr_done = len(done)
		if chg and verbosity.normal:
			nr_run = len(running)
			print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
			if verbosity.verbose:
				print()
			chg = False
		if nr_done == nr_to_do:
			break
		# Start jobs until nr_jobs are running or there are none left
		while len(running) < nr_jobs and len(not_started):
			w = not_started.pop(0)
			running.append(w)
			if verbosity.verbose:
				print("Starting:", w.Command())
			w.Start()
			chg = True
		if len(running):
			time.sleep(0.1)
		# Poll each running job once, sorting into finished / not finished
		finished = []
		not_finished = []
		while len(running):
			w = running.pop(0)
			r = w.Poll()
			if r == None:
				not_finished.append(w)
				continue
			if r == 0:
				if verbosity.verbose:
					print("Finished:", w.Command())
				finished.append(w)
				chg = True
				continue
			# Non-zero exit: end the progress line, report, and kill the rest.
			# Jobs already polled are in not_finished, the rest still in running.
			if verbosity.normal and not verbosity.verbose:
				print()
			print("Job failed!\n    return code:", r, "\n    command:    ", w.Command())
			if w.pipe_to:
				print("    piped to:   ", w.pipe_to)
			print("Killing outstanding jobs")
			KillWork(not_finished, verbosity)
			KillWork(running, verbosity)
			return False
		running = not_finished
		done += finished
	errorlist = []
	for w in worklist:
		errorlist += w.Errors()
	if len(errorlist):
		print("Errors:")
		for e in errorlist:
			print(e)
	elif verbosity.normal:
		# Overwrite the progress line with the final message
		print("\r"," "*50, "\rAll jobs finished successfully", flush=True)
	return True
252
def RunWork(worklist, nr_jobs=None, verbosity=None):
	"""Run worklist with at most nr_jobs jobs in parallel.

	nr_jobs defaults to the number of online CPUs, and verbosity to normal
	messages. Both defaults are computed at call time, not at import time.
	Returns the result of DoRunWork. If anything raises, including
	KeyboardInterrupt, all jobs are killed before the exception propagates.
	"""
	if nr_jobs is None:
		nr_jobs = NumberOfCPUs()
	if verbosity is None:
		verbosity = Verbosity()
	try:
		return DoRunWork(worklist, nr_jobs, verbosity)
	except BaseException:
		# Deliberately broad: do not leave jobs running after Ctrl-C etc.
		for w in worklist:
			w.Kill()
		raise
261
def ReadHeader(perf, file_name):
	"""Return the output of 'perf script --header-only --input file_name' as text.

	The exit status of perf is not checked. The process is waited for and its
	output pipe closed, so no zombie or open descriptor is left behind.
	"""
	result = subprocess.run([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE)
	return result.stdout.decode("utf-8")
264
def ParseHeader(hdr):
	"""Parse 'perf script --header-only' text into a {name: value} dict.

	Lines of the form '# name : value' are split at the first ':'. The name
	and value are stripped of surrounding whitespace, so a space before the
	':' is optional. Repeated names get a numeric suffix: 'name', 'name 2',
	'name 3', ...
	"""
	result = {}
	for line in hdr.split("\n"):
		if not line.startswith("#") or ":" not in line:
			continue
		pos = line.index(":")
		# Take everything up to the ':' (not pos-1): names such as
		# "pmu mappings:" have no space before the colon
		name = line[1:pos].strip()
		value = line[pos+1:].strip()
		if name in result:
			orig_name = name
			nr = 2
			while f"{orig_name} {nr}" in result:
				nr += 1
			name = f"{orig_name} {nr}"
		result[name] = value
	return result
283
def HeaderField(hdr_dict, hdr_fld):
	"""Return hdr_dict[hdr_fld]; raise a descriptive Exception if it is absent."""
	if hdr_fld in hdr_dict:
		return hdr_dict[hdr_fld]
	raise Exception(f"'{hdr_fld}' missing from header information")
288
# Represent the position of an option within a command string
# and provide the option value and/or remove the option
class OptPos():
	"""Locate one option, by short and/or long name, in an argument list.

	Recognized forms: '-X value', '-Xvalue', '--name value', '--name=value',
	and X within a group of combined short options ('-abX value' or
	'-abXvalue'). Only the first matching element is used. A malformed option
	sets 'error' instead. Pass short_name="" for a long-only option.

	NOTE(review): the combined-short-option check does not know which other
	short options take a value. So with short_name "i", '-Fip' is taken as
	'-i' with value 'p'. Confirm whether such command lines need supporting.
	"""

	def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
		# The defaults (-1, no error) mean "option not present"
		self.opt_element = opt_element		# list element that contains option
		self.value_element = value_element	# list element that contains option value
		self.opt_pos = opt_pos			# string position of option
		self.value_pos = value_pos		# string position of value
		self.error = error			# error message string

	def __init__(self, args, short_name, long_name, default=None):
		self.args = list(args)
		self.default = default			# returned by Value() if not found
		n = 2 + len(long_name)			# length of "--" + long_name
		m = len(short_name)			# 0 when there is no short form
		pos = -1
		for opt in args:
			pos += 1
			# "-X" (value in next element) or "-Xvalue"
			if m and opt[:2] == f"-{short_name}":
				if len(opt) == 2:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, 0, 2)
				return
			# "--name" (value in next element) or "--name=value"
			if opt[:n] == f"--{long_name}":
				if len(opt) == n:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"--{long_name} option missing value")
				elif opt[n] == "=":
					self.Init(pos, pos, 0, n + 1)
				else:
					self.Init(error = f"--{long_name} option expected '='")
				return
			# Short option combined with others, e.g. "-abX" or "-abXvalue"
			if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt:
				ipos = opt.index(short_name)
				# Skip the element if a '-' comes before the option letter
				if "-" in opt[1:]:
					hpos = opt[1:].index("-")
					if hpos < ipos:
						continue
				if ipos + 1 == len(opt):
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, ipos, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, ipos, ipos + 1)
				return
		self.Init()

	def Value(self):
		"""Return the option's value, or the default if it was not found."""
		if self.opt_element >= 0:
			if self.opt_element != self.value_element:
				return self.args[self.value_element]
			else:
				return self.args[self.value_element][self.value_pos:]
		return self.default

	def Remove(self, args):
		"""Remove the option (and its value) from args in place.

		args is expected to be the same list that was passed to __init__.
		"""
		if self.opt_element == -1:
			return
		# The value element comes after the option element, so delete it
		# first to keep opt_element valid
		if self.opt_element != self.value_element:
			del args[self.value_element]
		if self.opt_pos:
			# Combined short options: keep the other options before it
			args[self.opt_element] = args[self.opt_element][:self.opt_pos]
		else:
			del args[self.opt_element]
361
def DetermineInputFileName(cmd):
	"""Return the input file named by cmd's -i/--input option.

	Defaults to "perf.data". Raises an Exception if the option is malformed
	or the file does not exist.
	"""
	opt = OptPos(cmd, "i", "input", "perf.data")
	if opt.error:
		raise Exception(f"perf command {opt.error}")
	name = opt.Value()
	if os.path.exists(name):
		return name
	raise Exception(f"perf command input file '{name}' not found")
370
def ReadOption(args, short_name, long_name, err_prefix, remove=False):
	"""Return the value of an option in args, or None if it is absent.

	If remove is true, the option and its value are also deleted from args.
	A malformed option raises an Exception prefixed with err_prefix.
	"""
	located = OptPos(args, short_name, long_name)
	err = located.error
	if err:
		raise Exception(f"{err_prefix}{err}")
	result = located.Value()
	if remove:
		located.Remove(args)
	return result
379
def ExtractOption(args, short_name, long_name, err_prefix):
	"""Like ReadOption, but also remove the option from args."""
	return ReadOption(args, short_name, long_name, err_prefix, remove=True)
382
def ReadPerfOption(args, short_name, long_name):
	"""Read an option from the perf command, leaving it in place."""
	return ReadOption(args, short_name, long_name, err_prefix="perf command ")
385
def ExtractPerfOption(args, short_name, long_name):
	"""Read an option from the perf command and remove it from args."""
	return ExtractOption(args, short_name, long_name, err_prefix="perf command ")
388
def PerfDoubleQuickCommands(cmd, file_name):
	"""Build the double-quick (--itrace=qqi) decode commands used to measure
	trace data density.

	Any --cpu / --time options of cmd are carried over. Returns
	(cnts_cmd, times_cmd): the first prints only the CPU of each sample, the
	second prints CPU and time.
	"""
	cpu_str = ReadPerfOption(cmd, "C", "cpu")
	time_str = ReadPerfOption(cmd, "", "time")
	# Use the same perf binary as the user's command (cmd[0], as ReadHeader
	# does) rather than whichever "perf" comes first in PATH
	perf = cmd[0] if cmd else "perf"
	# Use double-quick sampling to determine trace data density
	times_cmd = [perf, "script", "--ns", "--input", file_name, "--itrace=qqi"]
	if cpu_str != None and cpu_str != "":
		times_cmd.append(f"--cpu={cpu_str}")
	if time_str != None and time_str != "":
		times_cmd.append(f"--time={time_str}")
	cnts_cmd = list(times_cmd)
	cnts_cmd.append("-Fcpu")
	times_cmd.append("-Fcpu,time")
	return cnts_cmd, times_cmd
402
class CPUTimeRange():
	"""Per-CPU bookkeeping for splitting time by trace data density."""

	def __init__(self, cpu):
		self.cpu = cpu			# CPU number, or -1 when not splitting per CPU
		self.sample_cnt = 0		# double-quick samples counted for this CPU
		self.remaining = 0		# samples still to be seen in the second pass
		self.interval = 0		# samples per time range
		self.interval_remaining = 0	# samples left before the next range starts
		self.time_ranges = None		# list of [start, end] ranges being built
		self.tr_pos = 0			# index of the range currently being filled
412
def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
	"""Per-sample callback for the second (CPU + time) pass.

	Counts the sample against its CPU. After every 'interval' samples the
	current time range is closed just before this sample's time and a new one
	is started at it. At the CPU's last sample, the current range is extended
	to max_time. line holds the whitespace-split output fields;
	line[1] is presumably "<secs>.<nsecs>:" (from -Fcpu,time) - verify.
	"""
	ctr = cpu_time_ranges[cpu]
	ctr.remaining -= 1
	ctr.interval_remaining -= 1
	ranges = ctr.time_ranges
	if ctr.remaining == 0:
		ranges[ctr.tr_pos][1] = max_time
	elif ctr.interval_remaining == 0:
		ts = TimeVal(line[1][:-1], 0)	# drop the trailing ':'
		ranges[ctr.tr_pos][1] = ts - 1
		ranges.append([ts, max_time])
		ctr.tr_pos += 1
		ctr.interval_remaining = ctr.interval
427
def CountSamplesByCPU(line, cpu, cpu_time_ranges):
	"""Per-sample callback for the first pass: count one sample for cpu.

	An out-of-range CPU number prints some diagnostics and then re-raises the
	IndexError.
	"""
	try:
		cpu_time_ranges[cpu].sample_cnt += 1
	except IndexError:
		# Only a bad CPU index is expected here; anything else propagates as is
		print("exception")
		print("cpu", cpu)
		print("len(cpu_time_ranges)", len(cpu_time_ranges))
		raise
436
def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
	"""Run cmd and call fn(fields, cpu, *x) for each output line that starts
	(after optional whitespace) with a bracketed CPU number, e.g. "[003] ...".

	fields is the whitespace-split line. cpu is the bracketed number if
	per_cpu is true, otherwise 0. Other lines are ignored. The command's exit
	status is not checked.
	"""
	# Assume CPU number is at beginning of line and enclosed by []
	pat = re.compile(r"\s*\[[0-9]+\]")
	# The context manager closes the stdout pipe and waits for the process,
	# even if fn raises
	with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
		for raw in p.stdout:
			line = raw.decode("utf-8")
			if not pat.match(line):
				continue
			fields = line.split()
			# Assumes CPU number is enclosed by []
			cpu = int(fields[0][1:-1]) if per_cpu else 0
			fn(fields, cpu, *x)
456
def IntersectTimeRanges(new_time_ranges, time_ranges):
	"""Clip new_time_ranges, in place, to the parts covered by time_ranges.

	Both are lists of inclusive [start, end] ranges in ascending order. Parts
	of new ranges outside time_ranges are dropped. A new range spanning the
	end of an old range is split in two at that end.
	"""
	pos = 0
	new_pos = 0
	# Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
	# Note also, there *must* be at least one intersection.
	while pos < len(time_ranges) and new_pos < len(new_time_ranges):
		# new end < old start => no intersection, remove new
		if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
			del new_time_ranges[new_pos]
			continue
		# new start > old end => no intersection, check next
		if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
			pos += 1
			if pos < len(time_ranges):
				continue
			# no next, so remove remaining
			while new_pos < len(new_time_ranges):
				del new_time_ranges[new_pos]
			return
		# Found an intersection
		# new start < old start => adjust new start = old start
		if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
			new_time_ranges[new_pos][0] = time_ranges[pos][0]
		# new end > old end => keep the overlap, insert the remainder
		if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
			r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ]
			new_time_ranges[new_pos][1] = time_ranges[pos][1]
			new_pos += 1
			new_time_ranges.insert(new_pos, r)
			continue
		# new [start, end] is within old [start, end]
		new_pos += 1
489
def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
	"""Split time_ranges so that each piece holds about the same amount of
	trace data, as measured by counting double-quick (--itrace=qqi) samples.

	Returns a list with one list of [start, end] ranges per entry in cpus.
	If there is too little data (no CPU has min_size samples), it returns a
	single list split evenly by duration instead. nr is the requested number
	of subdivisions, or 0 to divide by the number of online CPUs. When nr is
	0, min_size is also the minimum number of samples per time range.
	"""
	if verbosity.normal:
		print("\rAnalyzing...", flush=True, end=" ")
		if verbosity.verbose:
			print()
	cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

	if per_cpu:
		nr_cpus = cpus[-1] + 1
		cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ]
	else:
		# A single entry: samples are counted at index 0 and looked up with
		# cpus == [-1] at the end, which is the same element
		nr_cpus = 1
		cpu_time_ranges = [ CPUTimeRange(-1) ]

	if verbosity.debug:
		print("nr_cpus", nr_cpus)
		print("cnts_cmd", cnts_cmd)
		print("times_cmd", times_cmd)

	# Count the number of "double quick" samples per CPU
	ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

	tot = 0
	mx = 0
	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		tot += cnt
		if cnt > mx:
			mx = cnt
		if verbosity.debug:
			print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

	if min_size < 1:
		min_size = 1

	if mx < min_size:
		# Too little data to be worth splitting
		if verbosity.debug:
			print("Too little data to split by time")
		if nr == 0:
			nr = 1
		return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ]

	if nr:
		divisor = nr
		min_size = 1
	else:
		divisor = NumberOfCPUs()

	# Target number of samples per time range
	interval = int(round(tot / divisor, 0))
	if interval < min_size:
		interval = min_size

	if verbosity.debug:
		print("divisor", divisor)
		print("min_size", min_size)
		print("interval", interval)

	min_time = time_ranges[0][0]
	max_time = time_ranges[-1][1]

	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		if cnt == 0:
			cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
			continue
		# Adjust target interval for CPU to give approximately equal interval sizes
		# Determine number of intervals, rounding to nearest integer
		n = int(round(cnt / interval, 0))
		if n < 1:
			n = 1
		# Determine interval size, rounding up
		d, m = divmod(cnt, n)
		if m:
			d += 1
		cpu_time_range.interval = d
		cpu_time_range.interval_remaining = d
		cpu_time_range.remaining = cnt
		# Init. time ranges for each CPU with the start time
		cpu_time_range.time_ranges = [ [min_time, max_time] ]

	# Set time ranges so that the same number of "double quick" samples
	# will fall into each time range.
	ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

	# Restrict the density-based ranges to the requested time ranges
	for cpu_time_range in cpu_time_ranges:
		if cpu_time_range.sample_cnt:
			IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

	return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]
581
def SplitSingleTimeRangeIntoN(time_range, n):
	"""Split the inclusive range [start, end] into n consecutive pieces.

	All pieces have the same duration, except the last, which absorbs any
	remainder. If n <= 1, or the range is shorter than n, a one-element list
	holding a copy of time_range is returned. The result never aliases
	time_range, so callers may modify it in place.
	"""
	if n <= 1:
		return [list(time_range)]
	start = time_range[0]
	end   = time_range[1]
	# Integer division: float division loses precision for large ns values
	duration = (end - start + 1) // n
	if duration < 1:
		return [list(time_range)]
	time_ranges = []
	for i in range(n):
		time_ranges.append([start, start + duration - 1])
		start += duration
	time_ranges[-1][1] = end
	return time_ranges
596
def TimeRangeDuration(r):
	"""Return the length of the inclusive range r = [start, end]."""
	start, end = r[0], r[1]
	return end - start + 1
599
def TotalDuration(time_ranges):
	"""Return the summed length of all inclusive [start, end] ranges."""
	return sum(TimeRangeDuration(r) for r in time_ranges)
605
def SplitTimeRangesByInterval(time_ranges, interval):
	"""Split each range into pieces of roughly 'interval' length.

	interval is in the same units as the ranges. Each range gets
	round(duration / interval) pieces.
	"""
	result = []
	for r in time_ranges:
		pieces = int(round(TimeRangeDuration(r) / interval, 0))
		result.extend(SplitSingleTimeRangeIntoN(r, pieces))
	return result
614
def SplitTimeRangesIntoN(time_ranges, n, min_interval):
	"""Split time_ranges into about n pieces of roughly equal duration.

	Pieces are not made shorter than min_interval, in the same units as the
	ranges. If there are already at least n ranges, a copy of time_ranges is
	returned, so in-place edits of the result do not affect the caller's list.
	"""
	if n <= len(time_ranges):
		# Copy the inner [start, end] lists too: callers such as
		# OpenTimeRangeEnds modify the returned ranges in place
		return [ list(r) for r in time_ranges ]
	duration = TotalDuration(time_ranges)
	interval = duration / n
	if interval < min_interval:
		interval = min_interval
	return SplitTimeRangesByInterval(time_ranges, interval)
623
def RecombineTimeRanges(tr):
	"""Return a copy of tr with adjacent ranges (prev end + 1 == next start) merged.

	tr itself is not modified.
	"""
	new_tr = copy.deepcopy(tr)
	i = 1
	while i < len(new_tr):
		# if prev end + 1 == cur start, combine them
		if new_tr[i - 1][1] + 1 == new_tr[i][0]:
			new_tr[i][0] = new_tr[i - 1][0]
			del new_tr[i - 1]
		else:
			i += 1
	return new_tr
636
def OpenTimeRangeEnds(time_ranges, min_time, max_time):
	"""Replace the outermost bounds with None when they reach min/max_time.

	A first start <= min_time and a last end >= max_time become None, i.e.
	open-ended. time_ranges is modified in place.
	"""
	first, last = time_ranges[0], time_ranges[-1]
	if first[0] <= min_time:
		first[0] = None
	if last[1] >= max_time:
		last[1] = None
642
def BadTimeStr(time_str):
	"""Raise an Exception reporting an unusable perf --time option value."""
	msg = (f"perf command bad time option: '{time_str}'\n"
	       "Check also 'time of first sample' and 'time of last sample' in perf script --header-only")
	raise Exception(msg)
645
def ValidateTimeRanges(time_ranges, time_str):
	"""Check that ranges are well-formed (start <= end) and strictly ascending
	without overlap. Raises via BadTimeStr(time_str) otherwise.
	"""
	prev = None
	for r in time_ranges:
		start = r[0]
		end   = r[1]
		if prev is not None and start <= prev[1]:
			BadTimeStr(time_str)
		if start > end:
			BadTimeStr(time_str)
		prev = r
655
def TimeVal(s, dflt):
	"""Convert a "<secs>[.<fraction>]" time string to integer nanoseconds.

	An empty (or all-whitespace) string returns dflt. Fraction digits beyond
	9 are truncated. Raises an Exception for malformed or negative values
	(ValueError for a non-numeric integer part).
	"""
	s = s.strip()
	if s == "":
		return dflt
	a = s.split(".")
	if len(a) > 2:
		raise Exception(f"Bad time value '{s}'")
	# Check the sign on the string: int("-0") == 0 would let "-0.5" through
	if s.startswith("-"):
		raise Exception("Negative time not allowed")
	x = int(a[0])
	if x < 0:
		raise Exception("Negative time not allowed")
	x *= 1000000000
	if len(a) > 1:
		frac = a[1]
		# int() would otherwise accept a sign, e.g. "1.-5"
		if frac and not frac.isdigit():
			raise Exception(f"Bad time value '{s}'")
		x += int((frac + "000000000")[:9])
	return x
670
def BadCPUStr(cpu_str):
	"""Raise an Exception reporting an unusable perf --cpu option value."""
	msg = (f"perf command bad cpu option: '{cpu_str}'\n"
	       "Check also 'nrcpus avail' in perf script --header-only")
	raise Exception(msg)
673
def ParseTimeStr(time_str, min_time, max_time):
	"""Parse a perf --time value into a list of [start, end] ns ranges.

	The value is one or more whitespace-separated "start,end" pairs. An empty
	start or end defaults to min_time or max_time respectively. None or ""
	gives [[min_time, max_time]]. Raises via BadTimeStr() on malformed,
	reversed or overlapping ranges.
	"""
	if time_str == None or time_str == "":
		return [[min_time, max_time]]
	time_ranges = []
	for r in time_str.split():
		a = r.split(",")
		if len(a) != 2:
			BadTimeStr(time_str)
		try:
			start = TimeVal(a[0], min_time)
			end   = TimeVal(a[1], max_time)
		except Exception:
			# Not a bare except: KeyboardInterrupt etc. must not be
			# reported as a bad --time option
			BadTimeStr(time_str)
		time_ranges.append([start, end])
	ValidateTimeRanges(time_ranges, time_str)
	return time_ranges
690
def ParseCPUStr(cpu_str, nr_cpus):
	"""Parse a perf --cpu list such as "0-3,6" into a sorted list of unique
	CPU numbers, each in [0, nr_cpus).

	None or "" gives [-1], the value used elsewhere for no per-CPU split.
	Raises via BadCPUStr() if the list is malformed or out of range.
	"""
	if cpu_str == None or cpu_str == "":
		return [-1]
	cpus = set()
	for r in cpu_str.split(","):
		a = r.split("-")
		# str.split always returns at least one element
		if len(a) > 2:
			BadCPUStr(cpu_str)
		try:
			start = int(a[0].strip())
			end = int(a[1].strip()) if len(a) > 1 else start
		except ValueError:
			BadCPUStr(cpu_str)
		if start < 0 or end < 0 or end < start or end >= nr_cpus:
			BadCPUStr(cpu_str)
		cpus.update(range(start, end + 1))
	return sorted(cpus)
713
class ParallelPerf():
	"""Configure and run parallel perf script jobs from parsed options 'a'.

	'a' carries the argparse options plus 'cmd' (the perf script command
	list) and 'verbosity'.
	"""

	def __init__(self, a):
		# Copy every parsed option (and cmd / verbosity) onto self
		for arg_name in vars(a):
			setattr(self, arg_name, getattr(a, arg_name))
		self.orig_nr = self.nr
		self.orig_cmd = list(self.cmd)
		self.perf = self.cmd[0]
		if os.path.exists(self.output_dir):
			raise Exception(f"Output '{self.output_dir}' already exists")
		if self.jobs < 0 or self.nr < 0 or self.interval < 0:
			raise Exception("Bad options (negative values): try -h option for help")
		if self.nr != 0 and self.interval != 0:
			raise Exception("Cannot specify number of time subdivisions and time interval")
		if self.jobs == 0:
			self.jobs = NumberOfCPUs()
		if self.nr == 0 and self.interval == 0:
			if self.per_cpu:
				self.nr = 1
			else:
				self.nr = self.jobs

	def Init(self):
		"""Locate the input file and read and parse its perf header."""
		if self.verbosity.debug:
			print("cmd", self.cmd)
		self.file_name = DetermineInputFileName(self.cmd)
		self.hdr = ReadHeader(self.perf, self.file_name)
		self.hdr_dict = ParseHeader(self.hdr)
		self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

	def ExtractTimeInfo(self):
		"""Determine the overall time range (ns) and remove --time from cmd."""
		self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
		self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
		self.time_str = ExtractPerfOption(self.cmd, "", "time")
		self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
		if self.verbosity.debug:
			print("time_ranges", self.time_ranges)

	def ExtractCPUInfo(self):
		"""Determine the CPUs to process ([-1] when not splitting per CPU).

		In per-CPU mode the --cpu option is removed from cmd, because each job
		gets its own --cpu option.
		"""
		if self.per_cpu:
			nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
			self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
			if self.cpu_str == None or self.cpu_str == "":
				self.cpus = [ x for x in range(nr_cpus) ]
			else:
				self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
		else:
			self.cpu_str = None
			self.cpus = [-1]
		if self.verbosity.debug:
			print("cpus", self.cpus)

	def IsIntelPT(self):
		return self.cmd_line.find("intel_pt") >= 0

	def SplitTimeRanges(self):
		"""Compute the time ranges for each CPU's jobs."""
		# NOTE(review): min_interval is passed on unconverted and compared with
		# nanosecond durations in SplitTimeRangesIntoN, although the
		# --min-interval help text says seconds - confirm the intended unit.
		if self.IsIntelPT() and self.interval == 0:
			self.split_time_ranges_for_each_cpu = \
				SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
								  self.orig_cmd, self.file_name, self.per_cpu,
								  self.min_size, self.min_interval, self.verbosity)
		elif self.nr:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
		else:
			# --interval is given in seconds, but time ranges are in nanoseconds
			interval_ns = self.interval * 1000000000
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, interval_ns) ]

	def CheckTimeRanges(self):
		"""Self test: each CPU's split ranges must recombine to the original."""
		for tr in self.split_time_ranges_for_each_cpu:
			# Re-combined time ranges should be the same
			new_tr = RecombineTimeRanges(tr)
			if new_tr != self.time_ranges:
				if self.verbosity.debug:
					print("tr", tr)
					print("new_tr", new_tr)
				raise Exception("Self test failed!")

	def OpenTimeRangeEnds(self):
		for time_ranges in self.split_time_ranges_for_each_cpu:
			OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

	def CreateWorkList(self):
		self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

	def PerfDataRecordedPerCPU(self):
		"""Return False if the recording command line used --per-thread."""
		if "--per-thread" in self.cmd_line.split():
			return False
		return True

	def DefaultToPerCPU(self):
		# --no-per-cpu option takes precedence
		if self.no_per_cpu:
			return False
		if not self.PerfDataRecordedPerCPU():
			return False
		# Default to per-cpu for Intel PT data that was recorded per-cpu,
		# because decoding can be done for each CPU separately.
		if self.IsIntelPT():
			return True
		return False

	def Config(self):
		"""Work out the jobs to run; the result is left in self.worklist."""
		self.Init()
		self.ExtractTimeInfo()
		if not self.per_cpu:
			self.per_cpu = self.DefaultToPerCPU()
		if self.verbosity.debug:
			print("per_cpu", self.per_cpu)
		self.ExtractCPUInfo()
		self.SplitTimeRanges()
		if self.verbosity.self_test:
			self.CheckTimeRanges()
		# Prefer open-ended time range to starting / ending with min_time / max_time resp.
		self.OpenTimeRangeEnds()
		self.CreateWorkList()

	def Run(self):
		"""Run (or, with --dry-run, just list) the jobs. Returns True on success."""
		if self.dry_run:
			print(len(self.worklist),"jobs:")
			for w in self.worklist:
				print(w.Command())
			return True
		result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
		if self.verbosity.verbose:
			print(glb_prog_name, "done")
		return result
839
def RunParallelPerf(a):
	"""Configure and run the jobs described by the parsed options a."""
	runner = ParallelPerf(a)
	runner.Config()
	ok = runner.Run()
	return ok
844
def Main(args):
	"""Command-line entry point.

	args is the full argument vector (args[0] is the program name). Options
	for this script come before '--', and the perf script command after it.
	Returns True on success or when only help was printed, otherwise False.
	"""
	ap = argparse.ArgumentParser(
		prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
		description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
		epilog =
"""
Follow the options by '--' and then the perf script command e.g.

	$ perf record -a -- sleep 10
	$ parallel-perf.py --nr=4 -- perf script --ns
	All jobs finished successfully
	$ tree parallel-perf-output/
	parallel-perf-output/
	├── time-range-0
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-1
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-2
	│   ├── cmd.txt
	│   └── out.txt
	└── time-range-3
	    ├── cmd.txt
	    └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
	parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
	parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
	parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

	$ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
	All jobs finished successfully
	$ tree parallel-perf-output
	parallel-perf-output/
	├── cpu-0
	│   ├── time-range-0
	│   │   ├── cmd.txt
	│   │   └── out.txt
	│   └── time-range-1
	│       ├── cmd.txt
	│       └── out.txt
	└── cpu-1
	    ├── time-range-0
	    │   ├── cmd.txt
	    │   └── out.txt
	    └── time-range-1
	        ├── cmd.txt
	        └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
	parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
	ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
	ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
	ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
	ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
	ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
	ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
	ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
	ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
	ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
	ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
	ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
	ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
	ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
	cmd_line = list(args)
	# Split at the first '--': this script's options before it, the perf
	# script command after it
	try:
		split_pos = cmd_line.index("--")
	except ValueError:
		cmd = None
		args = cmd_line
	else:
		cmd = cmd_line[split_pos + 1:]
		args = cmd_line[:split_pos]
	a = ap.parse_args(args=args[1:])
	a.cmd = cmd
	a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
	try:
		if a.cmd == None:
			if len(args) <= 1:
				ap.print_help()
				return True
			raise Exception("Command line must contain '--' before perf command")
		if not a.cmd:
			# Nothing after '--': report it clearly instead of failing later
			# with an IndexError on cmd[0]
			raise Exception("No perf command after '--'")
		return RunParallelPerf(a)
	except Exception as e:
		print("Fatal error: ", str(e))
		if a.debug:
			raise
		return False
986
if __name__ == "__main__":
	# Exit status 1 on failure; normal exit (status 0) on success
	ok = Main(sys.argv)
	if not ok:
		sys.exit(1)
990