# Testing utilities
# Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import binascii
import os
import socket
import struct
import subprocess
import time
import remotehost
import logging
import re
logger = logging.getLogger()
import hostapd

def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Only lines containing a ':' are considered; the text before the first
    ':' (with surrounding spaces removed) is taken as the interface name.
    """
    ifnames = []
    with open("/proc/net/dev", "r") as f:
        for line in f:
            val = line.split(':', 1)
            if len(val) == 2:
                ifnames.append(val[0].strip(' '))
    return ifnames

class HwsimSkip(Exception):
    """Exception raised to indicate that a test case should be skipped."""
    def __init__(self, reason):
        # Pass the reason to Exception so that e.args is populated as well.
        super(HwsimSkip, self).__init__(reason)
        self.reason = reason
    def __str__(self):
        return self.reason

def long_duration_test(func):
    """Decorator that marks func by setting func.long_duration_test = True."""
    func.long_duration_test = True
    return func

class fail_test(object):
    """Context manager that arms failure triggers on a device.

    On entry, sends "<_test_fail> <count>:<funcs> ..." to dev.request(); on
    normal exit, verifies through "<_get_fail>" that every armed trigger
    has counted down to zero.

    Arguments after dev are (count, funcs) pairs: count, funcs, count2,
    funcs2, ...
    """
    _test_fail = 'TEST_FAIL'
    _get_fail = 'GET_FAIL'

    def __init__(self, dev, count, funcs, *args):
        self._dev = dev
        self._funcs = [(count, funcs)]

        # Remaining positional arguments are consumed pairwise.
        args = list(args)
        while args:
            count = args.pop(0)
            funcs = args.pop(0)
            self._funcs.append((count, funcs))

    def __enter__(self):
        patterns = ' '.join(['%d:%s' % (c, f) for c, f in self._funcs])
        cmd = '%s %s' % (self._test_fail, patterns)
        if "OK" not in self._dev.request(cmd):
            # Name the command actually used so that subclasses (e.g.,
            # alloc_fail) report the correct unsupported command.
            raise HwsimSkip("%s not supported" % self._test_fail)

    def __exit__(self, exc_type, exc_value, exc_tb):
        pending = self._dev.request(self._get_fail)
        if exc_type is None:
            expected = ' '.join(['0:%s' % f for c, f in self._funcs])
            if pending != expected:
                # Ensure the failure cannot trigger in the future
                self._dev.request('%s 0:' % self._test_fail)
                raise Exception("Not all failures triggered (pending: %s)" % pending)
        else:
            logger.info("Pending failures at time of exception: %s" % pending)

class alloc_fail(fail_test):
    """fail_test variant using TEST_ALLOC_FAIL / GET_ALLOC_FAIL commands."""
    _test_fail = 'TEST_ALLOC_FAIL'
    _get_fail = 'GET_ALLOC_FAIL'

def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev.request(cmd) until the response starts with "0:".

    Up to max_iter attempts are made with a sleep of timeout seconds
    between them. Raises Exception(note) if the last attempt still does
    not report "0:".
    """
    for i in range(0, max_iter):
        if dev.request(cmd).startswith("0:"):
            break
        if i == max_iter - 1:
            raise Exception(note)
        time.sleep(timeout)

def require_under_vm():
    """Raise HwsimSkip unless the environment variable VM is set to 'VM'."""
    if os.getenv('VM') != 'VM':
        raise HwsimSkip("Not running under VM")

def iface_is_in_bridge(bridge, ifname):
    """Return True if the sysfs brport/bridge link of ifname names bridge."""
    fname = "/sys/class/net/"+ifname+"/brport/bridge"
    if not os.path.exists(fname):
        return False
    if not os.path.islink(fname):
        return False
    truebridge = os.path.basename(os.readlink(fname))
    return bridge == truebridge

