1# MBO tests
2# Copyright (c) 2016, Intel Deutschland GmbH
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7from remotehost import remote_compatible
8import logging
9logger = logging.getLogger()
10
11import hostapd
12import os
13import time
14
15import hostapd
16from tshark import run_tshark
17from utils import *
18
def set_reg(country_code, apdev0=None, apdev1=None, dev0=None):
    """Request a regulatory domain change with "iw reg set <country_code>".

    The command is issued through hostapd.cmd_execute() for each access
    point entry that is given (apdev0 first, then apdev1) and finally
    through dev0.cmd_execute() for the station. Arguments that are None (or
    otherwise falsy) are skipped.
    """
    def reg_cmd():
        # Hand every executor its own list instance.
        return ['iw', 'reg', 'set', country_code]

    for ap in (apdev0, apdev1):
        if ap:
            hostapd.cmd_execute(ap, reg_cmd())
    if dev0:
        dev0.cmd_execute(reg_cmd())
26
def run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, country, freq_list=None,
                              disable_ht=False, disable_vht=False):
    """MBO and supported operating classes

    Sets the regulatory domain to country, then associates dev[0] with the
    5 GHz AP (hapd, skipped when it is None/falsy) and with the 2.4 GHz AP
    (hapd2). For each association the 'supp_op_classes' value that the AP
    reports for the station is read and its first two characters are
    dropped.

    Returns a (res2, res5) tuple: the 2.4 GHz result and the 5 GHz result
    (None when hapd was not used).

    Raises Exception if a regulatory domain change event is not received.
    """
    sta_dev = dev[0]
    addr = sta_dev.own_addr()

    sta_dev.flush_scan_cache()
    sta_dev.dump_monitor()

    logger.info("Country: " + country)
    sta_dev.note("Setting country code " + country)
    set_reg(country, apdev[0], apdev[1], sta_dev)

    # Wait (at most five events) for the regdom change to the requested
    # country; a timeout on any individual wait is fatal.
    wanted = "alpha2=" + country
    for _ in range(5):
        event = sta_dev.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=5)
        if event is None:
            raise Exception("No regdom change event")
        if wanted in event:
            break

    for sta in dev[0:3]:
        sta.dump_monitor()

    # connect() is given the HT/VHT disable flags as "0"/"1" strings.
    ht_flag = "1" if disable_ht else "0"
    vht_flag = "1" if disable_vht else "0"

    def collect_op_classes(ap, ssid, freq):
        # Bring up the AP, associate, read the reported operating classes
        # and tear everything down again.
        ap.set("country_code", country)
        ap.enable()
        sta_dev.scan_for_bss(ap.own_addr(), freq, force_scan=True)
        sta_dev.connect(ssid, key_mgmt="NONE", scan_freq=str(freq),
                        freq_list=freq_list, disable_ht=ht_flag,
                        disable_vht=vht_flag)
        classes = ap.get_sta(addr)['supp_op_classes'][2:]
        sta_dev.wait_regdom(country_ie=True)
        time.sleep(0.1)
        ap.disable()
        time.sleep(0.1)
        for cmd in ("REMOVE_NETWORK all", "ABORT_SCAN"):
            sta_dev.request(cmd)
        sta_dev.wait_disconnected()
        sta_dev.dump_monitor()
        return classes

    res5 = collect_op_classes(hapd, "test-wnm-mbo", 5180) if hapd else None
    res2 = collect_op_classes(hapd2, "test-wnm-mbo-2", 2412)

    return res2, res5
