1# Protected management frames tests
2# Copyright (c) 2013-2024, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7from remotehost import remote_compatible
8import binascii
9import os
10import time
11import logging
12logger = logging.getLogger()
13
14import hwsim_utils
15import hostapd
16from utils import *
17from wlantest import Wlantest
18from wpasupplicant import WpaSupplicant
19from test_eap_proto import rx_msg, tx_msg, proxy_msg
20
21@remote_compatible
22def test_ap_pmf_required(dev, apdev):
23    """WPA2-PSK AP with PMF required"""
24    ssid = "test-pmf-required"
25    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
26    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
27    params["ieee80211w"] = "2"
28    hapd = hostapd.add_ap(apdev[0], params)
29    Wlantest.setup(hapd)
30    wt = Wlantest()
31    wt.flush()
32    wt.add_passphrase("12345678")
33    key_mgmt = hapd.get_config()['key_mgmt']
34    if key_mgmt.split(' ')[0] != "WPA-PSK-SHA256":
35        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
36    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
37                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
38                   scan_freq="2412")
39    if "[WPA2-PSK-SHA256-CCMP]" not in dev[0].request("SCAN_RESULTS"):
40        raise Exception("Scan results missing RSN element info")
41    hwsim_utils.test_connectivity(dev[0], hapd)
42    dev[1].connect(ssid, psk="12345678", ieee80211w="2",
43                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
44                   scan_freq="2412")
45    hwsim_utils.test_connectivity(dev[1], hapd)
46    if "OK" not in hapd.request("SA_QUERY " + dev[0].own_addr()):
47        raise Exception("SA_QUERY failed")
48    if "OK" not in hapd.request("SA_QUERY " + dev[1].own_addr()):
49        raise Exception("SA_QUERY failed")
50    if "FAIL" not in hapd.request("SA_QUERY foo"):
51        raise Exception("Invalid SA_QUERY accepted")
52    wt.require_ap_pmf_mandatory(apdev[0]['bssid'])
53    wt.require_sta_pmf(apdev[0]['bssid'], dev[0].p2p_interface_addr())
54    wt.require_sta_pmf_mandatory(apdev[0]['bssid'], dev[1].p2p_interface_addr())
55    time.sleep(0.1)
56    if wt.get_sta_counter("valid_saqueryresp_tx", apdev[0]['bssid'],
57                          dev[0].p2p_interface_addr()) < 1:
58        raise Exception("STA did not reply to SA Query")
59    if wt.get_sta_counter("valid_saqueryresp_tx", apdev[0]['bssid'],
60                          dev[1].p2p_interface_addr()) < 1:
61        raise Exception("STA did not reply to SA Query")
62
63def start_ocv_ap(apdev):
64    ssid = "test-pmf-required"
65    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
66    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
67    params["ieee80211w"] = "2"
68    params["ocv"] = "1"
69    try:
70        hapd = hostapd.add_ap(apdev, params)
71    except Exception as e:
72        if "Failed to set hostapd parameter ocv" in str(e):
73            raise HwsimSkip("OCV not supported")
74        raise
75
76    Wlantest.setup(hapd)
77    wt = Wlantest()
78    wt.flush()
79    wt.add_passphrase("12345678")
80
81    return hapd, ssid, wt
82
83@remote_compatible
84def test_ocv_sa_query(dev, apdev):
85    """Test SA Query with OCV"""
86    hapd, ssid, wt = start_ocv_ap(apdev[0])
87    dev[0].connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
88                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
89                   scan_freq="2412")
90    hapd.wait_sta() # wait so we can actually request SA_QUERY
91    # Test that client can handle SA Query with OCI element
92    if "OK" not in hapd.request("SA_QUERY " + dev[0].own_addr()):
93        raise Exception("SA_QUERY failed")
94    ev = hapd.wait_event(["OCV-FAILURE"], timeout=0.1)
95    if ev:
96        raise Exception("Unexpected OCV failure reported")
97    if wt.get_sta_counter("valid_saqueryresp_tx", apdev[0]['bssid'],
98                          dev[0].own_addr()) < 1:
99        raise Exception("STA did not reply to SA Query")
100
101    # Test that AP can handle SA Query with OCI element
102    if "OK" not in dev[0].request("UNPROT_DEAUTH"):
103        raise Exception("Triggering SA Query from the STA failed")
104    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3)
105    if ev is not None:
106        raise Exception("SA Query from the STA failed")
107
108@remote_compatible
109def test_ocv_sa_query_csa(dev, apdev):
110    """Test SA Query with OCV after channel switch"""
111    hapd, ssid, wt = start_ocv_ap(apdev[0])
112    dev[0].connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
113                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
114                   scan_freq="2412")
115
116    hapd.request("CHAN_SWITCH 5 2437 ht")
117    time.sleep(1)
118    if wt.get_sta_counter("valid_saqueryreq_tx", apdev[0]['bssid'],
119                          dev[0].own_addr()) < 1:
120        raise Exception("STA did not start SA Query after channel switch")
121
122    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=16)
123    if ev is not None:
124        raise Exception("Unexpected disconnection")
125
126def test_ocv_sa_query_csa_no_resp(dev, apdev):
127    """Test SA Query with OCV after channel switch getting no response"""
128    hapd, ssid, wt = start_ocv_ap(apdev[0])
129    dev[0].connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
130                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
131                   scan_freq="2412")
132
133    hapd.set("ext_mgmt_frame_handling", "1")
134    hapd.request("CHAN_SWITCH 5 2437 ht")
135    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
136    if ev is None:
137        raise Exception("Disconnection after CSA not reported")
138    if "locally_generated=1" not in ev:
139        raise Exception("Unexpectedly disconnected by AP: " + ev)
140
141def test_ocv_sa_query_csa_missing(dev, apdev):
142    """Test SA Query with OCV missing after channel switch"""
143    hapd, ssid, wt = start_ocv_ap(apdev[0])
144    dev[0].connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
145                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
146                   scan_freq="2412")
147    hapd.wait_sta() # wait so kernel won't drop deauth frame (MFP)
148    hapd.set("ext_mgmt_frame_handling", "1")
149    dev[0].request("DISCONNECT")
150    dev[0].wait_disconnected()
151    ev = hapd.wait_event(['MGMT-RX'], timeout=5)
152    if ev is None:
153        raise Exception("Deauthentication frame RX not reported")
154    hapd.set("ext_mgmt_frame_handling", "0")
155    hapd.request("CHAN_SWITCH 5 2437 ht")
156    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=20)
157    if ev is None:
158        raise Exception("No disconnection event received from hostapd")
159
160@remote_compatible
161def test_ap_pmf_optional(dev, apdev):
162    """WPA2-PSK AP with PMF optional"""
163    ssid = "test-pmf-optional"
164    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
165    params["wpa_key_mgmt"] = "WPA-PSK"
166    params["ieee80211w"] = "1"
167    hapd = hostapd.add_ap(apdev[0], params)
168    Wlantest.setup(hapd)
169    wt = Wlantest()
170    wt.flush()
171    wt.add_passphrase("12345678")
172    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
173                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
174                   scan_freq="2412")
175    hwsim_utils.test_connectivity(dev[0], hapd)
176    dev[1].connect(ssid, psk="12345678", ieee80211w="2",
177                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
178                   scan_freq="2412")
179    hwsim_utils.test_connectivity(dev[1], hapd)
180    wt.require_ap_pmf_optional(apdev[0]['bssid'])
181    wt.require_sta_pmf(apdev[0]['bssid'], dev[0].p2p_interface_addr())
182    wt.require_sta_pmf_mandatory(apdev[0]['bssid'], dev[1].p2p_interface_addr())
183
184@remote_compatible
185def test_ap_pmf_optional_2akm(dev, apdev):
186    """WPA2-PSK AP with PMF optional (2 AKMs)"""
187    ssid = "test-pmf-optional-2akm"
188    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
189    params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256"
190    params["ieee80211w"] = "1"
191    hapd = hostapd.add_ap(apdev[0], params)
192    Wlantest.setup(hapd)
193    wt = Wlantest()
194    wt.flush()
195    wt.add_passphrase("12345678")
196    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
197                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
198                   scan_freq="2412")
199    hwsim_utils.test_connectivity(dev[0], hapd)
200    dev[1].connect(ssid, psk="12345678", ieee80211w="2",
201                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
202                   scan_freq="2412")
203    hwsim_utils.test_connectivity(dev[1], hapd)
204    wt.require_ap_pmf_optional(apdev[0]['bssid'])
205    wt.require_sta_pmf(apdev[0]['bssid'], dev[0].p2p_interface_addr())
206    wt.require_sta_key_mgmt(apdev[0]['bssid'], dev[0].p2p_interface_addr(),
207                            "PSK-SHA256")
208    wt.require_sta_pmf_mandatory(apdev[0]['bssid'], dev[1].p2p_interface_addr())
209    wt.require_sta_key_mgmt(apdev[0]['bssid'], dev[1].p2p_interface_addr(),
210                            "PSK-SHA256")
211
212@remote_compatible
213def test_ap_pmf_negative(dev, apdev):
214    """WPA2-PSK AP without PMF (negative test)"""
215    ssid = "test-pmf-negative"
216    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
217    hapd = hostapd.add_ap(apdev[0], params)
218    Wlantest.setup(hapd)
219    wt = Wlantest()
220    wt.flush()
221    wt.add_passphrase("12345678")
222    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
223                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
224                   scan_freq="2412")
225    hwsim_utils.test_connectivity(dev[0], hapd)
226    try:
227        dev[1].connect(ssid, psk="12345678", ieee80211w="2",
228                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
229                       scan_freq="2412")
230        hwsim_utils.test_connectivity(dev[1], hapd)
231        raise Exception("PMF required STA connected to no PMF AP")
232    except Exception as e:
233        logger.debug("Ignore expected exception: " + str(e))
234    wt.require_ap_no_pmf(apdev[0]['bssid'])
235
236@remote_compatible
237def test_ap_pmf_assoc_comeback(dev, apdev):
238    """WPA2-PSK AP with PMF association comeback"""
239    run_ap_pmf_assoc_comeback(dev, apdev)
240
241def test_ap_pmf_assoc_comeback_10000tu(dev, apdev):
242    """WPA2-PSK AP with PMF association comeback (10000 TUs)"""
243    run_ap_pmf_assoc_comeback(dev, apdev, comeback=10000)
244
245def run_ap_pmf_assoc_comeback(dev, apdev, comeback=None):
246    ssid = "assoc-comeback"
247    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
248    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
249    params["ieee80211w"] = "2"
250    if comeback is not None:
251        params["assoc_sa_query_max_timeout"] = str(comeback)
252    hapd = hostapd.add_ap(apdev[0], params)
253    Wlantest.setup(hapd)
254    wt = Wlantest()
255    wt.flush()
256    wt.add_passphrase("12345678")
257    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
258                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
259                   scan_freq="2412")
260    hapd.wait_sta(wait_4way_hs=True)
261    hapd.set("ext_mgmt_frame_handling", "1")
262    dev[0].request("DISCONNECT")
263    dev[0].wait_disconnected(timeout=10)
264    ev = hapd.wait_event(["MGMT-RX"], timeout=1)
265    if ev is None:
266        raise Exception("Deauthentication frame RX not reported")
267    hapd.set("ext_mgmt_frame_handling", "0")
268    dev[0].request("REASSOCIATE")
269    dev[0].wait_connected(timeout=20, error="Timeout on re-connection")
270    hapd.wait_4way_hs()
271    if wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
272                          dev[0].p2p_interface_addr()) < 1:
273        raise Exception("AP did not use association comeback request")
274
275def test_ap_pmf_assoc_comeback_in_wpas(dev, apdev):
276    """WPA2-PSK AP with PMF association comeback in wpa_supplicant"""
277    ssid = "assoc-comeback"
278    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
279    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
280    params["ieee80211w"] = "2"
281    params["test_assoc_comeback_type"] = "255"
282    hapd = hostapd.add_ap(apdev[0], params)
283
284    dev[0].set("test_assoc_comeback_type", "255")
285    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
286                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
287                   scan_freq="2412")
288    hapd.wait_sta(wait_4way_hs=True)
289    hapd.set("ext_mgmt_frame_handling", "1")
290    dev[0].request("DISCONNECT")
291    dev[0].wait_disconnected(timeout=10)
292    ev = hapd.wait_event(["MGMT-RX"], timeout=1)
293    if ev is None:
294        raise Exception("Deauthentication frame RX not reported")
295    hapd.set("ext_mgmt_frame_handling", "0")
296    dev[0].request("REASSOCIATE")
297    ev = dev[0].wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10)
298    if ev is None or "status_code=30" not in ev:
299        raise Exception("Association comeback not requested")
300    ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED",
301                            "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
302    if ev is None:
303        raise Exception("Association not reported")
304    if "CTRL-EVENT-ASSOC-REJECT" in ev:
305        raise Exception("Unexpected association rejection: " + ev)
306    hapd.wait_4way_hs()
307
308    hapd.set("ext_mgmt_frame_handling", "1")
309    dev[0].request("DISCONNECT")
310    dev[0].wait_disconnected(timeout=10)
311    ev = hapd.wait_event(["MGMT-RX"], timeout=1)
312    if ev is None:
313        raise Exception("Deauthentication frame RX not reported")
314    hapd.set("ext_mgmt_frame_handling", "0")
315    dev[0].set("test_assoc_comeback_type", "254")
316    dev[0].request("REASSOCIATE")
317    ev = dev[0].wait_event(["CTRL-EVENT-ASSOC-REJECT"], timeout=10)
318    if ev is None or "status_code=30" not in ev:
319        raise Exception("Association comeback not requested")
320    ev = dev[0].wait_event(["SME: Temporary assoc reject: missing association comeback time",
321                            "CTRL-EVENT-CONNECTED",
322                            "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
323    if ev is None:
324        raise Exception("Association not reported")
325    if "SME: Temporary assoc reject: missing association comeback time" not in ev:
326        raise Exception("Unexpected result: " + ev)
327    dev[0].wait_connected(timeout=20, error="Timeout on re-connection with misbehaving AP")
328    hapd.wait_4way_hs()
329
330@remote_compatible
331def test_ap_pmf_assoc_comeback2(dev, apdev):
332    """WPA2-PSK AP with PMF association comeback (using DROP_SA)"""
333    ssid = "assoc-comeback"
334    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
335    params["wpa_key_mgmt"] = "WPA-PSK"
336    params["ieee80211w"] = "1"
337    hapd = hostapd.add_ap(apdev[0], params)
338    Wlantest.setup(hapd)
339    wt = Wlantest()
340    wt.flush()
341    wt.add_passphrase("12345678")
342    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
343                   key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
344    if "OK" not in dev[0].request("DROP_SA"):
345        raise Exception("DROP_SA failed")
346    dev[0].request("REASSOCIATE")
347    dev[0].wait_connected(timeout=10, error="Timeout on re-connection")
348    if wt.get_sta_counter("reassocresp_comeback", apdev[0]['bssid'],
349                          dev[0].p2p_interface_addr()) < 1:
350        raise Exception("AP did not use reassociation comeback request")
351
352@remote_compatible
353def test_ap_pmf_assoc_comeback3(dev, apdev):
354    """WPA2-PSK AP with PMF association comeback (using radio_disabled)"""
355    drv_flags = dev[0].get_driver_status_field("capa.flags")
356    if int(drv_flags, 0) & 0x20 == 0:
357        raise HwsimSkip("Driver does not support radio_disabled")
358    ssid = "assoc-comeback"
359    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
360    params["wpa_key_mgmt"] = "WPA-PSK"
361    params["ieee80211w"] = "1"
362    hapd = hostapd.add_ap(apdev[0], params)
363    Wlantest.setup(hapd)
364    wt = Wlantest()
365    wt.flush()
366    wt.add_passphrase("12345678")
367    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
368                   key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
369    dev[0].set("radio_disabled", "1")
370    dev[0].set("radio_disabled", "0")
371    dev[0].request("REASSOCIATE")
372    dev[0].wait_connected(timeout=10, error="Timeout on re-connection")
373    if wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
374                          dev[0].own_addr()) < 1:
375        raise Exception("AP did not use reassociation comeback request")
376
377@remote_compatible
378def test_ap_pmf_assoc_comeback_wps(dev, apdev):
379    """WPA2-PSK AP with PMF association comeback (WPS)"""
380    ssid = "assoc-comeback"
381    appin = "12345670"
382    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
383    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
384    params["ieee80211w"] = "2"
385    params["eap_server"] = "1"
386    params["wps_state"] = "2"
387    params["ap_pin"] = appin
388    hapd = hostapd.add_ap(apdev[0], params)
389    Wlantest.setup(hapd)
390    wt = Wlantest()
391    wt.flush()
392    wt.add_passphrase("12345678")
393    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
394                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
395                   scan_freq="2412")
396    hapd.wait_sta(wait_4way_hs=True)
397    hapd.set("ext_mgmt_frame_handling", "1")
398    dev[0].request("DISCONNECT")
399    dev[0].wait_disconnected(timeout=10)
400    ev = hapd.wait_event(["MGMT-RX"], timeout=1)
401    if ev is None:
402        raise Exception("Deauthentication frame RX not reported")
403    hapd.set("ext_mgmt_frame_handling", "0")
404    dev[0].wps_reg(apdev[0]['bssid'], appin)
405    hapd.wait_4way_hs()
406    if wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
407                          dev[0].p2p_interface_addr()) < 1:
408        raise Exception("AP did not use association comeback request")
409
410def test_ap_pmf_ap_dropping_sa(dev, apdev):
411    """WPA2-PSK PMF AP dropping SA"""
412    ssid = "pmf"
413    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
414    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
415    params["ieee80211w"] = "2"
416    hapd = hostapd.add_ap(apdev[0], params)
417    bssid = hapd.own_addr()
418    Wlantest.setup(hapd)
419    wt = Wlantest()
420    wt.flush()
421    wt.add_passphrase("12345678")
422    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
423                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
424    addr0 = dev[0].own_addr()
425    dev[0].dump_monitor()
426    hapd.wait_sta()
427    # Drop SA and association at the AP locally without notifying the STA. This
428    # results in the STA getting unprotected Deauthentication frames when trying
429    # to transmit the next Class 3 frame.
430    if "OK" not in hapd.request("DEAUTHENTICATE " + addr0 + " tx=0"):
431        raise Exception("DEAUTHENTICATE command failed")
432    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
433    if ev is not None:
434        raise Exception("Unexpected disconnection event after DEAUTHENTICATE tx=0: " + ev)
435    dev[0].request("DATA_TEST_CONFIG 1")
436    dev[0].request("DATA_TEST_TX " + bssid + " " + addr0)
437    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
438    dev[0].request("DATA_TEST_CONFIG 0")
439    if ev is None or "locally_generated=1" not in ev:
440        raise Exception("Locally generated disconnection not reported")
441
442def test_ap_pmf_known_sta_id(dev, apdev):
443    """WPA2-PSK AP and Known STA Identification to avoid association comeback"""
444    ssid = "assoc-comeback"
445    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
446    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
447    params["ieee80211w"] = "2"
448    params["known_sta_identification"] = "1"
449    hapd = hostapd.add_ap(apdev[0], params)
450    Wlantest.setup(hapd)
451    wt = Wlantest()
452    wt.flush()
453    wt.add_passphrase("12345678")
454    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
455                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
456                   scan_freq="2412")
457    hapd.wait_sta(wait_4way_hs=True)
458    hapd.set("ext_mgmt_frame_handling", "1")
459    dev[0].request("DISCONNECT")
460    dev[0].wait_disconnected(timeout=10)
461    ev = hapd.wait_event(["MGMT-RX"], timeout=1)
462    if ev is None:
463        raise Exception("Deauthentication frame RX not reported")
464    hapd.set("ext_mgmt_frame_handling", "0")
465    dev[0].request("REASSOCIATE")
466    dev[0].wait_connected(timeout=20, error="Timeout on re-connection")
467    hapd.wait_4way_hs()
468    if wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
469                          dev[0].own_addr()) > 0:
470        raise Exception("AP used association comeback request")
471
472def test_ap_pmf_valid_broadcast_deauth(dev, apdev):
473    """WPA2-PSK PMF AP sending valid broadcast deauth without dropping SA"""
474    run_ap_pmf_valid(dev, apdev, False, True)
475
476def test_ap_pmf_valid_broadcast_disassoc(dev, apdev):
477    """WPA2-PSK PMF AP sending valid broadcast disassoc without dropping SA"""
478    run_ap_pmf_valid(dev, apdev, True, True)
479
480def test_ap_pmf_valid_unicast_deauth(dev, apdev):
481    """WPA2-PSK PMF AP sending valid unicast deauth without dropping SA"""
482    run_ap_pmf_valid(dev, apdev, False, False)
483
484def test_ap_pmf_valid_unicast_disassoc(dev, apdev):
485    """WPA2-PSK PMF AP sending valid unicast disassoc without dropping SA"""
486    run_ap_pmf_valid(dev, apdev, True, False)
487
488def run_ap_pmf_valid(dev, apdev, disassociate, broadcast):
489    ssid = "pmf"
490    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
491    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
492    params["ieee80211w"] = "2"
493    hapd = hostapd.add_ap(apdev[0], params)
494    bssid = hapd.own_addr()
495    Wlantest.setup(hapd)
496    wt = Wlantest()
497    wt.flush()
498    wt.add_passphrase("12345678")
499    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
500                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
501    addr0 = dev[0].own_addr()
502    dev[0].dump_monitor()
503    hapd.wait_sta()
504    cmd = "DISASSOCIATE " if disassociate else "DEAUTHENTICATE "
505    cmd += "ff:ff:ff:ff:ff:ff" if broadcast else addr0
506    cmd += " test=1"
507    if "OK" not in hapd.request(cmd):
508        raise Exception("hostapd command failed")
509    sta = hapd.get_sta(addr0)
510    if not sta:
511        raise Exception("STA entry lost")
512    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
513    if ev is None:
514        raise Exception("Disconnection not reported")
515    if "locally_generated=1" in ev:
516        raise Exception("Unexpected locally generated disconnection")
517
518    # Wait for SA Query procedure to fail and association comeback to succeed
519    dev[0].wait_connected()
520
521def start_wpas_ap(ssid):
522    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
523    wpas.interface_add("wlan5")
524    id = wpas.add_network()
525    wpas.set_network(id, "mode", "2")
526    wpas.set_network_quoted(id, "ssid", ssid)
527    wpas.set_network(id, "proto", "WPA2")
528    wpas.set_network(id, "key_mgmt", "WPA-PSK-SHA256")
529    wpas.set_network(id, "ieee80211w", "2")
530    wpas.set_network_quoted(id, "psk", "12345678")
531    wpas.set_network(id, "pairwise", "CCMP")
532    wpas.set_network(id, "group", "CCMP")
533    wpas.set_network(id, "frequency", "2412")
534    wpas.set_network(id, "scan_freq", "2412")
535    wpas.connect_network(id)
536    wpas.dump_monitor()
537    return wpas
538
539def test_ap_pmf_sta_sa_query(dev, apdev):
540    """WPA2-PSK AP with station using SA Query"""
541    ssid = "assoc-comeback"
542    addr = dev[0].own_addr()
543
544    wpas = start_wpas_ap(ssid)
545    bssid = wpas.own_addr()
546
547    Wlantest.setup(wpas)
548    wt = Wlantest()
549    wt.flush()
550    wt.add_passphrase("12345678")
551
552    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
553                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
554                   scan_freq="2412")
555    wpas.dump_monitor()
556    wpas.request("DEAUTHENTICATE " + addr + " test=0")
557    wpas.dump_monitor()
558    wpas.request("DISASSOCIATE " + addr + " test=0")
559    wpas.dump_monitor()
560    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
561    if ev is not None:
562        raise Exception("Unexpected disconnection")
563
564    wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
565    wpas.dump_monitor()
566    wpas.request("DISASSOCIATE " + addr + " reason=7 test=0")
567    wpas.dump_monitor()
568    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
569    if ev is not None:
570        raise Exception("Unexpected disconnection")
571    if wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr) < 1:
572        raise Exception("STA did not send SA Query")
573    if wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr) < 1:
574        raise Exception("AP did not reply to SA Query")
575    wpas.dump_monitor()
576
577def test_ap_pmf_sta_sa_query_no_response(dev, apdev):
578    """WPA2-PSK AP with station using SA Query and getting no response"""
579    ssid = "assoc-comeback"
580    addr = dev[0].own_addr()
581
582    wpas = start_wpas_ap(ssid)
583    bssid = wpas.own_addr()
584
585    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
586                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
587                   scan_freq="2412")
588    wpas.dump_monitor()
589    wpas.request("DEAUTHENTICATE " + addr + " test=0")
590    wpas.dump_monitor()
591    wpas.request("DISASSOCIATE " + addr + " test=0")
592    wpas.dump_monitor()
593    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
594    if ev is not None:
595        raise Exception("Unexpected disconnection")
596
597    wpas.request("SET ext_mgmt_frame_handling 1")
598    wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
599    wpas.dump_monitor()
600    wpas.request("DISASSOCIATE " + addr + " reason=7 test=0")
601    wpas.dump_monitor()
602    dev[0].wait_disconnected()
603    wpas.dump_monitor()
604    wpas.request("SET ext_mgmt_frame_handling 0")
605    dev[0].wait_connected()
606    wpas.dump_monitor()
607
608def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
609    """WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames"""
610    ssid = "deauth-attack"
611    addr = dev[0].own_addr()
612
613    wpas = start_wpas_ap(ssid)
614    bssid = wpas.own_addr()
615
616    Wlantest.setup(wpas)
617    wt = Wlantest()
618    wt.flush()
619    wt.add_passphrase("12345678")
620
621    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
622                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
623                   scan_freq="2412")
624
625    for i in range(0, 10):
626        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
627        wpas.request("DISASSOCIATE " + addr + " reason=7 test=0")
628    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
629    if ev is not None:
630        raise Exception("Unexpected disconnection")
631    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
632    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
633    if num_req < 1:
634        raise Exception("STA did not send SA Query")
635    if num_resp < 1:
636        raise Exception("AP did not reply to SA Query")
637    if num_req > 1:
638        raise Exception("STA initiated too many SA Query procedures (%d)" % num_req)
639
640    time.sleep(10)
641    for i in range(0, 5):
642        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
643        wpas.request("DISASSOCIATE " + addr + " reason=7 test=0")
644    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
645    if ev is not None:
646        raise Exception("Unexpected disconnection")
647    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
648    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
649    if num_req != 2 or num_resp != 2:
650        raise Exception("Unexpected number of SA Query procedures (req=%d resp=%d)" % (num_req, num_resp))
651
652def test_ap_pmf_sta_sa_query_oom(dev, apdev):
653    """WPA2-PSK AP with station using SA Query (OOM)"""
654    ssid = "assoc-comeback"
655    addr = dev[0].own_addr()
656    wpas = start_wpas_ap(ssid)
657    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
658                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
659                   scan_freq="2412")
660    with alloc_fail(dev[0], 1, "=sme_sa_query_timer"):
661        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
662        wait_fail_trigger(dev[0], "GET_ALLOC_FAIL")
663    dev[0].request("DISCONNECT")
664    wpas.request("DISCONNECT")
665    dev[0].wait_disconnected()
666
667def test_ap_pmf_sta_sa_query_local_failure(dev, apdev):
668    """WPA2-PSK AP with station using SA Query (local failure)"""
669    ssid = "assoc-comeback"
670    addr = dev[0].own_addr()
671    wpas = start_wpas_ap(ssid)
672    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
673                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
674                   scan_freq="2412")
675    with fail_test(dev[0], 1, "os_get_random;sme_sa_query_timer"):
676        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
677        wait_fail_trigger(dev[0], "GET_FAIL")
678    dev[0].request("DISCONNECT")
679    wpas.request("DISCONNECT")
680    dev[0].wait_disconnected()
681
682def test_ap_pmf_sta_sa_query_hostapd(dev, apdev):
683    """WPA2-PSK AP with station using SA Query (hostapd)"""
684    ssid = "assoc-comeback"
685    passphrase = "12345678"
686    addr = dev[0].own_addr()
687
688    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
689                                 wpa_key_mgmt="WPA-PSK-SHA256",
690                                 ieee80211w="2")
691    hapd = hostapd.add_ap(apdev[0], params)
692    bssid = hapd.own_addr()
693
694    Wlantest.setup(hapd)
695    wt = Wlantest()
696    wt.flush()
697    wt.add_passphrase("12345678")
698
699    dev[0].connect(ssid, psk=passphrase, ieee80211w="2",
700                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
701                   scan_freq="2412")
702    hapd.wait_sta()
703    if "OK" not in hapd.request("DEAUTHENTICATE " + addr + " test=0") or \
704       "OK" not in hapd.request("DISASSOCIATE " + addr + " test=0"):
705        raise Exception("Failed to send unprotected disconnection messages")
706    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
707    if ev is not None:
708        raise Exception("Unexpected disconnection")
709
710    if "OK" not in hapd.request("DEAUTHENTICATE " + addr + " reason=6 test=0") or \
711       "OK" not in hapd.request("DISASSOCIATE " + addr + " reason=7 test=0"):
712        raise Exception("Failed to send unprotected disconnection messages (2)")
713    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
714    if ev is not None:
715        raise Exception("Unexpected disconnection")
716    if wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr) < 1:
717        raise Exception("STA did not send SA Query")
718    if wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr) < 1:
719        raise Exception("AP did not reply to SA Query")
720
721def test_ap_pmf_sta_sa_query_no_response_hostapd(dev, apdev):
722    """WPA2-PSK AP with station using SA Query and getting no response (hostapd)"""
723    ssid = "assoc-comeback"
724    passphrase = "12345678"
725    addr = dev[0].own_addr()
726
727    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
728                                 wpa_key_mgmt="WPA-PSK-SHA256",
729                                 ieee80211w="2")
730    hapd = hostapd.add_ap(apdev[0], params)
731    bssid = hapd.own_addr()
732
733    Wlantest.setup(hapd)
734    wt = Wlantest()
735    wt.flush()
736    wt.add_passphrase("12345678")
737
738    dev[0].connect(ssid, psk=passphrase, ieee80211w="2",
739                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
740                   scan_freq="2412")
741    hapd.wait_sta()
742    hapd.set("ext_mgmt_frame_handling", "1")
743    if "OK" not in hapd.request("DEAUTHENTICATE " + addr + " reason=6 test=0") or \
744       "OK" not in hapd.request("DISASSOCIATE " + addr + " reason=7 test=0"):
745        raise Exception("Failed to send unprotected disconnection messages")
746    dev[0].wait_disconnected()
747    hapd.set("ext_mgmt_frame_handling", "0")
748    if wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr) < 1:
749        raise Exception("STA did not send SA Query")
750    if wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr) > 0:
751        raise Exception("AP replied to SA Query")
752    dev[0].wait_connected()
753
754def test_ap_pmf_sta_unprot_deauth_burst_hostapd(dev, apdev):
755    """WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames (hostapd)"""
756    ssid = "deauth-attack"
757    passphrase = "12345678"
758    addr = dev[0].own_addr()
759
760    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
761                                 wpa_key_mgmt="WPA-PSK-SHA256",
762                                 ieee80211w="2")
763    hapd = hostapd.add_ap(apdev[0], params)
764    bssid = hapd.own_addr()
765
766    Wlantest.setup(hapd)
767    wt = Wlantest()
768    wt.flush()
769    wt.add_passphrase("12345678")
770
771    dev[0].connect(ssid, psk=passphrase, ieee80211w="2",
772                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
773                   scan_freq="2412")
774    hapd.wait_sta()
775    for i in range(10):
776        if "OK" not in hapd.request("DEAUTHENTICATE " + addr + " reason=6 test=0") or \
777           "OK" not in hapd.request("DISASSOCIATE " + addr + " reason=7 test=0"):
778            raise Exception("Failed to send unprotected disconnection messages")
779    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
780    if ev is not None:
781        raise Exception("Unexpected disconnection")
782    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
783    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
784    if num_req < 1:
785        raise Exception("STA did not send SA Query")
786    if num_resp < 1:
787        raise Exception("AP did not reply to SA Query")
788    if num_req > 1:
789        raise Exception("STA initiated too many SA Query procedures (%d)" % num_req)
790
791    time.sleep(10)
792    for i in range(5):
793        if "OK" not in hapd.request("DEAUTHENTICATE " + addr + " reason=6 test=0") or \
794           "OK" not in hapd.request("DISASSOCIATE " + addr + " reason=7 test=0"):
795            raise Exception("Failed to send unprotected disconnection messages")
796    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
797    if ev is not None:
798        raise Exception("Unexpected disconnection")
799    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
800    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
801    if num_req != 2 or num_resp != 2:
802        raise Exception("Unexpected number of SA Query procedures (req=%d resp=%d)" % (num_req, num_resp))
803
804def test_ap_pmf_required_eap(dev, apdev):
805    """WPA2-EAP AP with PMF required"""
806    ssid = "test-pmf-required-eap"
807    params = hostapd.wpa2_eap_params(ssid=ssid)
808    params["wpa_key_mgmt"] = "WPA-EAP-SHA256"
809    params["ieee80211w"] = "2"
810    hapd = hostapd.add_ap(apdev[0], params)
811    key_mgmt = hapd.get_config()['key_mgmt']
812    if key_mgmt.split(' ')[0] != "WPA-EAP-SHA256":
813        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
814    dev[0].connect("test-pmf-required-eap", key_mgmt="WPA-EAP-SHA256",
815                   ieee80211w="2", eap="PSK", identity="psk.user@example.com",
816                   password_hex="0123456789abcdef0123456789abcdef",
817                   scan_freq="2412")
818    dev[1].connect("test-pmf-required-eap", key_mgmt="WPA-EAP WPA-EAP-SHA256",
819                   ieee80211w="1", eap="PSK", identity="psk.user@example.com",
820                   password_hex="0123456789abcdef0123456789abcdef",
821                   scan_freq="2412")
822
823def test_ap_pmf_optional_eap(dev, apdev):
824    """WPA2EAP AP with PMF optional"""
825    params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
826    params["ieee80211w"] = "1"
827    hapd = hostapd.add_ap(apdev[0], params)
828    dev[0].connect("test-wpa2-eap", key_mgmt="WPA-EAP", eap="TTLS",
829                   identity="pap user", anonymous_identity="ttls",
830                   password="password",
831                   ca_cert="auth_serv/ca.pem", phase2="auth=PAP",
832                   ieee80211w="1", scan_freq="2412")
833    dev[1].connect("test-wpa2-eap", key_mgmt="WPA-EAP WPA-EAP-SHA256",
834                   eap="TTLS", identity="pap user", anonymous_identity="ttls",
835                   password="password",
836                   ca_cert="auth_serv/ca.pem", phase2="auth=PAP",
837                   ieee80211w="2", scan_freq="2412")
838
839@remote_compatible
840def test_ap_pmf_required_sha1(dev, apdev):
841    """WPA2-PSK AP with PMF required with SHA1 AKM"""
842    ssid = "test-pmf-required-sha1"
843    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
844    params["wpa_key_mgmt"] = "WPA-PSK"
845    params["ieee80211w"] = "2"
846    hapd = hostapd.add_ap(apdev[0], params)
847    Wlantest.setup(hapd)
848    wt = Wlantest()
849    wt.flush()
850    wt.add_passphrase("12345678")
851    key_mgmt = hapd.get_config()['key_mgmt']
852    if key_mgmt.split(' ')[0] != "WPA-PSK":
853        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)
854    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
855                   key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")
856    if "[WPA2-PSK-CCMP]" not in dev[0].request("SCAN_RESULTS"):
857        raise Exception("Scan results missing RSN element info")
858    hwsim_utils.test_connectivity(dev[0], hapd)
859
860@remote_compatible
861def test_ap_pmf_toggle(dev, apdev):
862    """WPA2-PSK AP with PMF optional and changing PMF on reassociation"""
863    try:
864        _test_ap_pmf_toggle(dev, apdev)
865    finally:
866        dev[0].request("SET reassoc_same_bss_optim 0")
867
868def _test_ap_pmf_toggle(dev, apdev):
869    ssid = "test-pmf-optional"
870    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
871    params["wpa_key_mgmt"] = "WPA-PSK"
872    params["ieee80211w"] = "1"
873    params["assoc_sa_query_max_timeout"] = "1"
874    params["assoc_sa_query_retry_timeout"] = "1"
875    hapd = hostapd.add_ap(apdev[0], params)
876    Wlantest.setup(hapd)
877    wt = Wlantest()
878    wt.flush()
879    wt.add_passphrase("12345678")
880    bssid = apdev[0]['bssid']
881    addr = dev[0].own_addr()
882    dev[0].request("SET reassoc_same_bss_optim 1")
883    id = dev[0].connect(ssid, psk="12345678", ieee80211w="1",
884                        key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
885                        scan_freq="2412")
886    wt.require_ap_pmf_optional(bssid)
887    wt.require_sta_pmf(bssid, addr)
888    sta = hapd.get_sta(addr)
889    if '[MFP]' not in sta['flags']:
890        raise Exception("MFP flag not present for STA")
891
892    dev[0].set_network(id, "ieee80211w", "0")
893    dev[0].request("REASSOCIATE")
894    dev[0].wait_connected()
895    wt.require_sta_no_pmf(bssid, addr)
896    sta = hapd.get_sta(addr)
897    if '[MFP]' in sta['flags']:
898        raise Exception("MFP flag unexpectedly present for STA")
899    err, data = hapd.cmd_execute(['iw', 'dev', apdev[0]['ifname'], 'station',
900                                  'get', addr])
901    if "yes" in [l for l in data.splitlines() if "MFP" in l][0]:
902        raise Exception("Kernel STA entry had MFP enabled")
903
904    dev[0].set_network(id, "ieee80211w", "1")
905    dev[0].request("REASSOCIATE")
906    dev[0].wait_connected()
907    wt.require_sta_pmf(bssid, addr)
908    sta = hapd.get_sta(addr)
909    if '[MFP]' not in sta['flags']:
910        raise Exception("MFP flag not present for STA")
911    err, data = hapd.cmd_execute(['iw', 'dev', apdev[0]['ifname'], 'station',
912                                  'get', addr])
913    if "yes" not in [l for l in data.splitlines() if "MFP" in l][0]:
914        raise Exception("Kernel STA entry did not have MFP enabled")
915
916@remote_compatible
917def test_ap_pmf_required_sta_no_pmf(dev, apdev):
918    """WPA2-PSK AP with PMF required and PMF disabled on STA"""
919    ssid = "test-pmf-required"
920    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
921    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
922    params["ieee80211w"] = "2"
923    hapd = hostapd.add_ap(apdev[0], params)
924
925    # Disable PMF on the station and try to connect
926    dev[0].connect(ssid, psk="12345678", ieee80211w="0",
927                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
928                   scan_freq="2412", wait_connect=False)
929    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND",
930                            "CTRL-EVENT-ASSOC-REJECT"], timeout=2)
931    if ev is None:
932        raise Exception("No connection result")
933    if "CTRL-EVENT-ASSOC-REJECT" in ev:
934        raise Exception("Tried to connect to PMF required AP without PMF enabled")
935    dev[0].request("REMOVE_NETWORK all")
936
937def test_ap_pmf_inject_auth(dev, apdev):
938    """WPA2-PSK AP with PMF and Authentication frame injection"""
939    ssid = "test-pmf"
940    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
941    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
942    params["ieee80211w"] = "2"
943    hapd = hostapd.add_ap(apdev[0], params)
944    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
945                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
946                   scan_freq="2412")
947    hapd.wait_sta()
948    hwsim_utils.test_connectivity(dev[0], hapd)
949
950    bssid = hapd.own_addr().replace(':', '')
951    addr = dev[0].own_addr().replace(':', '')
952
953    # Inject an unprotected Authentication frame claiming to be from the
954    # associated STA, from another STA, from the AP's own address, from all
955    # zeros and all ones addresses, and from a multicast address.
956    hapd.request("SET ext_mgmt_frame_handling 1")
957    failed = False
958    addresses = [ addr, "021122334455", bssid, 6*"00", 6*"ff", 6*"01" ]
959    for a in addresses:
960        auth = "b0003a01" + bssid + a + bssid + '1000000001000000'
961        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth)
962        if "OK" not in res:
963            failed = True
964    hapd.request("SET ext_mgmt_frame_handling 0")
965    if failed:
966        raise Exception("MGMT_RX_PROCESS failed")
967    time.sleep(0.1)
968
969    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.1)
970    if ev:
971        raise Exception("Unexpected disconnection reported on the STA")
972
973    # Verify that original association is still functional.
974    hwsim_utils.test_connectivity(dev[0], hapd)
975
976    # Inject an unprotected Association Request frame (with and without RSNE)
977    # claiming to be from the set of test addresses.
978    hapd.request("SET ext_mgmt_frame_handling 1")
979    for a in addresses:
980        assoc = "00003a01" + bssid + a + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824' + '301a0100000fac040100000fac040100000fac06c0000000000fac06'
981        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
982        if "OK" not in res:
983            failed = True
984
985        assoc = "00003a01" + bssid + a + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824' + '3000'
986        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
987        if "OK" not in res:
988            failed = True
989
990        assoc = "00003a01" + bssid + a + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824'
991        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc)
992        if "OK" not in res:
993            failed = True
994    hapd.request("SET ext_mgmt_frame_handling 0")
995    if failed:
996        raise Exception("MGMT_RX_PROCESS failed")
997    time.sleep(5)
998
999    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.1)
1000    if ev:
1001        raise Exception("Unexpected disconnection reported on the STA")
1002
1003    # Verify that original association is still functional.
1004    hwsim_utils.test_connectivity(dev[0], hapd)
1005
1006def test_ap_pmf_inject_assoc(dev, apdev):
1007    """WPA2-PSK with PMF and Association Request frame injection"""
1008    run_ap_pmf_inject_assoc(dev, apdev, False)
1009
1010def test_ap_pmf_inject_assoc_wps(dev, apdev):
1011    """WPA2-PSK/WPS with PMF and Association Request frame injection"""
1012    run_ap_pmf_inject_assoc(dev, apdev, True)
1013
1014def inject_assoc_req(hapd, addr, frame):
1015    hapd.set("ext_mgmt_frame_handling", "1")
1016    res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % frame)
1017    if "OK" not in res:
1018        raise Exception("MGMT_RX_PROCESS failed")
1019    hapd.set("ext_mgmt_frame_handling", "0")
1020    sta = hapd.get_sta(addr)
1021    if "[MFP]" not in sta['flags']:
1022        raise Exception("MFP flag removed")
1023    if sta["AKMSuiteSelector"] != '00-0f-ac-6':
1024        raise Exception("AKMSuiteSelector value changed")
1025
1026def run_ap_pmf_inject_assoc(dev, apdev, wps):
1027    ssid = "test-pmf"
1028    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1029    params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256"
1030    params["ieee80211w"] = "1"
1031    if wps:
1032        params["eap_server"] = "1"
1033        params["wps_state"] = "2"
1034
1035    hapd = hostapd.add_ap(apdev[0], params)
1036    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
1037                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
1038                   scan_freq="2412")
1039    hapd.wait_sta()
1040    sta = hapd.get_sta(dev[0].own_addr())
1041    if "[MFP]" not in sta['flags']:
1042        raise Exception("MFP flag not reported")
1043    if sta["AKMSuiteSelector"] != '00-0f-ac-6':
1044        raise Exception("Incorrect AKMSuiteSelector value")
1045
1046    bssid = hapd.own_addr().replace(':', '')
1047    addr = dev[0].own_addr().replace(':', '')
1048
1049    # Inject unprotected Association Request frames
1050    assoc1 = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac020000'
1051    assoc2 = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824' + '30140100000fac040100000fac040100000fac060000'
1052    assoc3 = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824'
1053
1054    inject_assoc_req(hapd, dev[0].own_addr(), assoc1)
1055    time.sleep(0.1)
1056    inject_assoc_req(hapd, dev[0].own_addr(), assoc1)
1057    time.sleep(0.1)
1058    inject_assoc_req(hapd, dev[0].own_addr(), assoc2)
1059    time.sleep(0.1)
1060    inject_assoc_req(hapd, dev[0].own_addr(), assoc3)
1061    time.sleep(0.1)
1062    inject_assoc_req(hapd, dev[0].own_addr(), assoc2)
1063
1064    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5.1)
1065    if ev:
1066        raise Exception("Unexpected disconnection reported on the STA")
1067    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=0.1)
1068    if ev:
1069        raise Exception("Unexpected disconnection event received from hostapd")
1070
1071    # Verify that original association is still functional.
1072    hwsim_utils.test_connectivity(dev[0], hapd)
1073
1074def test_ap_pmf_inject_data(dev, apdev):
1075    """WPA2-PSK AP with PMF and Data frame injection"""
1076    try:
1077        run_ap_pmf_inject_data(dev, apdev)
1078    finally:
1079        stop_monitor(apdev[1]["ifname"])
1080
1081def run_ap_pmf_inject_data(dev, apdev):
1082    ssid = "test-pmf"
1083    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1084    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1085    params["ieee80211w"] = "2"
1086    hapd = hostapd.add_ap(apdev[0], params)
1087    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
1088                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
1089                   scan_freq="2412")
1090    hapd.wait_sta()
1091    hwsim_utils.test_connectivity(dev[0], hapd)
1092
1093    sock = start_monitor(apdev[1]["ifname"])
1094    radiotap = radiotap_build()
1095
1096    bssid = hapd.own_addr().replace(':', '')
1097    addr = dev[0].own_addr().replace(':', '')
1098
1099    # Inject Data frame with A2=broadcast, A2=multicast, A2=BSSID, A2=STA, and
1100    # A2=unknown unicast
1101    addresses = [ 6*"ff", 6*"01", bssid, addr, "020102030405" ]
1102    for a in addresses:
1103        frame = binascii.unhexlify("48010000" + bssid + a + bssid + "0000")
1104        sock.send(radiotap + frame)
1105
1106    time.sleep(0.1)
1107    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.1)
1108    if ev:
1109        raise Exception("Unexpected disconnection reported on the STA")
1110    hwsim_utils.test_connectivity(dev[0], hapd)
1111
1112def test_ap_pmf_inject_msg1(dev, apdev):
1113    """WPA2-PSK AP with PMF and EAPOL-Key msg 1/4 injection"""
1114    try:
1115        run_ap_pmf_inject_msg1(dev, apdev)
1116    finally:
1117        stop_monitor(apdev[1]["ifname"])
1118
1119def test_ap_pmf_inject_msg1_no_pmf(dev, apdev):
1120    """WPA2-PSK AP without PMF and EAPOL-Key msg 1/4 injection"""
1121    try:
1122        run_ap_pmf_inject_msg1(dev, apdev, pmf=False)
1123    finally:
1124        stop_monitor(apdev[1]["ifname"])
1125
1126def run_ap_pmf_inject_msg1(dev, apdev, pmf=True):
1127    ssid = "test-pmf"
1128    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1129    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1130    if pmf:
1131        params["ieee80211w"] = "2"
1132    hapd = hostapd.add_ap(apdev[0], params)
1133    dev[0].connect(ssid, psk="12345678", ieee80211w="2" if pmf else "0",
1134                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
1135                   scan_freq="2412")
1136    hapd.wait_sta()
1137
1138    sock = start_monitor(apdev[1]["ifname"])
1139    radiotap = radiotap_build()
1140
1141    bssid = hapd.own_addr().replace(':', '')
1142    addr = dev[0].own_addr().replace(':', '')
1143
1144    # Inject unprotected EAPOL-Key msg 1/4 with an invalid KDE
1145    f = "88020000" + addr + bssid + bssid + "0000" + "0700"
1146    f += "aaaa03000000" + "888e"
1147    f += "0203006602008b00100000000000000005bcb714da6f98f817b88948485c26ef052922b795814819f1889ae01e11b486910000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000007" + "dd33000fac0400"
1148    frame = binascii.unhexlify(f)
1149    sock.send(radiotap + frame)
1150
1151    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.5)
1152    if ev:
1153        raise Exception("Unexpected disconnection reported on the STA")
1154    hwsim_utils.test_connectivity(dev[0], hapd)
1155    state = dev[0].get_status_field("wpa_state")
1156    if state != "COMPLETED":
1157        raise Exception("Unexpected wpa_state: " + state)
1158
1159def test_ap_pmf_inject_eap(dev, apdev):
1160    """WPA2-EAP AP with PMF and EAP frame injection"""
1161    try:
1162        run_ap_pmf_inject_eap(dev, apdev)
1163    finally:
1164        stop_monitor(apdev[1]["ifname"])
1165
1166def run_ap_pmf_inject_eap(dev, apdev, pmf=True):
1167    ssid = "test-pmf-eap"
1168    params = hostapd.wpa2_eap_params(ssid=ssid)
1169    params["wpa_key_mgmt"] = "WPA-EAP-SHA256"
1170    params["ieee80211w"] = "2"
1171    hapd = hostapd.add_ap(apdev[0], params)
1172    dev[0].connect(ssid, key_mgmt="WPA-EAP-SHA256",
1173                   ieee80211w="2", eap="PSK", identity="psk.user@example.com",
1174                   password_hex="0123456789abcdef0123456789abcdef",
1175                   scan_freq="2412")
1176    hapd.wait_sta()
1177    dev[0].dump_monitor()
1178    hapd.dump_monitor()
1179
1180    sock = start_monitor(apdev[1]["ifname"])
1181    radiotap = radiotap_build()
1182
1183    bssid = hapd.own_addr().replace(':', '')
1184    addr = dev[0].own_addr().replace(':', '')
1185
1186    disconnected = False
1187    eap_start = False
1188    eap_failure = False
1189    ap_disconnected = False
1190
1191    # Inject various unexpected unprotected EAPOL frames to the STA
1192    f = "88020000" + addr + bssid + bssid + "0000" + "0700"
1193    f += "aaaa03000000" + "888e"
1194    tests = []
1195    for i in range(101):
1196        tests += [ "02000005012d000501" ] # EAP-Request/Identity
1197    for i in range(101):
1198        tests += [ "02000022012e00222f00862406a9b45782fee8a62e837457d1367365727665722e77312e6669" ] # EAP-Request/PSK
1199    tests += [ "0200000404780004" ] # EAP-Failure
1200    tests += [ "0200000403780004" ] # EAP-Success
1201    tests += [ "02000006057800060100" ] # EAP-Initiate
1202    tests += [ "0200000406780004" ] # EAP-Finish
1203    tests += [ "0200000400780004" ] # EAP-?
1204    tests += [ "02020000" ] # EAPOL-Logoff
1205    tests += [ "02010000" ] # EAPOL-Start
1206    for t in tests:
1207        dev[0].note("Inject " + t)
1208        frame = binascii.unhexlify(f + t)
1209        sock.send(radiotap + frame)
1210        for i in range(2):
1211            ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED",
1212                                    "CTRL-EVENT-EAP-STARTED",
1213                                    "CTRL-EVENT-EAP-FAILURE"],
1214                                   timeout=0.0001)
1215            if ev is None:
1216                break
1217            if "CTRL-EVENT-DISCONNECTED" in ev:
1218                disconnected = True
1219            if "CTRL-EVENT-EAP-START" in ev:
1220                eap_start = True
1221            if "CTRL-EVENT-EAP-FAILURE" in ev:
1222                eap_failure = True
1223        dev[0].dump_monitor(mon=False)
1224    dev[0].dump_monitor()
1225    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=0.1)
1226    if ev:
1227        ap_disconnected = True
1228    if disconnected or eap_start or eap_failure or ap_disconnected:
1229        raise Exception("Unexpected event:%s%s%s%s" %
1230                        (" disconnected" if disconnected else "",
1231                         " eap_start" if eap_start else "",
1232                         " eap_failure" if eap_failure else "",
1233                         " ap_disconnected" if ap_disconnected else ""))
1234    hwsim_utils.test_connectivity(dev[0], hapd)
1235    state = dev[0].get_status_field("wpa_state")
1236    if state != "COMPLETED":
1237        raise Exception("Unexpected wpa_state: " + state)
1238
1239    dev[0].dump_monitor()
1240    hapd.dump_monitor()
1241
1242    # Inject various unexpected unprotected EAPOL frames to the AP
1243    f = "88010000" + bssid + addr + bssid + "0000" + "0700"
1244    f += "aaaa03000000" + "888e"
1245    tests = []
1246    tests += [ "02020000" ] # EAPOL-Logoff
1247    for i in range(10):
1248        tests += [ "02010000" ] # EAPOL-Start
1249    for t in tests:
1250        hapd.note("Inject " + t)
1251        frame = binascii.unhexlify(f + t)
1252        sock.send(radiotap + frame)
1253        for i in range(2):
1254            ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED",
1255                                    "CTRL-EVENT-EAP-STARTED",
1256                                    "CTRL-EVENT-EAP-FAILURE"],
1257                                   timeout=0.0001)
1258            if ev is None:
1259                break
1260            if "CTRL-EVENT-DISCONNECTED" in ev:
1261                disconnected = True
1262            if "CTRL-EVENT-EAP-START" in ev:
1263                eap_start = True
1264            if "CTRL-EVENT-EAP-FAILURE" in ev:
1265                eap_failure = True
1266        dev[0].dump_monitor(mon=False)
1267    dev[0].dump_monitor()
1268    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=0.1)
1269    if ev:
1270        ap_disconnected = True
1271    hapd.dump_monitor()
1272    if disconnected or eap_start or eap_failure or ap_disconnected:
1273        raise Exception("Unexpected event(2):%s%s%s%s" %
1274                        (" disconnected" if disconnected else "",
1275                         " eap_start" if eap_start else "",
1276                         " eap_failure" if eap_failure else "",
1277                         " ap_disconnected" if ap_disconnected else ""))
1278    hwsim_utils.test_connectivity(dev[0], hapd)
1279    state = dev[0].get_status_field("wpa_state")
1280    if state != "COMPLETED":
1281        raise Exception("Unexpected wpa_state(2): " + state)
1282
1283def test_ap_pmf_tkip_reject(dev, apdev):
1284    """Mixed mode BSS and MFP-enabled AP rejecting TKIP"""
1285    skip_without_tkip(dev[0])
1286    params = hostapd.wpa2_params(ssid="test-pmf", passphrase="12345678")
1287    params['wpa'] = '3'
1288    params["ieee80211w"] = "1"
1289    params["wpa_pairwise"] = "TKIP CCMP"
1290    params["rsn_pairwise"] = "TKIP CCMP"
1291    hostapd.add_ap(apdev[0], params)
1292
1293    dev[0].connect("test-pmf", psk="12345678", pairwise="CCMP", ieee80211w="2",
1294                   scan_freq="2412")
1295    dev[0].dump_monitor()
1296
1297    dev[1].connect("test-pmf", psk="12345678", proto="WPA", pairwise="TKIP",
1298                   ieee80211w="0", scan_freq="2412")
1299    dev[1].dump_monitor()
1300
1301    dev[2].connect("test-pmf", psk="12345678", pairwise="TKIP",
1302                   ieee80211w="2", scan_freq="2412", wait_connect=False)
1303    ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED",
1304                            "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
1305    if ev is None:
1306        raise Exception("No connection result reported")
1307    if "CTRL-EVENT-ASSOC-REJECT" not in ev:
1308        raise Exception("MFP + TKIP connection was not rejected")
1309    if "status_code=31" not in ev:
1310        raise Exception("Unexpected status code in rejection: " + ev)
1311    dev[2].request("DISCONNECT")
1312    dev[2].dump_monitor()
1313
1314def test_ap_pmf_sa_query_timeout(dev, apdev):
1315    """SA Query timeout"""
1316    ssid = "test-pmf-required"
1317    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1318    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1319    params["ieee80211w"] = "2"
1320    hapd = hostapd.add_ap(apdev[0], params)
1321    dev[0].connect(ssid, psk="12345678", ieee80211w="2",
1322                   key_mgmt="WPA-PSK-SHA256", proto="WPA2",
1323                   scan_freq="2412")
1324
1325    hapd.set("ext_mgmt_frame_handling", "1")
1326    if "OK" not in dev[0].request("UNPROT_DEAUTH"):
1327        raise Exception("Triggering SA Query from the STA failed")
1328    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=2)
1329    if ev is None:
1330        raise Exception("No disconnection on SA Query timeout seen")
1331    hapd.set("ext_mgmt_frame_handling", "0")
1332    dev[0].wait_connected()
1333    dev[0].dump_monitor()
1334
1335    hapd.set("ext_mgmt_frame_handling", "1")
1336    if "OK" not in dev[0].request("UNPROT_DEAUTH"):
1337        raise Exception("Triggering SA Query from the STA failed")
1338    ev = hapd.mgmt_rx()
1339    hapd.set("ext_mgmt_frame_handling", "0")
1340    dev[0].request("DISCONNECT")
1341    dev[0].wait_disconnected()
1342    dev[0].request("RECONNECT")
1343    dev[0].wait_connected()
1344    hapd.set("ext_mgmt_frame_handling", "1")
1345    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1.5)
1346    if ev is not None:
1347        raise Exception("Unexpected disconnection after reconnection seen")
1348
1349def mac80211_read_key(keydir):
1350    vals = {}
1351    for name in os.listdir(keydir):
1352        try:
1353            with open(os.path.join(keydir, name)) as f:
1354                vals[name] = f.read().strip()
1355        except OSError as e:
1356            pass
1357    return vals
1358
1359def check_mac80211_bigtk(dev, hapd):
1360    sta_key = None
1361    ap_key = None
1362
1363    phy = dev.get_driver_status_field("phyname")
1364    keys = "/sys/kernel/debug/ieee80211/%s/keys" % phy
1365    try:
1366        for key in os.listdir(keys):
1367            keydir = os.path.join(keys, key)
1368            vals = mac80211_read_key(keydir)
1369            keyidx = int(vals['keyidx'])
1370            if keyidx == 6 or keyidx == 7:
1371                sta_key = vals;
1372                break
1373    except OSError as e:
1374        raise HwsimSkip("debugfs not supported in mac80211 (STA)")
1375
1376    phy = hapd.get_driver_status_field("phyname")
1377    keys = "/sys/kernel/debug/ieee80211/%s/keys" % phy
1378    try:
1379        for key in os.listdir(keys):
1380            keydir = os.path.join(keys, key)
1381            vals = mac80211_read_key(keydir)
1382            keyidx = int(vals['keyidx'])
1383            if keyidx == 6 or keyidx == 7:
1384                ap_key = vals;
1385                break
1386    except OSError as e:
1387        raise HwsimSkip("debugfs not supported in mac80211 (AP)")
1388
1389    if not sta_key:
1390        raise Exception("Could not find STA key information from debugfs")
1391    logger.info("STA key: " + str(sta_key))
1392
1393    if not ap_key:
1394        raise Exception("Could not find AP key information from debugfs")
1395    logger.info("AP key: " + str(ap_key))
1396
1397    if sta_key['key'] != ap_key['key']:
1398        raise Exception("AP and STA BIGTK mismatch")
1399
1400    if sta_key['keyidx'] != ap_key['keyidx']:
1401        raise Exception("AP and STA BIGTK keyidx mismatch")
1402
1403    if sta_key['algorithm'] != ap_key['algorithm']:
1404        raise Exception("AP and STA BIGTK algorithm mismatch")
1405
1406    replays = int(sta_key['replays'])
1407    icverrors = int(sta_key['icverrors'])
1408    if replays > 0 or icverrors > 0:
1409        raise Exception("STA reported errors: replays=%d icverrors=%d" % replays, icverrors)
1410
1411    rx_spec = int(sta_key['rx_spec'], base=16)
1412    if rx_spec < 3:
1413        raise Exception("STA did not update BIGTK receive counter sufficiently")
1414
1415    tx_spec = int(ap_key['tx_spec'], base=16)
1416    if tx_spec < 3:
1417        raise Exception("AP did not update BIGTK BIPN sufficiently")
1418
1419def test_ap_pmf_beacon_protection_bip(dev, apdev):
1420    """WPA2-PSK Beacon protection (BIP)"""
1421    run_ap_pmf_beacon_protection(dev, apdev, "AES-128-CMAC")
1422
1423def test_ap_pmf_beacon_protection_bip_cmac_256(dev, apdev):
1424    """WPA2-PSK Beacon protection (BIP-CMAC-256)"""
1425    run_ap_pmf_beacon_protection(dev, apdev, "BIP-CMAC-256")
1426
1427def test_ap_pmf_beacon_protection_bip_gmac_128(dev, apdev):
1428    """WPA2-PSK Beacon protection (BIP-GMAC-128)"""
1429    run_ap_pmf_beacon_protection(dev, apdev, "BIP-GMAC-128")
1430
1431def test_ap_pmf_beacon_protection_bip_gmac_256(dev, apdev):
1432    """WPA2-PSK Beacon protection (BIP-GMAC-256)"""
1433    run_ap_pmf_beacon_protection(dev, apdev, "BIP-GMAC-256")
1434
1435def run_ap_pmf_beacon_protection(dev, apdev, cipher):
1436    ssid = "test-beacon-prot"
1437    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1438    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1439    params["ieee80211w"] = "2"
1440    params["beacon_prot"] = "1"
1441    params["group_mgmt_cipher"] = cipher
1442    try:
1443        hapd = hostapd.add_ap(apdev[0], params)
1444    except Exception as e:
1445        if "Failed to enable hostapd interface" in str(e):
1446            raise HwsimSkip("Beacon protection not supported")
1447        raise
1448
1449    bssid = hapd.own_addr()
1450
1451    Wlantest.setup(hapd)
1452    wt = Wlantest()
1453    wt.flush()
1454    wt.add_passphrase("12345678")
1455
1456    dev[0].flush_scan_cache()
1457
1458    # STA with Beacon protection enabled
1459    dev[0].connect(ssid, psk="12345678", ieee80211w="2", beacon_prot="1",
1460                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
1461    if dev[0].get_status_field("bigtk_set") != "1":
1462        raise Exception("bigtk_set=1 not indicated")
1463
1464    # STA with Beacon protection disabled
1465    dev[1].connect(ssid, psk="12345678", ieee80211w="2",
1466                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
1467    if dev[1].get_status_field("bigtk_set") == "1":
1468        raise Exception("Unexpected bigtk_set=1 indication")
1469
1470    time.sleep(1)
1471    check_mac80211_bigtk(dev[0], hapd)
1472
1473    valid_bip = wt.get_bss_counter('valid_bip_mmie', bssid)
1474    invalid_bip = wt.get_bss_counter('invalid_bip_mmie', bssid)
1475    missing_bip = wt.get_bss_counter('missing_bip_mmie', bssid)
1476    logger.info("wlantest BIP counters: valid=%d invalid=%d missing=%d" % (valid_bip, invalid_bip, missing_bip))
1477    if valid_bip < 0 or invalid_bip > 0 or missing_bip > 0:
1478        raise Exception("Unexpected wlantest BIP counters: valid=%d invalid=%d missing=%d" % (valid_bip, invalid_bip, missing_bip))
1479
1480    ev = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=10)
1481    if ev is not None:
1482        raise Exception("Beacon loss detected")
1483
1484    # Verify that the SSID has been successfully verified from a protected
1485    # Beacon frame.
1486    if dev[0].get_status_field("ssid_verified") != "1":
1487        raise Exception("ssid_verified=1 not in STATUS")
1488
1489def test_ap_pmf_beacon_protection_mismatch(dev, apdev):
1490    """WPA2-PSK Beacon protection MIC mismatch"""
1491    run_ap_pmf_beacon_protection_mismatch(dev, apdev, False)
1492
1493def test_ap_pmf_beacon_protection_missing(dev, apdev):
1494    """WPA2-PSK Beacon protection MME missing"""
1495    run_ap_pmf_beacon_protection_mismatch(dev, apdev, True)
1496
1497def run_ap_pmf_beacon_protection_mismatch(dev, apdev, clear):
1498    ssid = "test-beacon-prot"
1499    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1500    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1501    params["ieee80211w"] = "2"
1502    params["beacon_prot"] = "1"
1503    params["group_mgmt_cipher"] = "AES-128-CMAC"
1504    try:
1505        hapd = hostapd.add_ap(apdev[0], params)
1506    except Exception as e:
1507        if "Failed to enable hostapd interface" in str(e):
1508            raise HwsimSkip("Beacon protection not supported")
1509        raise
1510
1511    bssid = hapd.own_addr()
1512
1513    Wlantest.setup(hapd)
1514    wt = Wlantest()
1515    wt.flush()
1516    wt.add_passphrase("12345678")
1517
1518    dev[0].connect(ssid, psk="12345678", ieee80211w="2", beacon_prot="1",
1519                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
1520
1521    WPA_ALG_NONE = 0
1522    WPA_ALG_IGTK = 4
1523    KEY_FLAG_DEFAULT = 0x02
1524    KEY_FLAG_TX = 0x08
1525    KEY_FLAG_GROUP = 0x10
1526    KEY_FLAG_GROUP_TX_DEFAULT = KEY_FLAG_GROUP | KEY_FLAG_TX | KEY_FLAG_DEFAULT
1527
1528    addr = "ff:ff:ff:ff:ff:ff"
1529
1530    if clear:
1531        res = hapd.request("SET_KEY %d %s %d %d %s %s %d" % (WPA_ALG_NONE, addr, 6, 1, 6*"00", "", KEY_FLAG_GROUP))
1532    else:
1533        res = hapd.request("SET_KEY %d %s %d %d %s %s %d" % (WPA_ALG_IGTK, addr, 6, 1, 6*"00", 16*"00", KEY_FLAG_GROUP_TX_DEFAULT))
1534    if "OK" not in res:
1535        raise Exception("SET_KEY failed")
1536
1537    ev = dev[0].wait_event(["CTRL-EVENT-UNPROT-BEACON"], timeout=5)
1538    if ev is None:
1539        raise Exception("Unprotected Beacon frame not reported")
1540
1541    ev = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=5)
1542    if ev is None:
1543        raise Exception("Beacon loss not reported")
1544
1545    ev = hapd.wait_event(["CTRL-EVENT-UNPROT-BEACON"], timeout=5)
1546    if ev is None:
1547        raise Exception("WNM-Notification Request frame not reported")
1548
1549def test_ap_pmf_beacon_protection_reconnect(dev, apdev):
1550    """Beacon protection and reconnection"""
1551    ssid = "test-beacon-prot"
1552    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1553    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1554    params["ieee80211w"] = "2"
1555    params["beacon_prot"] = "1"
1556    params["group_mgmt_cipher"] = "AES-128-CMAC"
1557    try:
1558        hapd = hostapd.add_ap(apdev[0], params)
1559    except Exception as e:
1560        if "Failed to enable hostapd interface" in str(e):
1561            raise HwsimSkip("Beacon protection not supported")
1562        raise
1563
1564    dev[0].connect(ssid, psk="12345678", ieee80211w="2", beacon_prot="1",
1565                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
1566    dev[0].request("DISCONNECT")
1567    dev[0].wait_disconnected()
1568    dev[0].request("RECONNECT")
1569    dev[0].wait_connected()
1570    time.sleep(1)
1571    check_mac80211_bigtk(dev[0], hapd)
1572    ev = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=5)
1573    if ev is not None:
1574        raise Exception("Beacon loss detected")
1575
1576def test_ap_pmf_beacon_protection_unicast(dev, apdev):
1577    """WPA2-PSK Beacon protection (BIP) and unicast Beacon frame"""
1578    try:
1579        run_ap_pmf_beacon_protection_unicast(dev, apdev)
1580    finally:
1581        stop_monitor(apdev[1]["ifname"])
1582
1583def run_ap_pmf_beacon_protection_unicast(dev, apdev):
1584    cipher = "AES-128-CMAC"
1585    ssid = "test-beacon-prot"
1586    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1587    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
1588    params["ieee80211w"] = "2"
1589    params["beacon_prot"] = "1"
1590    params["group_mgmt_cipher"] = cipher
1591    try:
1592        hapd = hostapd.add_ap(apdev[0], params)
1593    except Exception as e:
1594        if "Failed to enable hostapd interface" in str(e):
1595            raise HwsimSkip("Beacon protection not supported")
1596        raise
1597
1598    bssid = hapd.own_addr()
1599
1600    Wlantest.setup(hapd)
1601    wt = Wlantest()
1602    wt.flush()
1603    wt.add_passphrase("12345678")
1604
1605    # STA with Beacon protection enabled
1606    dev[0].connect(ssid, psk="12345678", ieee80211w="2", beacon_prot="1",
1607                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
1608    hapd.wait_sta()
1609
1610    sock = start_monitor(apdev[1]["ifname"])
1611    radiotap = radiotap_build()
1612
1613    bssid = hapd.own_addr().replace(':', '')
1614    addr = dev[0].own_addr().replace(':', '')
1615
1616    h = "80000000" + addr + bssid + bssid + "0000"
1617    h += "c0a0260d27090600"+ "6400" + "1104"
1618    h += "0010746573742d626561636f6e2d70726f74"
1619    h += "010882848b960c121824"
1620    h += "03010"
1621    h += "1050400020000"
1622    h += "2a0104"
1623    h += "32043048606c"
1624    h += "30140100000fac040100000fac040100000fac06cc00"
1625    h += "3b025100"
1626    h += "2d1a0c001bffff000000000000000000000100000000000000000000"
1627    h += "3d1601000000000000000000000000000000000000000000"
1628    h += "7f0b0400000200000040000010"
1629    h += "2503000b01" # CSA
1630    h += "3c0400510b01" # ECSA
1631    h += "dd180050f2020101010003a4000027a4000042435e0062322f00"
1632
1633    frame = binascii.unhexlify(h)
1634    h += "4c1006002100000000002b8fab24bcef3bb1" #MME
1635    frame2 = binascii.unhexlify(h)
1636
1637    sock.send(radiotap + frame)
1638    ev = dev[0].wait_event(["CTRL-EVENT-UNPROT-BEACON",
1639                            "CTRL-EVENT-STARTED-CHANNEL-SWITCH"], timeout=5)
1640    if ev:
1641        if "CTRL-EVENT-STARTED-CHANNEL-SWITCH" in ev:
1642            raise Exception("Unexpected channel switch reported")
1643        if hapd.own_addr() not in ev:
1644            raise Exception("Unexpected BSSID in unprotected beacon indication")
1645
1646    time.sleep(10.1)
1647    sock.send(radiotap + frame2)
1648    ev = dev[0].wait_event(["CTRL-EVENT-UNPROT-BEACON",
1649                            "CTRL-EVENT-STARTED-CHANNEL-SWITCH"], timeout=5)
1650    if ev:
1651        if "CTRL-EVENT-STARTED-CHANNEL-SWITCH" in ev:
1652            raise Exception("Unexpected channel switch reported")
1653        if hapd.own_addr() not in ev:
1654            raise Exception("Unexpected BSSID in unprotected beacon indication")
1655
1656def test_ap_pmf_sta_global_require(dev, apdev):
1657    """WPA2-PSK AP with PMF optional and wpa_supplicant pmf=2"""
1658    ssid = "test-pmf-optional"
1659    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1660    params["wpa_key_mgmt"] = "WPA-PSK"
1661    params["ieee80211w"] = "1"
1662    hapd = hostapd.add_ap(apdev[0], params)
1663    try:
1664        dev[0].set("pmf", "2")
1665        dev[0].connect(ssid, psk="12345678",
1666                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
1667                       scan_freq="2412")
1668        pmf = dev[0].get_status_field("pmf")
1669        if pmf != "1":
1670            raise Exception("Unexpected PMF state: " + str(pmf))
1671    finally:
1672        dev[0].set("pmf", "0")
1673
1674def test_ap_pmf_sta_global_require2(dev, apdev):
1675    """WPA2-PSK AP with PMF optional and wpa_supplicant pmf=2 (2)"""
1676    ssid = "test-pmf-optional"
1677    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
1678    params["wpa_key_mgmt"] = "WPA-PSK"
1679    params["ieee80211w"] = "0"
1680    hapd = hostapd.add_ap(apdev[0], params)
1681    bssid = hapd.own_addr()
1682    try:
1683        dev[0].scan_for_bss(bssid, freq=2412)
1684        dev[0].set("pmf", "2")
1685        dev[0].connect(ssid, psk="12345678",
1686                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
1687                       scan_freq="2412", wait_connect=False)
1688        ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED",
1689                                "CTRL-EVENT-NETWORK-NOT-FOUND"], timeout=10)
1690        if ev is None:
1691            raise Exception("Connection result not reported")
1692        if "CTRL-EVENT-CONNECTED" in ev:
1693            raise Exception("Unexpected connection")
1694    finally:
1695        dev[0].set("pmf", "0")
1696
1697def test_ap_pmf_drop_robust_mgmt_prior_to_keys_installation(dev, apdev):
1698    """Drop non protected Robust Action frames prior to keys installation"""
1699    ssid = "test-pmf-required"
1700    passphrase = '12345678'
1701    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
1702    params['delay_eapol_tx'] = '1'
1703    params['ieee80211w'] = '2'
1704    params['wpa_pairwise_update_count'] = '5'
1705    hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
1706
1707    # Spectrum management with Channel Switch element
1708    msg = {'fc': 0x00d0,
1709           'sa': hapd.own_addr(),
1710           'da': dev[0].own_addr(),
1711           'bssid': hapd.own_addr(),
1712           'payload': binascii.unhexlify('00042503000608')
1713           }
1714
1715    dev[0].connect(ssid, psk=passphrase, scan_freq='2412', ieee80211w='1',
1716                   wait_connect=False)
1717
1718    # wait for the first delay before sending the frame
1719    ev = hapd.wait_event(['DELAY-EAPOL-TX-1'], timeout=10)
1720    if ev is None:
1721        raise Exception("EAPOL is not delayed")
1722
1723    # send the Action frame while connecting (prior to keys installation)
1724    hapd.mgmt_tx(msg)
1725
1726    dev[0].wait_connected(timeout=10, error="Timeout on connection")
1727    hapd.wait_sta()
1728    hwsim_utils.test_connectivity(dev[0], hapd)
1729
1730    # Verify no channel switch event
1731    ev = dev[0].wait_event(['CTRL-EVENT-STARTED-CHANNEL-SWITCH'], timeout=5)
1732    if ev is not None:
1733        raise Exception("Unexpected CSA prior to keys installation")
1734
1735    # Send the frame after keys installation and verify channel switch event
1736    hapd.mgmt_tx(msg)
1737    ev = dev[0].wait_event(['CTRL-EVENT-STARTED-CHANNEL-SWITCH'], timeout=5)
1738    if ev is None:
1739        raise Exception("Expected CSA handling after keys installation")
1740
1741def test_ap_pmf_eapol_logoff(dev, apdev):
1742    """WPA2-EAP AP with PMF required and EAPOL-Logoff"""
1743    ssid = "test-pmf-required-eap"
1744    params = hostapd.wpa2_eap_params(ssid=ssid)
1745    params["wpa_key_mgmt"] = "WPA-EAP-SHA256"
1746    params["ieee80211w"] = "2"
1747    hapd = hostapd.add_ap(apdev[0], params)
1748    hapd.request("SET ext_eapol_frame_io 1")
1749
1750    dev[0].set("ext_eapol_frame_io", "1")
1751    dev[0].connect("test-pmf-required-eap", key_mgmt="WPA-EAP-SHA256",
1752                   ieee80211w="2", eap="PSK", identity="psk.user@example.com",
1753                   password_hex="0123456789abcdef0123456789abcdef",
1754                   scan_freq="2412", wait_connect=False)
1755
1756    # EAP-Request/Identity
1757    proxy_msg(hapd, dev[0])
1758
1759    # EAP-Response/Identity RX
1760    msg = rx_msg(dev[0])
1761    # EAPOL-Logoff TX (inject)
1762    tx_msg(dev[0], hapd, "02020000")
1763    # EAP-Response/Identity TX (proxy previously received)
1764    tx_msg(dev[0], hapd, msg)
1765
1766    # Verify that the 10 ms timeout for deauthenticating STA after EAP-Failure
1767    # is not used in this sequence with the EAPOL-Logoff message before the
1768    # successful authentication.
1769    ev = dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.03)
1770    if ev:
1771        raise Exception("Unexpected disconnection")
1772
1773    # EAP-Request/Identity
1774    proxy_msg(hapd, dev[0])
1775    # EAP-Response/Identity
1776    proxy_msg(dev[0], hapd)
1777
1778    # EAP-PSK
1779    proxy_msg(hapd, dev[0])
1780    proxy_msg(dev[0], hapd)
1781    proxy_msg(hapd, dev[0])
1782    proxy_msg(dev[0], hapd)
1783    proxy_msg(hapd, dev[0])
1784
1785    # 4-way handshake
1786    proxy_msg(hapd, dev[0])
1787    proxy_msg(dev[0], hapd)
1788    proxy_msg(hapd, dev[0])
1789    proxy_msg(dev[0], hapd)
1790
1791    ev = hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1)
1792    if ev is None:
1793        raise Exception("4-way handshake did not complete successfully")
1794    dev[0].wait_connected(timeout=0.1)
1795    hapd.wait_sta()
1796
1797    # Verify that the STA gets disconnected when the EAPOL-Logoff message is
1798    # sent after successful authentication.
1799
1800    # EAPOL-Logoff TX (inject)
1801    tx_msg(dev[0], hapd, "02020000")
1802    hapd.request("SET ext_eapol_frame_io 0")
1803    dev[0].set("ext_eapol_frame_io", "0")
1804    ev = dev[0].wait_disconnected(timeout=1)
1805    if "reason=23" not in ev:
1806        raise Exception("Unexpected disconnection reason: " + ev)
1807