Lines Matching +full:self +full:- +full:test

1 # SPDX-License-Identifier: GPL-2.0
11 …def __init__(self, metric: list[str], wl: str, value: list[float], low: float, up=float('nan'), de… argument
12 self.metric: list = metric # multiple metrics in relationship type tests
13 self.workloads = [wl] # multiple workloads possible
14 self.collectedValue: list = value
15 self.valueLowBound = low
16 self.valueUpBound = up
17 self.description = description
19 def __repr__(self) -> str: argument
20 if len(self.metric) > 1:
24 … \tRelationship rule description: \'{5}\'".format(self.metric, self.collectedValue, self.workloads,
25self.valueLowBound, self.valueUpBound, self.description)
26 elif len(self.collectedValue) == 0:
28 \tworkload(s): {1}".format(self.metric, self.workloads)
33 .format(self.metric, self.collectedValue, self.workloads,
34 self.valueLowBound, self.valueUpBound)
38 …def __init__(self, rulefname, reportfname='', t=5, debug=False, datafname='', fullrulefname='', wo… argument
39 self.rulefname = rulefname
40 self.reportfname = reportfname
41 self.rules = None
42 self.collectlist: str = metrics
43 self.metrics = self.__set_metrics(metrics)
44 self.skiplist = set()
45 self.tolerance = t
47 self.workloads = [x for x in workload.split(",") if x]
48 self.wlidx = 0 # idx of current workloads
49 self.allresults = dict() # metric results of all workload
50 self.alltotalcnt = dict()
51 self.allpassedcnt = dict()
53 self.results = dict() # metric results of current workload
54 # vars for test pass/failure statistics
56 self.ignoremetrics = set()
57 self.totalcnt = 0
58 self.passedcnt = 0
60 self.errlist = list()
63 self.pctgmetrics = set() # Percentage rule
66 self.datafname = datafname
67 self.debug = debug
68 self.fullrulefname = fullrulefname
70 def __set_metrics(self, metrics=''): argument
76 def read_json(self, filename: str) -> dict: argument
86 def json_dump(self, data, output_file): argument
97 def get_results(self, idx: int = 0): argument
98 return self.results.get(idx)
100 def get_bounds(self, lb, ub, error, alias={}, ridx: int = 0) -> list: argument
103 If missing lb, use 0.0; missing ub, use float('inf'); missing error, use self.tolerance.
109 upper bound, return -1 if the upper bound is a metric value and is not collected
121 vall = self.get_value(alias[ub], ridx)
132 ubv = get_bound_value(ub, -1, ridx)
134 t = get_bound_value(error, self.tolerance, ridx)
141 def get_value(self, name: str, ridx: int = 0) -> list: argument
143 Get value of the metric from self.results.
144 … If result of this metric is not provided, the metric name will be added into self.ignoremetrics.
145 All future test(s) on this metric will fail.
148 @returns: list with value found in self.results; list is empty when value is not found.
151 data = self.results[ridx] if ridx in self.results else self.results[0]
152 if name not in self.ignoremetrics:
158 self.ignoremetrics.add(name)
161 def check_bound(self, val, lb, ub, err): argument
162 return True if val <= ub + err and val >= lb - err else False
165 def pos_val_test(self): argument
167 Check if metric values are non-negative.
168 One metric is counted as one test.
170 Metrics with negative value will be added into self.ignoremetrics.
176 results = self.get_results()
188 # than 20 metrics failed positive test.
191 self.second_test(rerun, second_results)
200 self.ignoremetrics.update(negmetric.keys())
201 self.errlist.extend(
202 … [TestError([m], self.workloads[self.wlidx], negmetric[m], 0) for m in negmetric.keys()])
206 def evaluate_formula(self, formula: str, alias: dict, ridx: int = 0): argument
212 @returns: value of the formula if successful; -1 if one or more metric values are not provided
222 if i+1 == len(formula) or formula[i] in ('+', '-', '*', '/'):
225 v = self.get_value(s, ridx)
231 stack[-1] = stack[-1] * v
233 stack[-1] = stack[-1] / v
234 elif sign == '-':
235 stack.append(-v[0])
244 return -1, "Metric value missing: "+','.join(errs)
250 def relationship_test(self, rule: dict): argument
254 One rule is counted as one test.
262 lbv, ubv, t = self.get_bounds(
264 val, f = self.evaluate_formula(
276 if val == -1:
277self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [],
279 elif not self.check_bound(val, lbv, ubv, t):
280self.errlist.append(TestError([m['Name'] for m in rule['Metrics']], self.workloads[self.wlidx], [v…
283 self.passedcnt += 1
284 self.totalcnt += 1
288 # Single Metric Test
289 def single_test(self, rule: dict): argument
293 One metric is counted as one test in this type of test.
296 This test updates self.totalcnt.
300 lbv, ubv, t = self.get_bounds(
309 result = self.get_value(m['Name'])
310 … if len(result) > 0 and self.check_bound(result[0], lbv, ubv, t) or m['Name'] in self.skiplist:
318 self.second_test(rerun, second_results)
322 if self.check_bound(val, lbv, ubv, t):
327 self.results[0][name] = val
329 self.totalcnt += totalcnt
330 self.passedcnt += passcnt
332 self.errlist.extend([TestError([name], self.workloads[self.wlidx], val,
337 def create_report(self): argument
341 print(self.errlist)
343 if self.debug:
344 allres = [{"Workload": self.workloads[i], "Results": self.allresults[i]}
345 for i in range(0, len(self.workloads))]
346 self.json_dump(allres, self.datafname)
348 def check_rule(self, testtype, metric_list): argument
358 if m['Name'] not in self.metrics:
363 def convert(self, data: list, metricvalues: dict): argument
365 Convert collected metric data from the -j output to dict of {metric_name:value}.
370 … if "metric-unit" in result and result["metric-unit"] != "(null)" and result["metric-unit"] != "":
371 … name = result["metric-unit"].split(" ")[1] if len(result["metric-unit"].split(" ")) > 1 \
372 else result["metric-unit"]
373 metricvalues[name.lower()] = float(result["metric-value"])
378 def _run_perf(self, metric, workload: str): argument
380 command = [tool, 'stat', '-j', '-M', f"{metric}", "-a"]
384 cmd = subprocess.run(command, stderr=subprocess.PIPE, encoding='utf-8')
390 def collect_perf(self, workload: str): argument
392 Collect metric data with "perf stat -M" on given workload with -a and -j.
394 self.results = dict()
398 if self.collectlist != "":
399 collectlist[0] = {x for x in self.collectlist.split(",")}
401 collectlist[0] = set(list(self.metrics))
403 for rule in self.rules:
416 data = self._run_perf(metric, wl)
417 if idx not in self.results:
418 self.results[idx] = dict()
419 self.convert(data, self.results[idx])
422 def second_test(self, collectlist, second_results): argument
423 workload = self.workloads[self.wlidx]
425 data = self._run_perf(metric, workload)
426 self.convert(data, second_results)
431 def parse_perf_metrics(self): argument
437 command = ['perf', 'list', '-j', '--details', 'metrics']
439 stderr=subprocess.PIPE, encoding='utf-8')
447 self.metrics.add(name)
449 self.pctgmetrics.add(name.lower())
456 def remove_unsupported_rules(self, rules): argument
461 if m["Name"] in self.skiplist or m["Name"] not in self.metrics:
468 def create_rules(self): argument
476 data = self.read_json(self.rulefname)
478 self.skiplist = set([name.lower() for name in data['SkipList']])
479 self.rules = self.remove_unsupported_rules(rules)
484 'ErrorThreshold': self.tolerance,
486 'Metrics': [{'Name': m.lower()} for m in self.pctgmetrics]}
487 self.rules.append(pctgrule)
489 # Re-index all rules to avoid repeated RuleIndex
491 for r in self.rules:
495 if self.debug:
496 # TODO: need to test and generate file name correctly
497 data = {'RelationshipRules': self.rules, 'SupportedMetrics': [
498 {"MetricName": name} for name in self.metrics]}
499 self.json_dump(data, self.fullrulefname)
504 def _storewldata(self, key): argument
507 @param key: key to the dictionaries (index of self.workloads).
509 self.allresults[key] = self.results
510 self.alltotalcnt[key] = self.totalcnt
511 self.allpassedcnt[key] = self.passedcnt
514 def _init_data(self): argument
518 self.results = dict()
519 self.ignoremetrics = set()
520 self.errlist = list()
521 self.totalcnt = 0
522 self.passedcnt = 0
524 def test(self): member in Validator
526 The real entry point of the test framework.
531 … In the test process, it passes through each rule and launches the correct test function based on the
536 if not self.collectlist:
537 self.parse_perf_metrics()
538 if not self.metrics:
541 self.create_rules()
542 for i in range(0, len(self.workloads)):
543 self.wlidx = i
544 self._init_data()
545 self.collect_perf(self.workloads[i])
546 # Run positive value test
547 self.pos_val_test()
548 for r in self.rules:
551 if not self.check_rule(testtype, r['Metrics']):
554 self.relationship_test(r)
556 self.single_test(r)
558 print("Unsupported Test Type: ", testtype)
559 print("Workload: ", self.workloads[i])
560 print("Total Test Count: ", self.totalcnt)
561 print("Passed Test Count: ", self.passedcnt)
562 self._storewldata(i)
563 self.create_report()
564 return len(self.errlist) > 0
568 def main() -> None:
573 "-rule", help="Base validation rule file", required=True)
575 "-output_dir", help="Path for validator output file, report file", required=True)
576 parser.add_argument("-debug", help="Debug run, save intermediate data to files",
579 "-wl", help="Workload to run while data collection", default="true")
580 parser.add_argument("-m", help="Metric list to validate", default="")
590 ret = validator.test()