88
def run_mbo_supp_oper_class(dev, apdev, country, expected, inc5,
                            freq_list=None, disable_ht=False,
                            disable_vht=False, alt_expected=None):
    """Check the supported operating classes reported for a given country.

    Prepares (without enabling) a 2.4 GHz AP on apdev[1] and, when inc5 is
    true, a 5 GHz AP on apdev[0], runs run_mbo_supp_oper_classes() and
    compares the results against the accepted strings.

    expected: expected supp_op_classes string (without the leading two
        characters that run_mbo_supp_oper_classes() strips).
    alt_expected: optional additional string that is also accepted.
    freq_list, disable_ht, disable_vht: passed through to the station
        connection.

    The regulatory domain is reset to "00" afterwards regardless of the
    outcome.

    Raises Exception if a result matches none of the accepted strings.
    """
    if inc5:
        params = {'ssid': "test-wnm-mbo",
                  'mbo': '1',
                  "country_code": "US",
                  'ieee80211d': '1',
                  "ieee80211n": "1",
                  "hw_mode": "a",
                  "channel": "36"}
        hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
    else:
        hapd = None

    params = {'ssid': "test-wnm-mbo-2",
              'mbo': '1',
              "country_code": "US",
              'ieee80211d': '1',
              "ieee80211n": "1",
              "hw_mode": "g",
              "channel": "1"}
    hapd2 = hostapd.add_ap(apdev[1], params, no_enable=True)

    try:
        dev[0].request("STA_AUTOCONNECT 0")
        res2, res5 = run_mbo_supp_oper_classes(dev, apdev, hapd, hapd2, country,
                                               freq_list=freq_list,
                                               disable_ht=disable_ht,
                                               disable_vht=disable_vht)
    finally:
        # The country values are passed as logging arguments instead of
        # being concatenated: a non-string value (e.g. None) must not raise
        # here, since that would mask an exception from the try body and
        # skip the rest of the cleanup.
        dev[0].dump_monitor()
        dev[0].request("STA_AUTOCONNECT 1")
        wait_regdom_changes(dev[0])
        country1 = dev[0].get_driver_status_field("country")
        logger.info("Country code at the end (1): %s", country1)
        set_reg("00", apdev[0], apdev[1], dev[0])
        country2 = dev[0].get_driver_status_field("country")
        logger.info("Country code at the end (2): %s", country2)
        # Drain regdom events until the user-requested world domain is seen
        # or no more events arrive.
        for _ in range(5):
            ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
            if ev is None or "init=USER type=WORLD" in ev:
                break
        wait_regdom_changes(dev[0])
        country3 = dev[0].get_driver_status_field("country")
        logger.info("Country code at the end (3): %s", country3)
        if country3 != "00":
            clear_country(dev)

    # For now, allow operating class 129 to be missing since not all
    # installed regdb files include the 160 MHz channels.
    expected2 = expected.replace('808182', '8082')
    # For now, allow operating classes 121-123 to be missing since not all
    # installed regdb files include the related US DFS channels.
    expected2 = expected2.replace('78797a7b7c', '787c')
    expected3 = expected
    # For now, allow operating classes 124-127 to be missing for Finland
    # since they were added only recently in regdb.
    if country == "FI":
        expected3 = expected3.replace("7b7c7d7e7f80", "7b80")

    accepted = (expected, expected2, expected3, alt_expected)
    if res2 not in accepted:
        raise Exception("Unexpected supp_op_class string (country=%s, 2.4 GHz): %s (expected: %s)" % (country, res2, expected))
    if inc5 and res5 not in accepted:
        raise Exception("Unexpected supp_op_class string (country=%s, 5 GHz): %s (expected: %s)" % (country, res5, expected))
153
def test_mbo_supp_oper_classes_za(dev, apdev):
    """MBO and supported operating classes (ZA)"""
    # Accepted supp_op_classes string; the 5 GHz AP is included (inc5).
    expected = "515354737475767778797a7b8081008280"
    run_mbo_supp_oper_class(dev, apdev, country="ZA", expected=expected,
                            inc5=True)
158
def test_mbo_supp_oper_classes_fi(dev, apdev):
    """MBO and supported operating classes (FI)"""
    # Two supp_op_classes strings are accepted; the 5 GHz AP is included.
    primary = "515354737475767778797a7b7c7d7e7f8081008280"
    alternative = "515354737475767778797a7b7c7d7e7f8081838485860082808785"
    run_mbo_supp_oper_class(dev, apdev, country="FI", expected=primary,
                            inc5=True, alt_expected=alternative)
