1 # Testing utilities
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import binascii
8 import os
9 import socket
10 import struct
11 import subprocess
12 import time
13 import remotehost
14 import logging
15 import re
16 logger = logging.getLogger()
17 import hostapd
18 
def get_ifnames():
    """Return the interface names listed in /proc/net/dev.

    Only lines that contain a colon are considered; the text before the
    first colon, with surrounding spaces removed, is taken as the name.
    """
    with open("/proc/net/dev", "r") as devfile:
        split_lines = [entry.split(':', 1) for entry in devfile]
    return [parts[0].strip(' ') for parts in split_lines if len(parts) == 2]
28 
class HwsimSkip(Exception):
    """Exception carrying the reason why a test is being skipped.

    The reason is stored in the ``reason`` attribute and is also what
    ``str()`` on the exception returns.
    """
    def __init__(self, reason):
        # Pass the reason on to Exception as well so that args, repr() and
        # pickling behave like they do for any other exception.
        Exception.__init__(self, reason)
        self.reason = reason
    def __str__(self):
        return self.reason
34 
def long_duration_test(func):
    """Decorator that tags func with ``long_duration_test = True``.

    The function object itself is returned; it is not wrapped.
    """
    setattr(func, "long_duration_test", True)
    return func
38 
class fail_test(object):
    """Context manager that arms failure triggers on a device.

    On entry, all (count, function pattern) pairs are sent to the device in
    one command. On exit, the device is asked which triggers are still
    pending so that unused triggers can be reported.
    """
    # Commands sent through dev.request(); subclasses may override these.
    _test_fail = 'TEST_FAIL'
    _get_fail = 'GET_FAIL'

    def __init__(self, dev, count, funcs, *args):
        """Remember the device and the list of triggers.

        Further triggers are passed as additional count, funcs pairs in
        args.
        """
        self._dev = dev
        self._funcs = [(count, funcs)]

        extra = list(args)
        for idx in range(0, len(extra), 2):
            # A count without a matching funcs entry raises IndexError here
            self._funcs.append((extra[idx], extra[idx + 1]))

    def __enter__(self):
        """Arm the triggers; raise HwsimSkip unless the reply contains OK."""
        armed = ['%d:%s' % pair for pair in self._funcs]
        reply = self._dev.request('%s %s' % (self._test_fail, ' '.join(armed)))
        if "OK" not in reply:
            raise HwsimSkip("TEST_FAIL not supported")

    def __exit__(self, type, value, traceback):
        """Verify that all triggers fired when the body completed normally.

        If the body raised, the pending triggers are only logged and the
        exception is allowed to propagate.
        """
        pending = self._dev.request(self._get_fail)
        if type is not None:
            logger.info("Pending failures at time of exception: %s" % pending)
            return
        # Each trigger is expected to be reported with a count of zero
        expected = ' '.join('0:%s' % pattern for _, pattern in self._funcs)
        if pending == expected:
            return
        # Ensure the failure cannot trigger in the future
        self._dev.request('%s 0:' % self._test_fail)
        raise Exception("Not all failures triggered (pending: %s)" % pending)
67 
class alloc_fail(fail_test):
    """fail_test variant that uses TEST_ALLOC_FAIL and GET_ALLOC_FAIL."""
    _get_fail = 'GET_ALLOC_FAIL'
    _test_fail = 'TEST_ALLOC_FAIL'
71 
def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40,
                      timeout=0.05):
    """Poll dev with cmd until the reply starts with "0:".

    At most max_iter requests are made, sleeping timeout seconds between
    them. An Exception carrying note is raised if the last allowed request
    still does not return the expected prefix.
    """
    attempt = 0
    while attempt < max_iter:
        if dev.request(cmd).startswith("0:"):
            return
        attempt += 1
        if attempt == max_iter:
            # No sleep after the final attempt; report the failure directly
            raise Exception(note)
        time.sleep(timeout)
80 
def require_under_vm():
    """Raise HwsimSkip unless the VM environment variable equals "VM"."""
    if os.environ.get('VM') == 'VM':
        return
    raise HwsimSkip("Not running under VM")