def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) if the "fips" capability includes 'FIPS'."""
    res = dev.get_capability("fips")
    if res and 'FIPS' in res:
        raise HwsimSkip(reason)

def check_ext_key_id_capa(dev):
    """Raise HwsimSkip if bit 0x8000000000000000 of capa.flags is not set."""
    res = dev.get_driver_status_field('capa.flags')
    if (int(res, 0) & 0x8000000000000000) == 0:
        raise HwsimSkip("Extended Key ID not supported")

def skip_without_tkip(dev):
    """Raise HwsimSkip unless TKIP is in both pairwise and group capabilities."""
    if "TKIP" not in dev.get_capability("pairwise") or \
       "TKIP" not in dev.get_capability("group"):
        raise HwsimSkip("Cipher TKIP not supported")

def check_wep_capa(dev):
    """Raise HwsimSkip unless WEP40 is listed in the "group" capability."""
    if "WEP40" not in dev.get_capability("group"):
        raise HwsimSkip("WEP not supported")

def check_sae_capab(dev):
    """Raise HwsimSkip unless SAE is listed in the "auth_alg" capability."""
    if "SAE" not in dev.get_capability("auth_alg"):
        raise HwsimSkip("SAE not supported")

def check_sae_pk_capab(dev):
    """Raise HwsimSkip unless the "sae" capability includes PK."""
    capab = dev.get_capability("sae")
    if capab is None or "PK" not in capab:
        raise HwsimSkip("SAE-PK not supported")

def check_owe_capab(dev):
    """Raise HwsimSkip unless OWE is listed in the "key_mgmt" capability."""
    if "OWE" not in dev.get_capability("key_mgmt"):
        raise HwsimSkip("OWE not supported")

def check_erp_capa(dev):
    """Raise HwsimSkip unless the "erp" capability includes ERP."""
    capab = dev.get_capability("erp")
    if not capab or 'ERP' not in capab:
        raise HwsimSkip("ERP not supported in the build")

def check_fils_capa(dev):
    """Raise HwsimSkip unless the "fils" capability includes FILS."""
    capa = dev.get_capability("fils")
    if capa is None or "FILS" not in capa:
        raise HwsimSkip("FILS not supported")

def check_fils_sk_pfs_capa(dev):
    """Raise HwsimSkip unless the "fils" capability includes FILS-SK-PFS."""
    capa = dev.get_capability("fils")
    if capa is None or "FILS-SK-PFS" not in capa:
        raise HwsimSkip("FILS-SK-PFS not supported")

def check_imsi_privacy_support(dev):
    """Raise HwsimSkip unless "GET tls_library" reports an OpenSSL build."""
    tls = dev.request("GET tls_library")
    if tls.startswith("OpenSSL"):
        return
    raise HwsimSkip("IMSI privacy not supported with this TLS library: " + tls)

def check_tls_tod(dev):
    """Raise HwsimSkip unless the TLS library is OpenSSL, wolfSSL, or internal."""
    tls = dev.request("GET tls_library")
    if not tls.startswith(("OpenSSL", "wolfSSL", "internal")):
        raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls)

def _iw_reg_get():
    """Run "iw reg get" and return its decoded standard output."""
    cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    out, _ = cmd.communicate()
    return out.decode()

def vht_supported():
    """Return True if "iw reg get" output contains "@ 80)" or "@ 160)"."""
    reg = _iw_reg_get()
    return "@ 80)" in reg or "@ 160)" in reg

def eht_320mhz_supported():
    """Return True if "iw reg get" output contains "@ 320)"."""
    return "@ 320)" in _iw_reg_get()

def he_6ghz_supported(freq=5975):
    """Return True if freq falls within a range listed by "iw reg get".

    Each output line is searched for a "(<start> - <end>" pattern and freq
    is compared against those two integers (inclusive).
    """
    for rule in _iw_reg_get().splitlines():
        m = re.search(r"\s*\(\d+\s*-\s*\d+", rule)
        if not m:
            continue
        freqs = re.findall(r"\d+", m.group(0))
        if int(freqs[0]) <= freq <= int(freqs[1]):
            return True

    return False

# This function checks whether the provided dev, which may be either
# WpaSupplicant or Hostapd supports CSA.
def csa_supported(dev):
    """Raise HwsimSkip if bit 0x80000000 of the driver capa.flags is not set."""
    res = dev.get_driver_status()
    if (int(res['capa.flags'], 0) & 0x80000000) == 0:
        raise HwsimSkip("CSA not supported")

def get_phy(ap, ifname=None):
    """Return the "phy<N>" name reported by "iw dev <ifname> info".

    ap is indexed for 'hostname' (optional) and, when ifname is not given,
    for 'ifname'. Returns "phy3" if the output has no line containing
    "wiphy". Raises Exception if the iw command returns non-zero status.
    """
    phy = "phy3"
    try:
        hostname = ap['hostname']
    except Exception:
        # Best-effort lookup: no hostname available means a Host(None).
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    lines = buf.split("\n")
    for line in lines:
        if "wiphy" in line:
            words = line.split()
            phy = "phy" + words[1]
            break
    return phy

def parse_ie(buf):
    """Parse a hexdump of (id, length, payload) elements into a dict.

    Returns a dict mapping the one-octet element id to the payload bytes.
    Parsing stops at the first element whose length field exceeds the
    remaining data. A repeated id overwrites the earlier entry.
    """
    ret = {}
    data = binascii.unhexlify(buf)
    while len(data) >= 2:
        ie, elen = struct.unpack('BB', data[0:2])
        data = data[2:]
        if elen > len(data):
            break
        ret[ie] = data[0:elen]
        data = data[elen:]
    return ret

def wait_regdom_changes(dev):
    """Consume up to 10 CTRL-EVENT-REGDOM-CHANGE events (0.1 s timeout each)."""
    for i in range(10):
        ev = dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1)
        if ev is None:
            break

def clear_country(dev):
    """Try to clear the country code using dev[0] and dev[1].

    dev[1] is configured with an open mode=2 network "country-clear" on
    2412 MHz. If dev[1] reports CTRL-EVENT-CONNECTED, dev[0] connects to
    that network and both are then disconnected and their monitors dumped.
    """
    logger.info("Try to clear country")
    net_id = dev[1].add_network()
    dev[1].set_network(net_id, "mode", "2")
    dev[1].set_network_quoted(net_id, "ssid", "country-clear")
    dev[1].set_network(net_id, "key_mgmt", "NONE")
    dev[1].set_network(net_id, "frequency", "2412")
    dev[1].set_network(net_id, "scan_freq", "2412")
    dev[1].select_network(net_id)
    ev = dev[1].wait_event(["CTRL-EVENT-CONNECTED"])
    if ev:
        dev[0].connect("country-clear", key_mgmt="NONE", scan_freq="2412")
        dev[1].request("DISCONNECT")
        dev[0].wait_disconnected()
        dev[0].request("DISCONNECT")
        dev[0].request("ABORT_SCAN")
        time.sleep(1)
        dev[0].dump_monitor()
        dev[1].dump_monitor()

def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and reset the regulatory domain on dev."""
    disable_hapd(hapd)
    clear_regdom_dev(dev, count)