164
def test_mbo_supp_oper_classes_us(dev, apdev):
    """MBO and supported operating classes (US)"""
    # Two supp_op_classes strings are accepted; the 5 GHz AP is included.
    primary = "515354737475767778797a7b7c7d7e7f8081008280"
    alternative = "515354737475767778797a7b7c7d7e7f808183848586890082808785"
    run_mbo_supp_oper_class(dev, apdev, country="US", expected=primary,
                            inc5=True, alt_expected=alternative)
170
def test_mbo_supp_oper_classes_jp(dev, apdev):
    """MBO and supported operating classes (JP)"""
    # Two supp_op_classes strings are accepted; the 5 GHz AP is included.
    primary = "51525354737475767778797a7b8081008280"
    alternative = "51525354737475767778797a7b8081838485860082808785"
    run_mbo_supp_oper_class(dev, apdev, country="JP", expected=primary,
                            inc5=True, alt_expected=alternative)
177
def test_mbo_supp_oper_classes_bd(dev, apdev):
    """MBO and supported operating classes (BD)"""
    # Only the 2.4 GHz AP is used for this country (inc5=False).
    expected = "5153547c7d7e7f80"
    run_mbo_supp_oper_class(dev, apdev, country="BD", expected=expected,
                            inc5=False)
182
def test_mbo_supp_oper_classes_sy(dev, apdev):
    """MBO and supported operating classes (SY)"""
    # Only the 2.4 GHz AP is used for this country (inc5=False).
    expected = "515354"
    run_mbo_supp_oper_class(dev, apdev, country="SY", expected=expected,
                            inc5=False)
187
def test_mbo_supp_oper_classes_us_freq_list(dev, apdev):
    """MBO and supported operating classes (US) - freq_list"""
    # The station is limited to three 2.4 GHz frequencies through freq_list;
    # only the 2.4 GHz AP is used (inc5=False).
    allowed_freqs = "2412 2437 2462"
    run_mbo_supp_oper_class(dev, apdev, country="US", expected="515354",
                            inc5=False, freq_list=allowed_freqs)
192
def test_mbo_supp_oper_classes_us_disable_ht(dev, apdev):
    """MBO and supported operating classes (US) - disable_ht"""
    # HT is disabled on the station; two result strings are accepted and
    # only the 2.4 GHz AP is used (inc5=False).
    primary = "517376797c7d"
    alternative = "517376797c7d8384858689008785"
    run_mbo_supp_oper_class(dev, apdev, country="US", expected=primary,
                            inc5=False, disable_ht=True,
                            alt_expected=alternative)
197
def test_mbo_supp_oper_classes_us_disable_vht(dev, apdev):
    """MBO and supported operating classes (US) - disable_vht"""
    # VHT is disabled on the station; two result strings are accepted and
    # only the 2.4 GHz AP is used (inc5=False).
    primary = "515354737475767778797a7b7c7d7e7f"
    alternative = "515354737475767778797a7b7c7d7e7f8384858689008785"
    run_mbo_supp_oper_class(dev, apdev, country="US", expected=primary,
                            inc5=False, disable_vht=True,
                            alt_expected=alternative)