84 
def iface_is_in_bridge(bridge, ifname):
    """Return True if ifname is attached to the bridge named bridge.

    The /sys/class/net/<ifname>/brport/bridge symlink is read and the last
    path component of its target is compared with bridge. False is returned
    if that path does not exist or is not a symlink.
    """
    link = "".join(["/sys/class/net/", ifname, "/brport/bridge"])
    if not (os.path.exists(link) and os.path.islink(link)):
        return False
    return bridge == os.path.basename(os.readlink(link))
95 
def skip_with_fips(dev, reason="Not supported in FIPS mode"):
    """Raise HwsimSkip(reason) if the "fips" capability of dev has 'FIPS'.

    Nothing happens when the capability query returns an empty or None
    result.
    """
    fips_capa = dev.get_capability("fips")
    if not fips_capa:
        return
    if 'FIPS' in fips_capa:
        raise HwsimSkip(reason)
100 
def check_ext_key_id_capa(dev):
    """Raise HwsimSkip unless bit 0x8000000000000000 is set in capa.flags.

    The 'capa.flags' driver status field is parsed with int(..., 0), so
    the base is taken from the string's prefix.
    """
    flags = int(dev.get_driver_status_field('capa.flags'), 0)
    if not flags & 0x8000000000000000:
        raise HwsimSkip("Extended Key ID not supported")
105 
def skip_without_tkip(dev):
    """Raise HwsimSkip unless TKIP is a supported pairwise and group cipher.

    Only the "pairwise" and "group" capabilities of dev are queried; the
    group query is skipped when the pairwise one already lacks TKIP.
    """
    if "TKIP" not in dev.get_capability("pairwise") or \
       "TKIP" not in dev.get_capability("group"):
        raise HwsimSkip("Cipher TKIP not supported")
111 
def check_wep_capa(dev):
    """Raise HwsimSkip unless "WEP40" is in the "group" capability of dev."""
    group_ciphers = dev.get_capability("group")
    if "WEP40" in group_ciphers:
        return
    raise HwsimSkip("WEP not supported")
115 
def check_sae_capab(dev):
    """Raise HwsimSkip unless "SAE" is in the "auth_alg" capability of dev."""
    auth_algs = dev.get_capability("auth_alg")
    if "SAE" in auth_algs:
        return
    raise HwsimSkip("SAE not supported")
119 
def check_sae_pk_capab(dev):
    """Raise HwsimSkip unless the "sae" capability of dev contains "PK".

    A None capability result is treated as not supported.
    """
    sae_capa = dev.get_capability("sae")
    if sae_capa is not None and "PK" in sae_capa:
        return
    raise HwsimSkip("SAE-PK not supported")
124 
def check_owe_capab(dev):
    """Raise HwsimSkip unless "OWE" is in the "key_mgmt" capability of dev."""
    key_mgmt = dev.get_capability("key_mgmt")
    if "OWE" in key_mgmt:
        return
    raise HwsimSkip("OWE not supported")
128 
def check_erp_capa(dev):
    """Raise HwsimSkip unless the "erp" capability of dev contains 'ERP'.

    An empty or None capability result is treated as not supported.
    """
    erp_capa = dev.get_capability("erp")
    if erp_capa and 'ERP' in erp_capa:
        return
    raise HwsimSkip("ERP not supported in the build")
133 
def check_fils_capa(dev):
    """Raise HwsimSkip unless the "fils" capability of dev contains "FILS".

    A None capability result is treated as not supported.
    """
    fils_capa = dev.get_capability("fils")
    if fils_capa is not None and "FILS" in fils_capa:
        return
    raise HwsimSkip("FILS not supported")
138 
def check_fils_sk_pfs_capa(dev):
    """Raise HwsimSkip unless the "fils" capability has "FILS-SK-PFS".

    A None capability result is treated as not supported.
    """
    fils_capa = dev.get_capability("fils")
    if fils_capa is not None and "FILS-SK-PFS" in fils_capa:
        return
    raise HwsimSkip("FILS-SK-PFS not supported")