def disable_hapd(hapd):
    """Send DISABLE to hapd and wait 0.1 s; no-op if hapd is false."""
    if hapd:
        hapd.request("DISABLE")
        time.sleep(0.1)

def clear_regdom_dev(dev, count=1):
    """Disconnect dev[0..count-1] and set the regulatory domain to "00".

    If the driver still reports a country other than "00" afterwards,
    clear_country() is tried. Scan caches of the devices are flushed at
    the end.
    """
    for i in range(count):
        dev[i].request("DISCONNECT")
    for i in range(count):
        dev[i].disconnect_and_stop_scan()
    dev[0].cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(dev[0])
    country = dev[0].get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)
    for i in range(count):
        dev[i].flush_scan_cache()

def radiotap_build():
    """Return a 14-octet radiotap header as bytes.

    The result is an 8-octet little-endian header (two zero octets, total
    length, 32-bit field 0xc002) followed by six payload octets
    (0x08 and five zeros).
    """
    radiotap_payload = struct.pack('BB', 0x08, 0)
    radiotap_payload += struct.pack('BB', 0, 0)
    radiotap_payload += struct.pack('BB', 0, 0)
    radiotap_hdr = struct.pack('<BBHL', 0, 0, 8 + len(radiotap_payload),
                               0xc002)
    return radiotap_hdr + radiotap_payload