204
def test_mbo_assoc_disallow(dev, apdev, params):
    """MBO and association disallowed"""
    # Two APs share the SSID; association is disallowed on one of them at a
    # time and the capture file is checked to confirm that no Association
    # Request frame was addressed to the disallowed AP.
    hapd1 = hostapd.add_ap(apdev[0], {"ssid": "MBO", "mbo": "1"})
    hapd2 = hostapd.add_ap(apdev[1], {"ssid": "MBO", "mbo": "1"})
    sta = dev[0]
    assoc_req = "wlan.fc.type == 0 && wlan.fc.type_subtype == 0x00"

    def set_disallow(ap, label, value):
        # Update mbo_assoc_disallow on one AP and insist on an OK reply.
        if "OK" not in ap.request("SET mbo_assoc_disallow " + value):
            raise Exception("Failed to set mbo_assoc_disallow for " + label)

    logger.debug("Set mbo_assoc_disallow with invalid value")
    rejected = "FAIL" in hapd1.request("SET mbo_assoc_disallow 6")
    if not rejected:
        raise Exception("Set mbo_assoc_disallow for AP1 succeeded unexpectedly with value 6")

    logger.debug("Disallow associations to AP1 and allow association to AP2")
    set_disallow(hapd1, "AP1", "1")
    set_disallow(hapd2, "AP2", "0")

    sta.connect("MBO", key_mgmt="NONE", scan_freq="2412")

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture, assoc_req, wait=False)
    if "Destination address: " + hapd1.own_addr() in out:
        raise Exception("Association request sent to disallowed AP")

    # Frame time of the association seen so far; used below to restrict the
    # second check to later frames.
    timestamp = run_tshark(capture, "wlan.fc.type_subtype == 0x00",
                           display=['frame.time'], wait=False)

    logger.debug("Allow associations to AP1 and disallow associations to AP2")
    set_disallow(hapd1, "AP1", "0")
    set_disallow(hapd2, "AP2", "1")

    sta.request("DISCONNECT")
    sta.wait_disconnected()

    # Force new scan, so the assoc_disallowed indication is updated
    sta.request("FLUSH")

    sta.connect("MBO", key_mgmt="NONE", scan_freq="2412")

    later_assoc_req = assoc_req + ' && frame.time > "' + timestamp.rstrip() + '"'
    out = run_tshark(capture, later_assoc_req, wait=False)
    if "Destination address: " + hapd2.own_addr() in out:
        raise Exception("Association request sent to disallowed AP 2")
251
def test_mbo_assoc_disallow_ignore(dev, apdev):
    """MBO and ignoring disallowed association"""
    sta = dev[0]
    try:
        _test_mbo_assoc_disallow_ignore(dev, apdev)
    finally:
        # The inner test lowers SCAN_INTERVAL to 1; set it to 5 again
        # whether or not the test passed.
        sta.request("SCAN_INTERVAL 5")
258
def _test_mbo_assoc_disallow_ignore(dev, apdev):
    """Body of test_mbo_assoc_disallow_ignore (the caller restores state).

    With association disallowed on the AP the station is expected to report
    that no network was found. Once ignore_assoc_disallow is set on the
    station, it is expected to see an association rejection with status
    code 17, and to connect after the AP allows association again.
    """
    def expect_ok(target, cmd, failure):
        # Send a control command and fail the test unless the reply has OK.
        if "OK" not in target.request(cmd):
            raise Exception(failure)

    sta = dev[0]
    hapd1 = hostapd.add_ap(apdev[0], {"ssid": "MBO", "mbo": "1"})
    expect_ok(hapd1, "SET mbo_assoc_disallow 1",
              "Failed to set mbo_assoc_disallow for AP1")

    expect_ok(sta, "SCAN_INTERVAL 1", "Failed to set scan interval")
    sta.connect("MBO", key_mgmt="NONE", scan_freq="2412", wait_connect=False)
    not_found = sta.wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
    if not_found is None:
        raise Exception("CTRL-EVENT-NETWORK-NOT-FOUND not seen")

    expect_ok(sta, "SET ignore_assoc_disallow 1",
              "Failed to set ignore_assoc_disallow")
    reject = sta.wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10)
    if reject is None:
        raise Exception("CTRL-EVENT-ASSOC-REJECT not seen")
    if "status_code=17" not in reject:
        raise Exception("Unexpected association reject reason: " + reject)

    expect_ok(hapd1, "SET mbo_assoc_disallow 0",
              "Failed to set mbo_assoc_disallow for AP1")
    sta.wait_connected()
282
def test_mbo_assoc_disallow_change(dev, apdev):
    """MBO and dynamic association disallowed change with passive scanning"""
    sta = dev[0]
    hapd = hostapd.add_ap(apdev[0], {"ssid": "MBO", "mbo": "1"})

    # Connect once while association is still allowed, then drop the link.
    sta.connect("MBO", key_mgmt="NONE", scan_freq="2412")
    sta.request("DISCONNECT")
    sta.wait_disconnected()

    # Disallow association and refresh the scan results passively before
    # asking the station to reconnect.
    hapd.set("mbo_assoc_disallow", "1")
    sta.scan_for_bss(hapd.own_addr(), 2412, force_scan=True, passive=True)
    sta.request("RECONNECT")
    outcomes = ["CTRL-EVENT-NETWORK-NOT-FOUND",
                "CTRL-EVENT-ASSOC-REJECT",
                "CTRL-EVENT-CONNECTED"]
    result = sta.wait_event(outcomes, timeout=20)
    sta.request("DISCONNECT")

    # Only the "network not found" outcome is acceptable.
    if result is None:
        raise Exception("CTRL-EVENT-NETWORK-NOT-FOUND not seen")
    if "CTRL-EVENT-NETWORK-NOT-FOUND" not in result:
        raise Exception("Unexpected connection result: " + result)