143 
def check_imsi_privacy_support(dev):
    """Raise HwsimSkip unless the TLS library name starts with "OpenSSL".

    The library name is whatever dev returns for "GET tls_library".
    """
    tls_library = dev.request("GET tls_library")
    if not tls_library.startswith("OpenSSL"):
        raise HwsimSkip("IMSI privacy not supported with this TLS library: " + tls_library)
149 
def check_tls_tod(dev):
    """Raise HwsimSkip unless the TLS library is OpenSSL, wolfSSL or internal.

    The decision is based on the prefix of the "GET tls_library" reply.
    """
    tls_library = dev.request("GET tls_library")
    if tls_library.startswith(("OpenSSL", "wolfSSL", "internal")):
        return
    raise HwsimSkip("TLS TOD-TOFU/STRICT not supported with this TLS library: " + tls_library)
156 
def vht_supported():
    """Return True if the `iw reg get` output has "@ 80)" or "@ 160)".

    Presumably these markers are the maximum bandwidth of a regulatory
    rule - confirm against the iw output format.
    """
    result = subprocess.run(["iw", "reg", "get"], stdout=subprocess.PIPE)
    regdom = result.stdout.decode()
    return any(marker in regdom for marker in ("@ 80)", "@ 160)"))
164 
def eht_320mhz_supported():
    """Return True if the `iw reg get` output contains "@ 320)"."""
    result = subprocess.run(["iw", "reg", "get"], stdout=subprocess.PIPE)
    return "@ 320)" in result.stdout.decode()
172 
def he_6ghz_supported(freq=5975):
    """Return True if a rule in the `iw reg get` output covers freq.

    Each output line is searched for a "(<low> - <high>" range and freq is
    compared against that range, bounds included.
    """
    result = subprocess.run(["iw", "reg", "get"], stdout=subprocess.PIPE)
    for rule in result.stdout.decode().splitlines():
        span = re.search(r"\s*\(\d+\s*-\s*\d+", rule)
        if span is None:
            continue
        # The matched text holds exactly two numbers: range start and end
        low, high = [int(num) for num in re.findall(r"\d+", span.group(0))]
        if low <= freq <= high:
            return True

    return False
187 
# Check whether the provided dev, which may be either WpaSupplicant or
# Hostapd, supports CSA. HwsimSkip is raised when bit 0x80000000 is not set
# in the capa.flags entry of the driver status.
def csa_supported(dev):
    capa_flags = int(dev.get_driver_status()['capa.flags'], 0)
    if not capa_flags & 0x80000000:
        raise HwsimSkip("CSA not supported")
194 
def get_phy(ap, ifname=None):
    """Return the wiphy name ("phy<N>") for an AP interface.

    ap is indexed by 'hostname' (optional) and, when ifname is not given,
    by 'ifname'. `iw dev <ifname> info` is executed through
    remotehost.Host(hostname) and the second word of the first output line
    containing "wiphy" is appended to "phy".

    Returns "phy3" if no such line is found. Raises Exception if the iw
    command returns a non-zero status.
    """
    try:
        hostname = ap['hostname']
    except (KeyError, TypeError):
        # No hostname entry available. Only lookup failures are handled
        # here so that KeyboardInterrupt and SystemExit still propagate.
        hostname = None
    host = remotehost.Host(hostname)

    if ifname is None:
        ifname = ap['ifname']
    status, buf = host.execute(["iw", "dev", ifname, "info"])
    if status != 0:
        raise Exception("iw " + ifname + " info failed")
    for line in buf.split("\n"):
        if "wiphy" in line:
            return "phy" + line.split()[1]
    # Fallback value used when the output has no wiphy line
    return "phy3"
215 
def parse_ie(buf):
    """Parse a hex string of id/length/payload elements into a dict.

    Each element starts with a one-octet id and a one-octet length followed
    by that many payload octets. The result maps id to payload bytes; a
    later element with the same id replaces an earlier one. Parsing stops
    at the first element whose length exceeds the remaining data.
    """
    raw = binascii.unhexlify(buf)
    total = len(raw)
    elements = {}
    pos = 0
    while total - pos >= 2:
        eid, length = struct.unpack_from('BB', raw, pos)
        pos += 2
        if pos + length > total:
            # Truncated element; keep what was parsed so far
            break
        elements[eid] = raw[pos:pos + length]
        pos += length
    return elements