def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode on freq and return a raw socket.

    The returned AF_PACKET/SOCK_RAW socket is bound to ifname and has a
    0.5 second timeout. Raises subprocess.CalledProcessError if setting
    the interface type or frequency fails.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    ETH_P_ALL = 3
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                         socket.htons(ETH_P_ALL))
    sock.bind((ifname, 0))
    sock.settimeout(0.5)
    return sock

def stop_monitor(ifname):
    """Set ifname down and switch it back to managed type."""
    subprocess.call(["ip", "link", "set", "dev", ifname, "down"])
    subprocess.call(["iw", ifname, "set", "type", "managed"])

def clear_scan_cache(apdev):
    """Trigger a flushing scan on 2412 MHz on the apdev['ifname'] interface.

    The interface is brought up for the scan and set down again after a
    0.1 second wait.
    """
    ifname = apdev['ifname']
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'up'])
    hostapd.cmd_execute(apdev, ['iw', ifname, 'scan', 'trigger', 'freq', '2412',
                                'flush'])
    time.sleep(0.1)
    hostapd.cmd_execute(apdev, ['ifconfig', ifname, 'down'])

def set_world_reg(apdev0=None, apdev1=None, dev0=None):
    """Run "iw reg set 00" through each given device, then wait 0.1 s."""
    if apdev0:
        hostapd.cmd_execute(apdev0, ['iw', 'reg', 'set', '00'])
    if apdev1:
        hostapd.cmd_execute(apdev1, ['iw', 'reg', 'set', '00'])
    if dev0:
        dev0.cmd_execute(['iw', 'reg', 'set', '00'])
    time.sleep(0.1)

def sysctl_write(val):
    """Run "sysctl -w <val>" with its standard output discarded."""
    # Use a context manager so the /dev/null handle is closed after the call
    # instead of being leaked on every invocation.
    with open('/dev/null', 'w') as devnull:
        subprocess.call(['sysctl', '-w', val], stdout=devnull)

def var_arg_call(fn, dev, apdev, params):
    """Call fn with as many of (dev, apdev, params) as it declares.

    The number of arguments is taken from fn.__code__.co_argcount; the
    return value of fn is passed through.
    """
    if fn.__code__.co_argcount > 2:
        return fn(dev, apdev, params)
    elif fn.__code__.co_argcount > 1:
        return fn(dev, apdev)
    return fn(dev)

def cloned_wrapper(wrapper, fn):
    """Copy __name__, __doc__, and __module__ from fn to wrapper; return wrapper."""
    # we need the name set right for selecting / printing etc.
    wrapper.__name__ = fn.__name__
    wrapper.__doc__ = fn.__doc__
    # reparent to the right module for module filtering
    wrapper.__module__ = fn.__module__
    return wrapper

def disable_ipv6(fn):
    """Decorator that runs fn with the IPv6 disable sysctls set to 1.

    The wrapper requires a VM environment (see require_under_vm()) and
    sets the sysctls back to 0 afterwards even if fn raises.
    """
    def wrapper(dev, apdev, params):
        require_under_vm()
        try:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=1')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=1')
            var_arg_call(fn, dev, apdev, params)
        finally:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=0')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=0')
    return cloned_wrapper(wrapper, fn)