300
301
@remote_compatible
def test_mbo_cell_capa_update(dev, apdev):
    """MBO cellular data capability update"""
    ssid = "test-wnm-mbo"
    hapd = hostapd.add_ap(apdev[0], {'ssid': ssid, 'mbo': '1'})
    # Looked up as in the original test; the value is not used further.
    bssid = apdev[0]['bssid']
    sta_dev = dev[0]

    if "OK" not in sta_dev.request("SET mbo_cell_capa 1"):
        raise Exception("Failed to set STA as cellular data capable")

    sta_dev.connect(ssid, key_mgmt="NONE", scan_freq="2412")

    # The value configured before association must be visible on the AP.
    addr = sta_dev.own_addr()
    entry = hapd.get_sta(addr)
    if entry.get('mbo_cell_capa') != '1':
        raise Exception("mbo_cell_capa missing after association")

    # The same update is sent twice; the second one is a duplicate update
    # for additional code coverage.
    for _ in range(2):
        if "OK" not in sta_dev.request("SET mbo_cell_capa 3"):
            raise Exception("Failed to set STA as cellular data not-capable")

    # Give the update a moment to reach the AP before reading it back.
    time.sleep(0.2)
    entry = hapd.get_sta(addr)
    if 'mbo_cell_capa' not in entry:
        raise Exception("mbo_cell_capa missing after update")
    if entry['mbo_cell_capa'] != '3':
        raise Exception("mbo_cell_capa not updated properly")