227 
def wait_regdom_changes(dev):
    """Consume CTRL-EVENT-REGDOM-CHANGE events until none arrives.

    At most ten events are waited for, each with a 0.1 second timeout; the
    loop ends early as soon as wait_event() returns None.
    """
    remaining = 10
    while remaining > 0:
        remaining -= 1
        if dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.1) is None:
            return
233 
def clear_country(dev):
    """Try to clear the country code using a temporary open network.

    dev[1] is configured with a "country-clear" network on 2412 MHz using
    mode 2 (presumably AP mode - confirm). If it reports
    CTRL-EVENT-CONNECTED, dev[0] connects to that network and both sides
    are then disconnected and their monitors dumped.
    """
    logger.info("Try to clear country")
    go = dev[1]
    net_id = go.add_network()
    go.set_network(net_id, "mode", "2")
    go.set_network_quoted(net_id, "ssid", "country-clear")
    for field, value in (("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412")):
        go.set_network(net_id, field, value)
    go.select_network(net_id)
    if not go.wait_event(["CTRL-EVENT-CONNECTED"]):
        return

    sta = dev[0]
    sta.connect("country-clear", key_mgmt="NONE", scan_freq="2412")
    go.request("DISCONNECT")
    sta.wait_disconnected()
    sta.request("DISCONNECT")
    sta.request("ABORT_SCAN")
    time.sleep(1)
    sta.dump_monitor()
    go.dump_monitor()
253 
def clear_regdom(hapd, dev, count=1):
    """Disable hapd (if given) and then clear the regulatory domain.

    count is passed on to clear_regdom_dev() together with dev.
    """
    disable_hapd(hapd)
    clear_regdom_dev(dev, count=count)
257 
def disable_hapd(hapd):
    """Send DISABLE to hapd and wait 0.1 seconds; do nothing if hapd is falsy."""
    if not hapd:
        return
    hapd.request("DISABLE")
    time.sleep(0.1)
262 
def clear_regdom_dev(dev, count=1):
    """Disconnect the first count devices and set the regdom back to "00".

    After `iw reg set 00` has been run through dev[0], the country reported
    in the driver status is logged; if it is still not "00",
    clear_country() is used as a fallback. Finally the scan caches of the
    first count devices are flushed.
    """
    for idx in range(count):
        dev[idx].request("DISCONNECT")
    for idx in range(count):
        dev[idx].disconnect_and_stop_scan()

    first = dev[0]
    first.cmd_execute(['iw', 'reg', 'set', '00'])
    wait_regdom_changes(first)
    country = first.get_driver_status_field("country")
    logger.info("Country code at the end: " + country)
    if country != "00":
        clear_country(dev)

    for idx in range(count):
        dev[idx].flush_scan_cache()
276 
def radiotap_build():
    """Return a fixed 14-octet radiotap header as bytes.

    The result is an 8-octet little-endian header (two zero octets, the
    total length and the value 0xc002) followed by six payload octets, of
    which only the first one is non-zero (0x08).
    """
    payload = struct.pack('6B', 0x08, 0, 0, 0, 0, 0)
    total_len = 8 + len(payload)
    header = struct.pack('<BBHL', 0, 0, total_len, 0xc002)
    return header + payload
284 
def start_monitor(ifname, freq=2412):
    """Switch ifname to monitor mode on freq and return a raw socket.

    The interface type and frequency are set with iw (a failure raises
    CalledProcessError) and the link is brought up with ip (its exit status
    is ignored). The returned AF_PACKET socket is bound to ifname and has
    a 0.5 second timeout.
    """
    subprocess.check_call(["iw", ifname, "set", "type", "monitor"])
    subprocess.call(["ip", "link", "set", "dev", ifname, "up"])
    subprocess.check_call(["iw", ifname, "set", "freq", str(freq)])

    eth_p_all = 3
    protocol = socket.htons(eth_p_all)
    capture = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, protocol)
    capture.bind((ifname, 0))
    capture.settimeout(0.5)
    return capture