331
@remote_compatible
def test_mbo_cell_capa_update_pmf(dev, apdev):
    """MBO cellular data capability update with PMF required"""
    ssid = "test-wnm-mbo"
    passphrase = "12345678"

    # WPA2-PSK-SHA256 AP with ieee80211w=2 and MBO enabled.
    ap_conf = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_conf["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_conf["ieee80211w"] = "2"
    ap_conf['mbo'] = '1'
    hapd = hostapd.add_ap(apdev[0], ap_conf)
    # Looked up as in the original test; the value is not used further.
    bssid = apdev[0]['bssid']
    sta_dev = dev[0]

    if "OK" not in sta_dev.request("SET mbo_cell_capa 1"):
        raise Exception("Failed to set STA as cellular data capable")

    sta_dev.connect(ssid, psk=passphrase, key_mgmt="WPA-PSK-SHA256",
                    proto="WPA2", ieee80211w="2", scan_freq="2412")
    hapd.wait_sta()

    # The value configured before association must be visible on the AP.
    addr = sta_dev.own_addr()
    entry = hapd.get_sta(addr)
    if entry.get('mbo_cell_capa') != '1':
        raise Exception("mbo_cell_capa missing after association")

    if "OK" not in sta_dev.request("SET mbo_cell_capa 3"):
        raise Exception("Failed to set STA as cellular data not-capable")

    # Give the update a moment to reach the AP before reading it back.
    time.sleep(0.2)
    entry = hapd.get_sta(addr)
    if 'mbo_cell_capa' not in entry:
        raise Exception("mbo_cell_capa missing after update")
    if entry['mbo_cell_capa'] != '3':
        raise Exception("mbo_cell_capa not updated properly")
364
def test_mbo_wnm_token_wrap(dev, apdev):
    """MBO WNM token wrap around"""
    ssid = "test-wnm-mbo"
    hapd = hostapd.add_ap(apdev[0], {'ssid': ssid, 'mbo': '1'})
    # Looked up as in the original test; the value is not used further.
    bssid = apdev[0]['bssid']
    sta = dev[0]

    sta.connect(ssid, key_mgmt="NONE", scan_freq="2412")

    # Trigger transmission of 256 WNM-Notification frames to wrap around the
    # 8-bit mbo_wnm_token counter: 128 rounds of two capability changes.
    toggles = (("SET mbo_cell_capa 1",
                "Failed to set STA as cellular data capable"),
               ("SET mbo_cell_capa 3",
                "Failed to set STA as cellular data not-capable"))
    for _ in range(128):
        for cmd, failure in toggles:
            if "OK" not in sta.request(cmd):
                raise Exception(failure)
381
@remote_compatible
def test_mbo_non_pref_chan(dev, apdev):
    """MBO non-preferred channel list"""
    ssid = "test-wnm-mbo"
    hapd = hostapd.add_ap(apdev[0], {'ssid': ssid, 'mbo': '1'})
    # Looked up as in the original test; the value is not used further.
    bssid = apdev[0]['bssid']
    sta_dev = dev[0]

    # Values that the station must refuse.
    invalid_values = ("81:7:200:99",
                      "81:15:200:3",
                      "81:7:200:3 81:7:201:3")
    for value in invalid_values:
        if "FAIL" not in sta_dev.request("SET non_pref_chan " + value):
            raise Exception("Invalid non_pref_chan value accepted")

    # Values that must be accepted before association; the last one is the
    # list in effect when the station associates.
    for value in ("81:7:200:3", "81:7:200:1 81:9:100:2"):
        if "OK" not in sta_dev.request("SET non_pref_chan " + value):
            raise Exception("Failed to set non-preferred channel list")

    sta_dev.connect(ssid, key_mgmt="NONE", scan_freq="2412")
    addr = sta_dev.own_addr()

    def verify(expected, stage):
        # Compare the non_pref_chan[i] entries the AP reports for the
        # station against the expected values and make sure there is no
        # entry beyond them.
        sta = hapd.get_sta(addr)
        logger.debug("STA: " + str(sta))
        for idx, value in enumerate(expected):
            key = 'non_pref_chan[%d]' % idx
            if key not in sta:
                raise Exception("Missing %s value (%s)" % (key, stage))
            if sta[key] != value:
                raise Exception("Unexpected %s value (%s)" % (key, stage))
        extra = 'non_pref_chan[%d]' % len(expected)
        if extra in sta:
            raise Exception("Unexpected %s value (%s)" % (extra, stage))

    verify(['81:200:1:7', '81:100:2:9'], "assoc")

    # (value to set, entries expected on the AP, stage name used in errors)
    updates = (
        ("81:9:100:2",
         ['81:100:2:9'], "update 1"),
        ("81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1 81:5:100:1",
         ['81:100:1:7,5', '81:100:2:9,10,8'], "update 2"),
        ("81:5:90:2 82:14:91:2",
         ['81:90:2:5', '82:91:2:14'], "update 3"),
        # Empty value: no entries are expected afterwards.
        ("", [], "update 4"),
    )
    for value, expected, stage in updates:
        if "OK" not in sta_dev.request("SET non_pref_chan " + value):
            raise Exception("Failed to update non-preferred channel list")
        # Give the update a moment to reach the AP before reading it back.
        time.sleep(0.1)
        verify(expected, stage)
467
@remote_compatible
def test_mbo_sta_supp_op_classes(dev, apdev):
    """MBO STA supported operating classes"""
    ssid = "test-wnm-mbo"
    hapd = hostapd.add_ap(apdev[0], {'ssid': ssid, 'mbo': '1'})
    sta_dev = dev[0]

    sta_dev.connect(ssid, key_mgmt="NONE", scan_freq="2412")

    entry = hapd.get_sta(sta_dev.own_addr())
    logger.debug("STA: " + str(entry))
    if 'supp_op_classes' not in entry:
        raise Exception("No supp_op_classes")

    # The value is a hex string; the first octet is checked as the current
    # operating class and class 115 has to appear somewhere in the data.
    op_classes = bytearray(binascii.unhexlify(entry['supp_op_classes']))
    current = op_classes[0]
    if current != 81:
        raise Exception("Unexpected current operating class %d" % current)
    if 115 not in op_classes:
        raise Exception("Operating class 115 missing")
487
def test_mbo_failures(dev, apdev):
    """MBO failure cases"""
    ssid = "test-wnm-mbo"
    hapd = hostapd.add_ap(apdev[0], {'ssid': ssid, 'mbo': '1'})
    sta = dev[0]

    def expect(reply, cmd, failure):
        # Send a control command and require the given token in the reply.
        if reply not in sta.request(cmd):
            raise Exception(failure)

    # The connection and the capability updates are expected to succeed
    # while the failure is injected.
    with alloc_fail(sta, 1, "wpas_mbo_ie"):
        sta.connect(ssid, key_mgmt="NONE", scan_freq="2412")

    with alloc_fail(sta, 1, "wpas_mbo_send_wnm_notification"):
        expect("OK", "SET mbo_cell_capa 1",
               "Failed to set STA as cellular data capable")
    with fail_test(sta, 1, "wpas_mbo_send_wnm_notification"):
        expect("OK", "SET mbo_cell_capa 3",
               "Failed to set STA as cellular data not-capable")

    # Setting the list has to be rejected for both injected failure counts.
    for count in (1, 2):
        with alloc_fail(sta, count, "wpas_mbo_update_non_pref_chan"):
            expect("FAIL", "SET non_pref_chan 81:7:200:3",
                   "non_pref_chan value accepted during OOM")
509
def test_mbo_wnm_bss_tm_ie_parsing(dev, apdev):
    """MBO BSS transition request MBO IE parsing"""
    ssid = "test-wnm-mbo"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    hapd = hostapd.add_ap(apdev[0], params)
    bssid = apdev[0]['bssid']
    addr = dev[0].own_addr()
    dev[0].connect(ssid, psk="12345678", key_mgmt="WPA-PSK",
                   proto="WPA2", ieee80211w="0", scan_freq="2412")

    # Hex frame header carrying the station address followed by the AP
    # BSSID twice; the frames below are injected through MGMT_RX_PROCESS.
    hdr = "d0003a01" + addr.replace(':', '') + bssid.replace(':', '') + bssid.replace(':', '') + "3000"

    def rx_frame(note, btm_hdr, mbo_ie):
        # Log the case name, note it on the station and inject one frame.
        logger.info(note)
        dev[0].request("NOTE " + note)
        frame = hdr + btm_hdr + mbo_ie
        if "OK" not in dev[0].request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=" + frame):
            raise Exception("MGMT_RX_PROCESS failed")

    tests = [("Truncated attribute in MBO IE", "dd06506f9a160101"),
             ("Unexpected cell data capa attribute length in MBO IE",
              "dd09506f9a160501030500"),
             ("Unexpected transition reason attribute length in MBO IE",
              "dd06506f9a160600"),
             ("Unexpected assoc retry delay attribute length in MBO IE",
              "dd0c506f9a160100080200000800"),
             ("Unknown attribute id 255 in MBO IE",
              "dd06506f9a16ff00")]

    dev[0].request("SET ext_mgmt_frame_handling 1")
    try:
        for test, mbo_ie in tests:
            rx_frame(test, "0a070100030001", mbo_ie)

        rx_frame("Unexpected association retry delay",
                 "0a070108030001112233445566778899aabbcc",
                 "dd08506f9a1608020000")
    finally:
        # Always switch external management frame handling back off, also
        # when one of the injected frames makes the test fail.
        dev[0].request("SET ext_mgmt_frame_handling 0")
550
def test_mbo_without_pmf(dev, apdev):
    """MBO and WPA2 without PMF"""
    ssid = "test-wnm-mbo"
    params = {'ssid': ssid, 'mbo': '1', "wpa": '2',
              "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
              "wpa_passphrase": "12345678"}
    try:
        # "MBO: PMF needs to be enabled whenever using WPA2 with MBO"
        hostapd.add_ap(apdev[0], params)
    except Exception as e:
        # Only the failure to enable hostapd is the expected outcome; any
        # other error is passed on.
        if "Failed to enable hostapd" not in str(e):
            raise
    else:
        raise Exception("AP setup succeeded unexpectedly")
566
def test_mbo_without_pmf_workaround(dev, apdev):
    """MBO and WPA2 without PMF on misbehaving AP"""
    ssid = "test-wnm-mbo"
    # First AP: WPA2 with no PMF setting and MBO not enabled in hostapd;
    # it is given a vendor element instead. Second AP: MBO enabled with
    # ieee80211w=1.
    params0 = {'ssid': ssid, "wpa": '2',
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "wpa_passphrase": "12345678",
               "vendor_elements": "dd07506f9a16010100"}
    params1 = {'ssid': ssid, "mbo": '1', "wpa": '2',
               "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP",
               "wpa_passphrase": "12345678", "ieee80211w": "1"}
    sta_dev = dev[0]

    def bss_transition_bit(ap):
        # Wait for the station on the given AP and return the bit that the
        # test treats as the BSS Transition capability (0x08 in the third
        # octet of ext_capab).
        ap.wait_sta()
        entry = ap.get_sta(sta_dev.own_addr())
        ext_capab = bytearray(binascii.unhexlify(entry['ext_capab']))
        return ext_capab[2] & 0x08

    hapd0 = hostapd.add_ap(apdev[0], params0)
    sta_dev.connect(ssid, psk="12345678", key_mgmt="WPA-PSK",
                    proto="WPA2", ieee80211w="1", scan_freq="2412")
    if bss_transition_bit(hapd0):
        raise Exception("STA did not disable BSS Transition capability")

    hapd1 = hostapd.add_ap(apdev[1], params1)
    sta_dev.scan_for_bss(hapd1.own_addr(), 2412, force_scan=True)
    sta_dev.roam(hapd1.own_addr())
    if not bss_transition_bit(hapd1):
        raise Exception("STA disabled BSS Transition capability")

    # Roaming back to the first AP must clear the bit again.
    sta_dev.roam(hapd0.own_addr())
    if bss_transition_bit(hapd0):
        raise Exception("STA did not disable BSS Transition capability")
599
def check_mbo_anqp(dev, bssid, cell_data_conn_pref):
    """Run an ANQP query (272,mbo:2) against bssid and check the result.

    dev: station object used for the request and for waiting on events.
    cell_data_conn_pref: expected integer value of cell_conn_pref in the
        RX-MBO-ANQP event, or None to skip waiting for that event.

    Raises Exception if the request fails, a GAS event times out, or the
    reported preference is missing or different from the expected value.
    """
    if "OK" not in dev.request("ANQP_GET " + bssid + " 272,mbo:2"):
        raise Exception("ANQP_GET command failed")

    # (event name, timeout in seconds, error if the event is not seen)
    gas_steps = (("GAS-QUERY-START", 5, "GAS query start timed out"),
                 ("GAS-QUERY-DONE", 10, "GAS query timed out"))
    for name, tmo, failure in gas_steps:
        if dev.wait_event([name], timeout=tmo) is None:
            raise Exception(failure)

    if cell_data_conn_pref is not None:
        report = dev.wait_event(["RX-MBO-ANQP"], timeout=1)
        if report is None or "cell_conn_pref" not in report:
            raise Exception("Did not receive MBO Cellular Data Connection Preference")
        # The value follows the first '=' in the event text.
        reported = int(report.split('=')[1])
        if cell_data_conn_pref != reported:
            raise Exception("Unexpected cell_conn_pref value: " + report)

    dev.dump_monitor()
620
def test_mbo_anqp(dev, apdev):
    """MBO ANQP"""
    ap_conf = {'ssid': "test-wnm-mbo",
               'mbo': '1',
               'interworking': '1',
               'mbo_cell_data_conn_pref': '1'}
    hapd = hostapd.add_ap(apdev[0], ap_conf)
    bssid = hapd.own_addr()
    sta = dev[0]

    sta.scan_for_bss(bssid, freq="2412", force_scan=True)
    # The initial configuration advertises preference 1.
    check_mbo_anqp(sta, bssid, 1)

    # (new AP setting, value expected in the ANQP response); with '-1' the
    # check does not wait for a preference at all.
    for setting, expected in (('255', 255), ('-1', None)):
        hapd.set('mbo_cell_data_conn_pref', setting)
        check_mbo_anqp(sta, bssid, expected)
638