296 
def stop_monitor(ifname):
    """Bring ifname down and set its type back to managed.

    The exit status of both commands is ignored.
    """
    commands = (["ip", "link", "set", "dev", ifname, "down"],
                ["iw", ifname, "set", "type", "managed"])
    for argv in commands:
        subprocess.call(argv)
300 
def clear_scan_cache(apdev):
    """Trigger a flushing scan on 2412 MHz on the interface of apdev.

    The interface named by apdev['ifname'] is brought up, a scan with the
    flush option is triggered, and after 0.1 seconds the interface is
    brought down again. All commands go through hostapd.cmd_execute().
    """
    ifname = apdev['ifname']
    bring_up = ['ifconfig', ifname, 'up']
    flush_scan = ['iw', ifname, 'scan', 'trigger', 'freq', '2412', 'flush']
    bring_down = ['ifconfig', ifname, 'down']

    hostapd.cmd_execute(apdev, bring_up)
    hostapd.cmd_execute(apdev, flush_scan)
    time.sleep(0.1)
    hostapd.cmd_execute(apdev, bring_down)
308 
def set_world_reg(apdev0=None, apdev1=None, dev0=None):
    """Run `iw reg set 00` for each given device, then wait 0.1 seconds.

    apdev0 and apdev1 are handled through hostapd.cmd_execute() and dev0
    through its own cmd_execute() method; arguments that are falsy are
    skipped.
    """
    reg_cmd = ['iw', 'reg', 'set', '00']
    for apdev in (apdev0, apdev1):
        if apdev:
            hostapd.cmd_execute(apdev, list(reg_cmd))
    if dev0:
        dev0.cmd_execute(list(reg_cmd))
    time.sleep(0.1)
317 
def sysctl_write(val):
    """Run `sysctl -w val` with its standard output discarded.

    val is passed as a single argument, e.g. "net.ipv6.conf.all.disable_ipv6=1".
    The exit status of sysctl is ignored.
    """
    # subprocess.DEVNULL avoids opening /dev/null by hand and leaving the
    # file object unclosed on every call.
    subprocess.call(['sysctl', '-w', val], stdout=subprocess.DEVNULL)
320 
def var_arg_call(fn, dev, apdev, params):
    """Call fn with as many of (dev, apdev, params) as it declares.

    The positional argument count of fn decides the call: more than two
    gets all three values, exactly two gets dev and apdev, anything else
    gets only dev. The return value of fn is returned.
    """
    argc = fn.__code__.co_argcount
    if argc > 2:
        call_args = (dev, apdev, params)
    elif argc > 1:
        call_args = (dev, apdev)
    else:
        call_args = (dev,)
    return fn(*call_args)
327 
def cloned_wrapper(wrapper, fn):
    """Copy the name, docstring and module of fn onto wrapper; return wrapper."""
    # __name__ and __doc__ need to be right for selecting / printing etc.;
    # __module__ reparents the wrapper to the right module for module
    # filtering.
    for attr in ("__name__", "__doc__", "__module__"):
        setattr(wrapper, attr, getattr(fn, attr))
    return wrapper
335 
def disable_ipv6(fn):
    """Decorator that runs fn with IPv6 disabled through sysctl.

    The wrapper first calls require_under_vm(), then sets
    net.ipv6.conf.{all,default}.disable_ipv6 to 1, calls fn through
    var_arg_call() and returns its result. Both settings are written back
    to 0 afterwards, also when fn raises. The wrapper takes the name,
    docstring and module of fn.
    """
    def wrapper(dev, apdev, params):
        # Checked before touching any sysctl so that a skip changes nothing
        require_under_vm()
        try:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=1')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=1')
            # Hand the result of fn back to the caller instead of dropping it
            return var_arg_call(fn, dev, apdev, params)
        finally:
            sysctl_write('net.ipv6.conf.all.disable_ipv6=0')
            sysctl_write('net.ipv6.conf.default.disable_ipv6=0')
    return cloned_wrapper(wrapper, fn)
347