1 # Protected management frames tests
2 # Copyright (c) 2013-2024, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 from remotehost import remote_compatible
8 import binascii
9 import os
10 import time
11 import logging
12 logger = logging.getLogger()
13 
14 import hwsim_utils
15 import hostapd
16 from utils import *
17 from wlantest import Wlantest
18 from wpasupplicant import WpaSupplicant
19 from test_eap_proto import rx_msg, tx_msg, proxy_msg
20 
@remote_compatible
def test_ap_pmf_required(dev, apdev):
    """WPA2-PSK AP with PMF required"""
    ssid = "test-pmf-required"
    passphrase = "12345678"
    required_akm = "WPA-PSK-SHA256"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = required_akm
    ap_params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    # Have wlantest follow the AP so that frame level checks can be done below.
    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    # The first AKM reported by GET_CONFIG has to be the configured one.
    reported_akm = hapd.get_config()['key_mgmt']
    if reported_akm.split(' ')[0] != required_akm:
        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + reported_akm)

    # dev[0] connects with ieee80211w=1 and dev[1] with ieee80211w=2; the
    # remaining network parameters are shared.
    sta_args = {"key_mgmt": "WPA-PSK WPA-PSK-SHA256",
                "proto": "WPA2",
                "scan_freq": "2412"}
    dev[0].connect(ssid, psk=passphrase, ieee80211w="1", **sta_args)
    scan_results = dev[0].request("SCAN_RESULTS")
    if "[WPA2-PSK-SHA256-CCMP]" not in scan_results:
        raise Exception("Scan results missing RSN element info")
    hwsim_utils.test_connectivity(dev[0], hapd)
    dev[1].connect(ssid, psk=passphrase, ieee80211w="2", **sta_args)
    hwsim_utils.test_connectivity(dev[1], hapd)

    # SA Query towards both associated STAs has to be accepted, while an
    # invalid address has to be rejected.
    for sta in (dev[0], dev[1]):
        if "OK" not in hapd.request("SA_QUERY " + sta.own_addr()):
            raise Exception("SA_QUERY failed")
    if "FAIL" not in hapd.request("SA_QUERY foo"):
        raise Exception("Invalid SA_QUERY accepted")

    wt.require_ap_pmf_mandatory(apdev[0]['bssid'])
    wt.require_sta_pmf(apdev[0]['bssid'], dev[0].p2p_interface_addr())
    wt.require_sta_pmf_mandatory(apdev[0]['bssid'], dev[1].p2p_interface_addr())

    # Short delay before reading the wlantest counters for the SA Query
    # responses.
    time.sleep(0.1)
    for sta in (dev[0], dev[1]):
        responses = wt.get_sta_counter("valid_saqueryresp_tx",
                                       apdev[0]['bssid'],
                                       sta.p2p_interface_addr())
        if responses < 1:
            raise Exception("STA did not reply to SA Query")
62 
def start_ocv_ap(apdev):
    """Start a WPA2-PSK-SHA256 AP with PMF required and OCV enabled.

    apdev is the AP device entry passed to hostapd.add_ap(). wlantest is
    set up for the new AP and provisioned with the passphrase.

    Returns a (hapd, ssid, wt) tuple. Raises HwsimSkip if hostapd.add_ap()
    fails because the ocv parameter could not be set; any other failure is
    re-raised unchanged.
    """
    ssid = "test-pmf-required"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    ap_params["ocv"] = "1"
    try:
        hapd = hostapd.add_ap(apdev, ap_params)
    except Exception as err:
        # Only a rejected ocv parameter is a reason to skip the test case.
        if "Failed to set hostapd parameter ocv" not in str(err):
            raise
        raise HwsimSkip("OCV not supported")

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    return hapd, ssid, wt
82 
@remote_compatible
def test_ocv_sa_query(dev, apdev):
    """Test SA Query with OCV"""
    hapd, ssid, wt = start_ocv_ap(apdev[0])
    sta = dev[0]
    sta.connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    # The STA entry has to exist at the AP before SA_QUERY can be requested.
    hapd.wait_sta()

    # AP initiated SA Query: the client has to handle the OCI element.
    if "OK" not in hapd.request("SA_QUERY " + sta.own_addr()):
        raise Exception("SA_QUERY failed")
    if hapd.wait_event(["OCV-FAILURE"], timeout=0.1):
        raise Exception("Unexpected OCV failure reported")
    responses = wt.get_sta_counter("valid_saqueryresp_tx", apdev[0]['bssid'],
                                   sta.own_addr())
    if responses < 1:
        raise Exception("STA did not reply to SA Query")

    # STA initiated SA Query: the AP has to handle the OCI element. The
    # connection must still be up after the three second wait.
    if "OK" not in sta.request("UNPROT_DEAUTH"):
        raise Exception("Triggering SA Query from the STA failed")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=3) is not None:
        raise Exception("SA Query from the STA failed")
107 
@remote_compatible
def test_ocv_sa_query_csa(dev, apdev):
    """Test SA Query with OCV after channel switch"""
    hapd, ssid, wt = start_ocv_ap(apdev[0])
    sta = dev[0]
    sta.connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    # Move the AP to 2437 MHz and give the STA a second to react.
    hapd.request("CHAN_SWITCH 5 2437 ht")
    time.sleep(1)
    sa_queries = wt.get_sta_counter("valid_saqueryreq_tx", apdev[0]['bssid'],
                                    sta.own_addr())
    if sa_queries < 1:
        raise Exception("STA did not start SA Query after channel switch")

    # The association has to survive the channel switch.
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=16) is not None:
        raise Exception("Unexpected disconnection")
125 
def test_ocv_sa_query_csa_no_resp(dev, apdev):
    """Test SA Query with OCV after channel switch getting no response"""
    hapd, ssid, wt = start_ocv_ap(apdev[0])
    sta = dev[0]
    sta.connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    # External management frame handling is enabled on the AP before the
    # channel switch; presumably this is what keeps hostapd from answering
    # the SA Query from the STA.
    hapd.set("ext_mgmt_frame_handling", "1")
    hapd.request("CHAN_SWITCH 5 2437 ht")

    disconnect = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
    if disconnect is None:
        raise Exception("Disconnection after CSA not reported")
    # The STA itself has to be the one that gives up on the association.
    if "locally_generated=1" not in disconnect:
        raise Exception("Unexpectedly disconnected by AP: " + disconnect)
140 
def test_ocv_sa_query_csa_missing(dev, apdev):
    """Test SA Query with OCV missing after channel switch"""
    hapd, ssid, wt = start_ocv_ap(apdev[0])
    sta = dev[0]
    sta.connect(ssid, psk="12345678", ieee80211w="1", ocv="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    # Wait for the STA entry so that the kernel won't drop the
    # Deauthentication frame (MFP).
    hapd.wait_sta()

    # Disconnect the STA while hostapd only reports the received frame as
    # MGMT-RX; the later AP-STA-DISCONNECTED wait relies on hostapd still
    # having the STA entry at that point.
    hapd.set("ext_mgmt_frame_handling", "1")
    sta.request("DISCONNECT")
    sta.wait_disconnected()
    if hapd.wait_event(['MGMT-RX'], timeout=5) is None:
        raise Exception("Deauthentication frame RX not reported")
    hapd.set("ext_mgmt_frame_handling", "0")

    # After the channel switch the AP is expected to drop the STA.
    hapd.request("CHAN_SWITCH 5 2437 ht")
    if hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=20) is None:
        raise Exception("No disconnection event received from hostapd")
159 
@remote_compatible
def test_ap_pmf_optional(dev, apdev):
    """WPA2-PSK AP with PMF optional"""
    ssid = "test-pmf-optional"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK"
    ap_params["ieee80211w"] = "1"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    # Connect dev[0] with ieee80211w=1 and dev[1] with ieee80211w=2 and
    # verify data connectivity right after each connection.
    for sta, pmf in ((dev[0], "1"), (dev[1], "2")):
        sta.connect(ssid, psk=passphrase, ieee80211w=pmf,
                    key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                    scan_freq="2412")
        hwsim_utils.test_connectivity(sta, hapd)

    # Confirm from the captured frames what was advertised and negotiated.
    wt.require_ap_pmf_optional(apdev[0]['bssid'])
    wt.require_sta_pmf(apdev[0]['bssid'], dev[0].p2p_interface_addr())
    wt.require_sta_pmf_mandatory(apdev[0]['bssid'], dev[1].p2p_interface_addr())
183 
@remote_compatible
def test_ap_pmf_optional_2akm(dev, apdev):
    """WPA2-PSK AP with PMF optional (2 AKMs)"""
    ssid = "test-pmf-optional-2akm"
    passphrase = "12345678"
    both_akms = "WPA-PSK WPA-PSK-SHA256"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = both_akms
    ap_params["ieee80211w"] = "1"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    # Connect dev[0] with ieee80211w=1 and dev[1] with ieee80211w=2, both
    # allowing either AKM.
    for sta, pmf in ((dev[0], "1"), (dev[1], "2")):
        sta.connect(ssid, psk=passphrase, ieee80211w=pmf,
                    key_mgmt=both_akms, proto="WPA2", scan_freq="2412")
        hwsim_utils.test_connectivity(sta, hapd)

    wt.require_ap_pmf_optional(apdev[0]['bssid'])
    # Both STAs are required to have ended up using the PSK-SHA256 AKM.
    pmf_checks = ((dev[0], wt.require_sta_pmf),
                  (dev[1], wt.require_sta_pmf_mandatory))
    for sta, require_pmf in pmf_checks:
        require_pmf(apdev[0]['bssid'], sta.p2p_interface_addr())
        wt.require_sta_key_mgmt(apdev[0]['bssid'], sta.p2p_interface_addr(),
                                "PSK-SHA256")
211 
@remote_compatible
def test_ap_pmf_negative(dev, apdev):
    """WPA2-PSK AP without PMF (negative test)"""
    ssid = "test-pmf-negative"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    hapd = hostapd.add_ap(apdev[0], params)
    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    # A STA with PMF optional (ieee80211w=1) has to be able to use the AP.
    dev[0].connect(ssid, psk="12345678", ieee80211w="1",
                   key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                   scan_freq="2412")
    hwsim_utils.test_connectivity(dev[0], hapd)

    # A STA that requires PMF (ieee80211w=2) must fail to connect or to pass
    # data. The failure report lives in the else clause: raising it inside
    # the try block would let the except handler below swallow it, and the
    # test could then never fail.
    try:
        dev[1].connect(ssid, psk="12345678", ieee80211w="2",
                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                       scan_freq="2412")
        hwsim_utils.test_connectivity(dev[1], hapd)
    except Exception as e:
        logger.debug("Ignore expected exception: " + str(e))
    else:
        raise Exception("PMF required STA connected to no PMF AP")
    wt.require_ap_no_pmf(apdev[0]['bssid'])
235 
@remote_compatible
def test_ap_pmf_assoc_comeback(dev, apdev):
    """WPA2-PSK AP with PMF association comeback"""
    # comeback=None leaves assoc_sa_query_max_timeout out of the AP
    # configuration.
    run_ap_pmf_assoc_comeback(dev, apdev, comeback=None)
240 
def test_ap_pmf_assoc_comeback_10000tu(dev, apdev):
    """WPA2-PSK AP with PMF association comeback (10000 TUs)"""
    # Value used for the AP's assoc_sa_query_max_timeout parameter.
    comeback_tu = 10000
    run_ap_pmf_assoc_comeback(dev, apdev, comeback=comeback_tu)
244 
def run_ap_pmf_assoc_comeback(dev, apdev, comeback=None):
    """Run the association comeback test against a PMF-required AP.

    dev and apdev are the STA and AP device lists of the test case. If
    comeback is not None, it is written (as a string) to the AP's
    assoc_sa_query_max_timeout parameter.

    Raises an exception if any step fails or if wlantest did not count an
    association comeback response from the AP.
    """
    ssid = "assoc-comeback"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    if comeback is not None:
        ap_params["assoc_sa_query_max_timeout"] = str(comeback)
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta(wait_4way_hs=True)

    # Disconnect the STA while hostapd only reports the received
    # Deauthentication frame as MGMT-RX (presumably without processing it,
    # so that the AP still considers the STA associated).
    hapd.set("ext_mgmt_frame_handling", "1")
    sta.request("DISCONNECT")
    sta.wait_disconnected(timeout=10)
    if hapd.wait_event(["MGMT-RX"], timeout=1) is None:
        raise Exception("Deauthentication frame RX not reported")
    hapd.set("ext_mgmt_frame_handling", "0")

    # The new association attempt is expected to go through the comeback
    # mechanism before completing.
    sta.request("REASSOCIATE")
    sta.wait_connected(timeout=20, error="Timeout on re-connection")
    hapd.wait_4way_hs()
    comebacks = wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
                                   sta.p2p_interface_addr())
    if comebacks < 1:
        raise Exception("AP did not use association comeback request")
274 
def test_ap_pmf_assoc_comeback_in_wpas(dev, apdev):
    """WPA2-PSK AP with PMF association comeback in wpa_supplicant"""
    ssid = "assoc-comeback"
    passphrase = "12345678"
    assoc_reject = "CTRL-EVENT-ASSOC-REJECT"
    missing_time = "SME: Temporary assoc reject: missing association comeback time"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    ap_params["test_assoc_comeback_type"] = "255"
    hapd = hostapd.add_ap(apdev[0], ap_params)
    sta = dev[0]

    def disconnect_sta_only():
        # Disconnect the STA while hostapd only reports the received
        # Deauthentication frame as MGMT-RX.
        hapd.set("ext_mgmt_frame_handling", "1")
        sta.request("DISCONNECT")
        sta.wait_disconnected(timeout=10)
        if hapd.wait_event(["MGMT-RX"], timeout=1) is None:
            raise Exception("Deauthentication frame RX not reported")
        hapd.set("ext_mgmt_frame_handling", "0")

    def reassociate_expecting_comeback():
        # The first association attempt has to be rejected with
        # status_code=30, which the test treats as the comeback request.
        sta.request("REASSOCIATE")
        reject = sta.wait_event([assoc_reject], timeout=10)
        if reject is None or "status_code=30" not in reject:
            raise Exception("Association comeback not requested")

    sta.set("test_assoc_comeback_type", "255")
    sta.connect(ssid, psk=passphrase, ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta(wait_4way_hs=True)

    # Round 1: STA and AP both use test_assoc_comeback_type=255; the retry
    # after the comeback has to result in a connection.
    disconnect_sta_only()
    reassociate_expecting_comeback()
    outcome = sta.wait_event(["CTRL-EVENT-CONNECTED", assoc_reject],
                             timeout=10)
    if outcome is None:
        raise Exception("Association not reported")
    if "CTRL-EVENT-ASSOC-REJECT" in outcome:
        raise Exception("Unexpected association rejection: " + outcome)
    hapd.wait_4way_hs()

    # Round 2: the STA is switched to type 254 while the AP stays at 255.
    # The STA has to report a missing comeback time and still reconnect.
    disconnect_sta_only()
    sta.set("test_assoc_comeback_type", "254")
    reassociate_expecting_comeback()
    outcome = sta.wait_event([missing_time,
                              "CTRL-EVENT-CONNECTED",
                              assoc_reject], timeout=10)
    if outcome is None:
        raise Exception("Association not reported")
    if missing_time not in outcome:
        raise Exception("Unexpected result: " + outcome)
    sta.wait_connected(timeout=20, error="Timeout on re-connection with misbehaving AP")
    hapd.wait_4way_hs()
329 
@remote_compatible
def test_ap_pmf_assoc_comeback2(dev, apdev):
    """WPA2-PSK AP with PMF association comeback (using DROP_SA)"""
    ssid = "assoc-comeback"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK"
    ap_params["ieee80211w"] = "1"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")

    # Drop the security association on the STA side only and reassociate.
    if "OK" not in sta.request("DROP_SA"):
        raise Exception("DROP_SA failed")
    sta.request("REASSOCIATE")
    sta.wait_connected(timeout=10, error="Timeout on re-connection")

    comebacks = wt.get_sta_counter("reassocresp_comeback", apdev[0]['bssid'],
                                   sta.p2p_interface_addr())
    if comebacks < 1:
        raise Exception("AP did not use reassociation comeback request")
351 
@remote_compatible
def test_ap_pmf_assoc_comeback3(dev, apdev):
    """WPA2-PSK AP with PMF association comeback (using radio_disabled)"""
    sta = dev[0]
    # Bit 0x20 of the driver capability flags has to be set for
    # radio_disabled to be usable in this test.
    capa_flags = int(sta.get_driver_status_field("capa.flags"), 0)
    if not (capa_flags & 0x20):
        raise HwsimSkip("Driver does not support radio_disabled")

    ssid = "assoc-comeback"
    passphrase = "12345678"
    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK"
    ap_params["ieee80211w"] = "1"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")

    # Toggle the radio off and back on, then associate again.
    for state in ("1", "0"):
        sta.set("radio_disabled", state)
    sta.request("REASSOCIATE")
    sta.wait_connected(timeout=10, error="Timeout on re-connection")

    comebacks = wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
                                   sta.own_addr())
    if comebacks < 1:
        raise Exception("AP did not use reassociation comeback request")
376 
@remote_compatible
def test_ap_pmf_assoc_comeback_wps(dev, apdev):
    """WPA2-PSK AP with PMF association comeback (WPS)"""
    ssid = "assoc-comeback"
    passphrase = "12345678"
    appin = "12345670"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    # WPS related AP settings, including the AP PIN used further below.
    ap_params["eap_server"] = "1"
    ap_params["wps_state"] = "2"
    ap_params["ap_pin"] = appin
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta(wait_4way_hs=True)

    # Disconnect the STA while hostapd only reports the received
    # Deauthentication frame as MGMT-RX.
    hapd.set("ext_mgmt_frame_handling", "1")
    sta.request("DISCONNECT")
    sta.wait_disconnected(timeout=10)
    if hapd.wait_event(["MGMT-RX"], timeout=1) is None:
        raise Exception("Deauthentication frame RX not reported")
    hapd.set("ext_mgmt_frame_handling", "0")

    # Come back through WPS using the AP PIN instead of a plain
    # reassociation.
    sta.wps_reg(apdev[0]['bssid'], appin)
    hapd.wait_4way_hs()
    comebacks = wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
                                   sta.p2p_interface_addr())
    if comebacks < 1:
        raise Exception("AP did not use association comeback request")
409 
def test_ap_pmf_ap_dropping_sa(dev, apdev):
    """WPA2-PSK PMF AP dropping SA"""
    ssid = "pmf"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
    addr0 = sta.own_addr()
    sta.dump_monitor()
    hapd.wait_sta()

    # The AP forgets the SA and the association locally without telling the
    # STA (tx=0). The STA then gets unprotected Deauthentication frames when
    # it tries to transmit the next Class 3 frame.
    if "OK" not in hapd.request("DEAUTHENTICATE " + addr0 + " tx=0"):
        raise Exception("DEAUTHENTICATE command failed")
    early = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1)
    if early is not None:
        raise Exception("Unexpected disconnection event after DEAUTHENTICATE tx=0: " + early)

    # Send a data frame towards the AP and wait for the STA to disconnect.
    # The data test mode is turned off again before the result is checked.
    sta.request("DATA_TEST_CONFIG 1")
    sta.request("DATA_TEST_TX " + bssid + " " + addr0)
    disconnect = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
    sta.request("DATA_TEST_CONFIG 0")
    if disconnect is None or "locally_generated=1" not in disconnect:
        raise Exception("Locally generated disconnection not reported")
441 
def test_ap_pmf_known_sta_id(dev, apdev):
    """WPA2-PSK AP and Known STA Identification to avoid association comeback"""
    ssid = "assoc-comeback"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    ap_params["known_sta_identification"] = "1"
    hapd = hostapd.add_ap(apdev[0], ap_params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta(wait_4way_hs=True)

    # Disconnect the STA while hostapd only reports the received
    # Deauthentication frame as MGMT-RX.
    hapd.set("ext_mgmt_frame_handling", "1")
    sta.request("DISCONNECT")
    sta.wait_disconnected(timeout=10)
    if hapd.wait_event(["MGMT-RX"], timeout=1) is None:
        raise Exception("Deauthentication frame RX not reported")
    hapd.set("ext_mgmt_frame_handling", "0")

    sta.request("REASSOCIATE")
    sta.wait_connected(timeout=20, error="Timeout on re-connection")
    hapd.wait_4way_hs()

    # With known_sta_identification enabled, no association comeback
    # response may have been counted for this STA.
    comebacks = wt.get_sta_counter("assocresp_comeback", apdev[0]['bssid'],
                                   sta.own_addr())
    if comebacks > 0:
        raise Exception("AP used association comeback request")
471 
def test_ap_pmf_valid_broadcast_deauth(dev, apdev):
    """WPA2-PSK PMF AP sending valid broadcast deauth without dropping SA"""
    # Deauthentication frame, broadcast destination.
    run_ap_pmf_valid(dev, apdev, disassociate=False, broadcast=True)
475 
def test_ap_pmf_valid_broadcast_disassoc(dev, apdev):
    """WPA2-PSK PMF AP sending valid broadcast disassoc without dropping SA"""
    # Disassociation frame, broadcast destination.
    run_ap_pmf_valid(dev, apdev, disassociate=True, broadcast=True)
479 
def test_ap_pmf_valid_unicast_deauth(dev, apdev):
    """WPA2-PSK PMF AP sending valid unicast deauth without dropping SA"""
    # Deauthentication frame addressed to the STA.
    run_ap_pmf_valid(dev, apdev, disassociate=False, broadcast=False)
483 
def test_ap_pmf_valid_unicast_disassoc(dev, apdev):
    """WPA2-PSK PMF AP sending valid unicast disassoc without dropping SA"""
    # Disassociation frame addressed to the STA.
    run_ap_pmf_valid(dev, apdev, disassociate=True, broadcast=False)
487 
def run_ap_pmf_valid(dev, apdev, disassociate, broadcast):
    """Have a PMF AP send a disconnection frame and check the STA recovers.

    disassociate selects the DISASSOCIATE command instead of DEAUTHENTICATE.
    broadcast selects ff:ff:ff:ff:ff:ff as the destination instead of the
    address of dev[0]. The command is issued with test=1; the AP is required
    to still have the STA entry afterwards.

    Raises an exception if the command fails, the STA entry is lost, or the
    STA does not report a disconnection that is not locally generated.
    """
    ssid = "pmf"
    passphrase = "12345678"

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    ap_params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    ap_params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    sta = dev[0]
    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
    addr0 = sta.own_addr()
    sta.dump_monitor()
    hapd.wait_sta()

    # Build the hostapd control interface command from the two selectors.
    frame_cmd = "DISASSOCIATE" if disassociate else "DEAUTHENTICATE"
    target = "ff:ff:ff:ff:ff:ff" if broadcast else addr0
    cmd = frame_cmd + " " + target + " test=1"
    if "OK" not in hapd.request(cmd):
        raise Exception("hostapd command failed")
    if not hapd.get_sta(addr0):
        raise Exception("STA entry lost")

    # The STA has to accept the frame and report an AP initiated
    # disconnection.
    disconnect = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5)
    if disconnect is None:
        raise Exception("Disconnection not reported")
    if "locally_generated=1" in disconnect:
        raise Exception("Unexpected locally generated disconnection")

    # The SA Query procedure is expected to fail and the association
    # comeback to succeed; wait for the resulting connection.
    sta.wait_connected()
520 
def start_wpas_ap(ssid):
    """Start a WPA2-PSK-SHA256 network with PMF required on wlan5.

    A separate wpa_supplicant interface (wlan5) is added and a network with
    mode=2 is configured for the given SSID on 2412 MHz. The callers in this
    file use it as the AP side of the tests.

    Returns the WpaSupplicant instance controlling wlan5.
    """
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    wpas.interface_add("wlan5")
    net_id = wpas.add_network()

    # (setter, field, value) entries applied in order; the quoted setter is
    # used for the SSID and the passphrase.
    plain = wpas.set_network
    quoted = wpas.set_network_quoted
    settings = [
        (plain, "mode", "2"),
        (quoted, "ssid", ssid),
        (plain, "proto", "WPA2"),
        (plain, "key_mgmt", "WPA-PSK-SHA256"),
        (plain, "ieee80211w", "2"),
        (quoted, "psk", "12345678"),
        (plain, "pairwise", "CCMP"),
        (plain, "group", "CCMP"),
        (plain, "frequency", "2412"),
        (plain, "scan_freq", "2412"),
    ]
    for setter, field, value in settings:
        setter(net_id, field, value)

    wpas.connect_network(net_id)
    wpas.dump_monitor()
    return wpas
538 
def test_ap_pmf_sta_sa_query(dev, apdev):
    """WPA2-PSK AP with station using SA Query"""
    ssid = "assoc-comeback"
    sta = dev[0]
    addr = sta.own_addr()

    wpas = start_wpas_ap(ssid)
    bssid = wpas.own_addr()

    Wlantest.setup(wpas)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    sta.connect(ssid, psk="12345678", ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    wpas.dump_monitor()

    # First round: disconnection frames sent with test=0 and no explicit
    # reason code. The STA must stay connected.
    for frame_cmd in ("DEAUTHENTICATE ", "DISASSOCIATE "):
        wpas.request(frame_cmd + addr + " test=0")
        wpas.dump_monitor()
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")

    # Second round: reason codes 6 and 7. The STA must stay connected and
    # an SA Query exchange has to be visible in the wlantest counters.
    for frame_cmd, reason in (("DEAUTHENTICATE ", "6"), ("DISASSOCIATE ", "7")):
        wpas.request(frame_cmd + addr + " reason=" + reason + " test=0")
        wpas.dump_monitor()
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")
    if wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr) < 1:
        raise Exception("STA did not send SA Query")
    if wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr) < 1:
        raise Exception("AP did not reply to SA Query")
    wpas.dump_monitor()
576 
def test_ap_pmf_sta_sa_query_no_response(dev, apdev):
    """WPA2-PSK AP with station using SA Query and getting no response"""
    ssid = "assoc-comeback"
    sta = dev[0]
    addr = sta.own_addr()

    wpas = start_wpas_ap(ssid)
    bssid = wpas.own_addr()

    sta.connect(ssid, psk="12345678", ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    wpas.dump_monitor()

    # First round: disconnection frames sent with test=0 and no explicit
    # reason code. The STA must stay connected.
    for frame_cmd in ("DEAUTHENTICATE ", "DISASSOCIATE "):
        wpas.request(frame_cmd + addr + " test=0")
        wpas.dump_monitor()
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")

    # Second round with external management frame handling enabled on the
    # AP side, presumably so that the SA Query from the STA goes unanswered.
    # The STA is then expected to disconnect.
    wpas.request("SET ext_mgmt_frame_handling 1")
    for frame_cmd, reason in (("DEAUTHENTICATE ", "6"), ("DISASSOCIATE ", "7")):
        wpas.request(frame_cmd + addr + " reason=" + reason + " test=0")
        wpas.dump_monitor()
    sta.wait_disconnected()
    wpas.dump_monitor()

    # With normal frame handling restored, the STA has to get connected
    # again.
    wpas.request("SET ext_mgmt_frame_handling 0")
    sta.wait_connected()
    wpas.dump_monitor()
607 
def test_ap_pmf_sta_unprot_deauth_burst(dev, apdev):
    """WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames"""
    ssid = "deauth-attack"
    sta = dev[0]
    addr = sta.own_addr()

    wpas = start_wpas_ap(ssid)
    bssid = wpas.own_addr()

    Wlantest.setup(wpas)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    sta.connect(ssid, psk="12345678", ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    deauth_cmd = "DEAUTHENTICATE " + addr + " reason=6 test=0"
    disassoc_cmd = "DISASSOCIATE " + addr + " reason=7 test=0"

    # First burst: ten Deauthentication/Disassociation pairs. The STA has to
    # stay connected and start exactly one SA Query procedure.
    for _ in range(10):
        wpas.request(deauth_cmd)
        wpas.request(disassoc_cmd)
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")
    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
    if num_req < 1:
        raise Exception("STA did not send SA Query")
    if num_resp < 1:
        raise Exception("AP did not reply to SA Query")
    if num_req > 1:
        raise Exception("STA initiated too many SA Query procedures (%d)" % num_req)

    # Second burst after a ten second pause (presumably long enough for the
    # STA to start a new SA Query procedure): the counters have to show
    # exactly two requests and two responses in total.
    time.sleep(10)
    for _ in range(5):
        wpas.request(deauth_cmd)
        wpas.request(disassoc_cmd)
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")
    num_req = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
    num_resp = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
    if not (num_req == 2 and num_resp == 2):
        raise Exception("Unexpected number of SA Query procedures (req=%d resp=%d)" % (num_req, num_resp))
651 
def test_ap_pmf_sta_sa_query_oom(dev, apdev):
    """WPA2-PSK AP with station using SA Query (OOM)"""
    ssid = "assoc-comeback"
    sta = dev[0]
    addr = sta.own_addr()
    wpas = start_wpas_ap(ssid)
    sta.connect(ssid, psk="12345678", ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    # Arm an allocation failure for sme_sa_query_timer on the STA, send the
    # triggering Deauthentication frame and wait for the failure to fire.
    with alloc_fail(sta, 1, "=sme_sa_query_timer"):
        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
        wait_fail_trigger(sta, "GET_ALLOC_FAIL")

    # Tear down both ends of the link.
    sta.request("DISCONNECT")
    wpas.request("DISCONNECT")
    sta.wait_disconnected()
666 
def test_ap_pmf_sta_sa_query_local_failure(dev, apdev):
    """WPA2-PSK AP with station using SA Query (local failure)"""
    ssid = "assoc-comeback"
    sta = dev[0]
    addr = sta.own_addr()
    wpas = start_wpas_ap(ssid)
    sta.connect(ssid, psk="12345678", ieee80211w="1",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    # Arm a failure of os_get_random within sme_sa_query_timer on the STA,
    # send the triggering Deauthentication frame and wait for the failure
    # to fire.
    with fail_test(sta, 1, "os_get_random;sme_sa_query_timer"):
        wpas.request("DEAUTHENTICATE " + addr + " reason=6 test=0")
        wait_fail_trigger(sta, "GET_FAIL")

    # Tear down both ends of the link.
    sta.request("DISCONNECT")
    wpas.request("DISCONNECT")
    sta.wait_disconnected()
681 
def test_ap_pmf_sta_sa_query_hostapd(dev, apdev):
    """WPA2-PSK AP with station using SA Query (hostapd)"""
    ssid = "assoc-comeback"
    passphrase = "12345678"
    sta = dev[0]
    addr = sta.own_addr()

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                    wpa_key_mgmt="WPA-PSK-SHA256",
                                    ieee80211w="2")
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()

    # First round: frames sent with test=0 and no explicit reason code.
    # Sending stops at the first command that fails. The STA must stay
    # connected.
    for frame_cmd in ("DEAUTHENTICATE ", "DISASSOCIATE "):
        if "OK" not in hapd.request(frame_cmd + addr + " test=0"):
            raise Exception("Failed to send unprotected disconnection messages")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")

    # Second round: reason codes 6 and 7. The STA must stay connected and
    # an SA Query exchange has to be visible in the wlantest counters.
    for frame_cmd, reason in (("DEAUTHENTICATE ", "6"), ("DISASSOCIATE ", "7")):
        if "OK" not in hapd.request(frame_cmd + addr + " reason=" + reason + " test=0"):
            raise Exception("Failed to send unprotected disconnection messages (2)")
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
        raise Exception("Unexpected disconnection")
    if wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr) < 1:
        raise Exception("STA did not send SA Query")
    if wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr) < 1:
        raise Exception("AP did not reply to SA Query")
720 
def test_ap_pmf_sta_sa_query_no_response_hostapd(dev, apdev):
    """WPA2-PSK AP with station using SA Query and getting no response (hostapd)"""
    sta = dev[0]
    ssid = "assoc-comeback"
    passphrase = "12345678"
    addr = sta.own_addr()

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                    wpa_key_mgmt="WPA-PSK-SHA256",
                                    ieee80211w="2")
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()

    # With ext_mgmt_frame_handling enabled the test expects the AP not to
    # answer SA Query (verified through the wlantest counters below), so the
    # station is expected to disconnect.
    hapd.set("ext_mgmt_frame_handling", "1")
    for cmd in ("DEAUTHENTICATE " + addr + " reason=6 test=0",
                "DISASSOCIATE " + addr + " reason=7 test=0"):
        if "OK" not in hapd.request(cmd):
            raise Exception("Failed to send unprotected disconnection messages")
    sta.wait_disconnected()
    hapd.set("ext_mgmt_frame_handling", "0")

    requests_seen = wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr)
    if requests_seen < 1:
        raise Exception("STA did not send SA Query")
    responses_seen = wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr)
    if responses_seen > 0:
        raise Exception("AP replied to SA Query")

    # Normal frame handling is back on; the station should reconnect.
    sta.wait_connected()
753 
def test_ap_pmf_sta_unprot_deauth_burst_hostapd(dev, apdev):
    """WPA2-PSK AP with station receiving burst of unprotected Deauthentication frames (hostapd)"""
    sta = dev[0]
    ssid = "deauth-attack"
    passphrase = "12345678"
    addr = sta.own_addr()

    ap_params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                    wpa_key_mgmt="WPA-PSK-SHA256",
                                    ieee80211w="2")
    hapd = hostapd.add_ap(apdev[0], ap_params)
    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    sta.connect(ssid, psk=passphrase, ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()

    burst_cmds = ("DEAUTHENTICATE " + addr + " reason=6 test=0",
                  "DISASSOCIATE " + addr + " reason=7 test=0")

    def send_burst(rounds):
        # Send the Deauthentication/Disassociation pair the given number of
        # times and verify that the station stays connected afterwards.
        for _ in range(rounds):
            for cmd in burst_cmds:
                if "OK" not in hapd.request(cmd):
                    raise Exception("Failed to send unprotected disconnection messages")
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1) is not None:
            raise Exception("Unexpected disconnection")

    def sa_query_counts():
        # (requests sent by the STA, responses received by the STA) as
        # counted by wlantest.
        return (wt.get_sta_counter("valid_saqueryreq_tx", bssid, addr),
                wt.get_sta_counter("valid_saqueryresp_rx", bssid, addr))

    # First burst: exactly one SA Query procedure is expected despite the
    # ten frame pairs.
    send_burst(10)
    num_req, num_resp = sa_query_counts()
    if num_req < 1:
        raise Exception("STA did not send SA Query")
    if num_resp < 1:
        raise Exception("AP did not reply to SA Query")
    if num_req > 1:
        raise Exception("STA initiated too many SA Query procedures (%d)" % num_req)

    # After a pause, a second burst is expected to trigger exactly one more
    # procedure (cumulative counters reach 2).
    time.sleep(10)
    send_burst(5)
    num_req, num_resp = sa_query_counts()
    if not (num_req == 2 and num_resp == 2):
        raise Exception("Unexpected number of SA Query procedures (req=%d resp=%d)" % (num_req, num_resp))
803 
def test_ap_pmf_required_eap(dev, apdev):
    """WPA2-EAP AP with PMF required"""
    ssid = "test-pmf-required-eap"
    params = hostapd.wpa2_eap_params(ssid=ssid)
    params["wpa_key_mgmt"] = "WPA-EAP-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)

    # The first key management value reported by the AP must be the
    # configured SHA256 AKM.
    key_mgmt = hapd.get_config()['key_mgmt']
    first_akm = key_mgmt.split(' ')[0]
    if first_akm != "WPA-EAP-SHA256":
        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)

    # EAP-PSK credentials shared by both stations.
    eap_psk = dict(eap="PSK", identity="psk.user@example.com",
                   password_hex="0123456789abcdef0123456789abcdef",
                   scan_freq="2412")

    # Station 0 requires PMF and offers only the SHA256 AKM; station 1 has
    # PMF optional and offers both AKMs.
    dev[0].connect(ssid, key_mgmt="WPA-EAP-SHA256", ieee80211w="2",
                   **eap_psk)
    dev[1].connect(ssid, key_mgmt="WPA-EAP WPA-EAP-SHA256", ieee80211w="1",
                   **eap_psk)
822 
def test_ap_pmf_optional_eap(dev, apdev):
    """WPA2-EAP AP with PMF optional"""
    ssid = "test-wpa2-eap"
    params = hostapd.wpa2_eap_params(ssid=ssid)
    params["ieee80211w"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    # EAP-TTLS/PAP credentials shared by both stations.
    ttls_pap = dict(eap="TTLS", identity="pap user",
                    anonymous_identity="ttls", password="password",
                    ca_cert="auth_serv/ca.pem", phase2="auth=PAP")

    # Station 0: PMF optional with the plain WPA-EAP AKM.
    dev[0].connect(ssid, key_mgmt="WPA-EAP", **ttls_pap,
                   ieee80211w="1", scan_freq="2412")
    # Station 1: PMF required, offering both WPA-EAP and WPA-EAP-SHA256.
    dev[1].connect(ssid, key_mgmt="WPA-EAP WPA-EAP-SHA256", **ttls_pap,
                   ieee80211w="2", scan_freq="2412")
838 
@remote_compatible
def test_ap_pmf_required_sha1(dev, apdev):
    """WPA2-PSK AP with PMF required with SHA1 AKM"""
    ssid = "test-pmf-required-sha1"
    sta = dev[0]
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    # The AP must report WPA-PSK as its first key management value.
    key_mgmt = hapd.get_config()['key_mgmt']
    first_akm = key_mgmt.split(' ')[0]
    if first_akm != "WPA-PSK":
        raise Exception("Unexpected GET_CONFIG(key_mgmt): " + key_mgmt)

    sta.connect(ssid, psk="12345678", ieee80211w="2",
                key_mgmt="WPA-PSK", proto="WPA2", scan_freq="2412")

    # The scan results must carry the RSN flags for this BSS.
    scan_results = sta.request("SCAN_RESULTS")
    if "[WPA2-PSK-CCMP]" not in scan_results:
        raise Exception("Scan results missing RSN element info")
    hwsim_utils.test_connectivity(sta, hapd)
859 
@remote_compatible
def test_ap_pmf_toggle(dev, apdev):
    """WPA2-PSK AP with PMF optional and changing PMF on reassociation"""
    # The test body sets reassoc_same_bss_optim to 1 on the station; always
    # put it back to 0, whether the body passed or raised.
    restore_cmd = "SET reassoc_same_bss_optim 0"
    try:
        _test_ap_pmf_toggle(dev, apdev)
    finally:
        dev[0].request(restore_cmd)
867 
def _test_ap_pmf_toggle(dev, apdev):
    """Connect with PMF, reassociate without it, then with it again.

    After each step the PMF state is checked in wlantest, in the hostapd STA
    flags and (for the two reassociations) in the "iw station get" output.
    """
    sta = dev[0]
    ssid = "test-pmf-optional"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK"
    params["ieee80211w"] = "1"
    params["assoc_sa_query_max_timeout"] = "1"
    params["assoc_sa_query_retry_timeout"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    bssid = apdev[0]['bssid']
    addr = sta.own_addr()

    def hostapd_reports_mfp():
        # True when hostapd lists the [MFP] flag for this station.
        return '[MFP]' in hapd.get_sta(addr)['flags']

    def kernel_mfp_line():
        # First line of "iw dev <ifname> station get <addr>" mentioning MFP.
        _, output = hapd.cmd_execute(['iw', 'dev', apdev[0]['ifname'],
                                      'station', 'get', addr])
        mfp_lines = [line for line in output.splitlines() if "MFP" in line]
        return mfp_lines[0]

    def reassociate_with_pmf(value):
        # Change the network's ieee80211w setting and reassociate.
        sta.set_network(net_id, "ieee80211w", value)
        sta.request("REASSOCIATE")
        sta.wait_connected()

    # Initial association with PMF optional on the station.
    sta.request("SET reassoc_same_bss_optim 1")
    net_id = sta.connect(ssid, psk="12345678", ieee80211w="1",
                         key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                         scan_freq="2412")
    wt.require_ap_pmf_optional(bssid)
    wt.require_sta_pmf(bssid, addr)
    if not hostapd_reports_mfp():
        raise Exception("MFP flag not present for STA")

    # Reassociate with PMF disabled.
    reassociate_with_pmf("0")
    wt.require_sta_no_pmf(bssid, addr)
    if hostapd_reports_mfp():
        raise Exception("MFP flag unexpectedly present for STA")
    if "yes" in kernel_mfp_line():
        raise Exception("Kernel STA entry had MFP enabled")

    # Reassociate with PMF enabled again.
    reassociate_with_pmf("1")
    wt.require_sta_pmf(bssid, addr)
    if not hostapd_reports_mfp():
        raise Exception("MFP flag not present for STA")
    if "yes" not in kernel_mfp_line():
        raise Exception("Kernel STA entry did not have MFP enabled")
915 
@remote_compatible
def test_ap_pmf_required_sta_no_pmf(dev, apdev):
    """WPA2-PSK AP with PMF required and PMF disabled on STA"""
    ssid = "test-pmf-required"
    sta = dev[0]
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)

    # Start a connection attempt with PMF disabled on the station, without
    # waiting for it to complete.
    sta.connect(ssid, psk="12345678", ieee80211w="0",
                key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412", wait_connect=False)

    # The acceptable outcome is that the network is not selected at all; an
    # association rejection means the station tried to associate.
    outcome = sta.wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND",
                              "CTRL-EVENT-ASSOC-REJECT"], timeout=2)
    if outcome is None:
        raise Exception("No connection result")
    if "CTRL-EVENT-ASSOC-REJECT" in outcome:
        raise Exception("Tried to connect to PMF required AP without PMF enabled")
    sta.request("REMOVE_NETWORK all")
936 
def test_ap_pmf_inject_auth(dev, apdev):
    """WPA2-PSK AP with PMF and Authentication frame injection"""
    sta = dev[0]
    ssid = "test-pmf"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, psk="12345678", ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()
    hwsim_utils.test_connectivity(sta, hapd)

    bssid = hapd.own_addr().replace(':', '')
    addr = sta.own_addr().replace(':', '')

    def process_frames(frames):
        # Feed every frame to hostapd through MGMT_RX_PROCESS with external
        # management frame handling enabled. All frames are attempted and
        # the setting is restored before any failure is reported.
        hapd.request("SET ext_mgmt_frame_handling 1")
        all_ok = True
        for frame in frames:
            res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % frame)
            if "OK" not in res:
                all_ok = False
        hapd.request("SET ext_mgmt_frame_handling 0")
        if not all_ok:
            raise Exception("MGMT_RX_PROCESS failed")

    def verify_still_associated():
        # No disconnection event on the station and data still flows.
        if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.1):
            raise Exception("Unexpected disconnection reported on the STA")
        hwsim_utils.test_connectivity(sta, hapd)

    # Source addresses used for the injected frames: the associated STA,
    # another STA, the AP's own address, all zeros, all ones, and a
    # multicast address.
    addresses = [addr, "021122334455", bssid, 6*"00", 6*"ff", 6*"01"]

    # Unprotected Authentication frames, one per source address.
    process_frames(["b0003a01" + bssid + a + bssid + '1000000001000000'
                    for a in addresses])
    time.sleep(0.1)
    verify_still_associated()

    # Unprotected Association Request frames: for every source address one
    # frame with a full RSNE, one with a zero-length RSNE ('3000') and one
    # without any RSNE, in that order.
    assoc_body = '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824'
    rsne_variants = ('301a0100000fac040100000fac040100000fac06c0000000000fac06',
                     '3000',
                     '')
    process_frames(["00003a01" + bssid + a + bssid + assoc_body + rsne
                    for a in addresses
                    for rsne in rsne_variants])
    time.sleep(5)
    verify_still_associated()
1005 
def test_ap_pmf_inject_assoc(dev, apdev):
    """WPA2-PSK with PMF and Association Request frame injection"""
    # Variant without WPS enabled on the AP.
    run_ap_pmf_inject_assoc(dev, apdev, wps=False)
1009 
def test_ap_pmf_inject_assoc_wps(dev, apdev):
    """WPA2-PSK/WPS with PMF and Association Request frame injection"""
    # Variant with WPS enabled on the AP.
    run_ap_pmf_inject_assoc(dev, apdev, wps=True)
1013 
def inject_assoc_req(hapd, addr, frame):
    """Inject a management frame into hostapd and check the STA entry.

    hapd: hostapd control object used for set()/request()/get_sta().
    addr: address of the station whose entry is checked afterwards.
    frame: frame contents as a hex string passed to MGMT_RX_PROCESS.

    Raises Exception if MGMT_RX_PROCESS does not return OK, or if the
    station entry no longer has the [MFP] flag or its AKMSuiteSelector is
    not 00-0f-ac-6 after the frame has been processed.
    """
    hapd.set("ext_mgmt_frame_handling", "1")
    try:
        res = hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % frame)
    finally:
        # Restore normal frame handling even when the injection fails so
        # that a failure here does not leave hostapd in external management
        # frame handling mode.
        hapd.set("ext_mgmt_frame_handling", "0")
    if "OK" not in res:
        raise Exception("MGMT_RX_PROCESS failed")

    # The injected frame must not have modified the existing association.
    sta = hapd.get_sta(addr)
    if "[MFP]" not in sta['flags']:
        raise Exception("MFP flag removed")
    if sta["AKMSuiteSelector"] != '00-0f-ac-6':
        raise Exception("AKMSuiteSelector value changed")
1025 
def run_ap_pmf_inject_assoc(dev, apdev, wps):
    """Inject unprotected Association Request frames for a PMF association.

    wps: when true, the AP is additionally configured with eap_server=1 and
    wps_state=2.
    """
    sta = dev[0]
    ssid = "test-pmf"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK WPA-PSK-SHA256"
    params["ieee80211w"] = "1"
    if wps:
        params["eap_server"] = "1"
        params["wps_state"] = "2"

    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, psk="12345678", ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()

    # Baseline: the association uses PMF and AKM 00-0f-ac-6.
    entry = hapd.get_sta(sta.own_addr())
    if "[MFP]" not in entry['flags']:
        raise Exception("MFP flag not reported")
    if entry["AKMSuiteSelector"] != '00-0f-ac-6':
        raise Exception("Incorrect AKMSuiteSelector value")

    bssid = hapd.own_addr().replace(':', '')
    addr = sta.own_addr().replace(':', '')

    # Unprotected Association Request frames from the associated STA. The
    # three variants share everything up to the RSNE: assoc1 and assoc2
    # differ only in the AKM suite (000fac02 vs 000fac06) and assoc3 has no
    # RSNE at all.
    common = "00003a01" + bssid + addr + bssid + '2000' + '31040500' + '0008746573742d706d66' + '010802040b160c121824'
    assoc1 = common + '30140100000fac040100000fac040100000fac020000'
    assoc2 = common + '30140100000fac040100000fac040100000fac060000'
    assoc3 = common

    # Inject in the order 1, 1, 2, 3, 2 with a short pause between frames;
    # inject_assoc_req() verifies the STA entry after each one.
    for index, frame in enumerate((assoc1, assoc1, assoc2, assoc3, assoc2)):
        if index > 0:
            time.sleep(0.1)
        inject_assoc_req(hapd, sta.own_addr(), frame)

    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=5.1):
        raise Exception("Unexpected disconnection reported on the STA")
    if hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=0.1):
        raise Exception("Unexpected disconnection event received from hostapd")

    # Verify that original association is still functional.
    hwsim_utils.test_connectivity(sta, hapd)
1073 
def test_ap_pmf_inject_data(dev, apdev):
    """WPA2-PSK AP with PMF and Data frame injection"""
    try:
        run_ap_pmf_inject_data(dev, apdev)
    finally:
        # The test body starts a monitor on the second AP device's
        # interface; stop it regardless of the test result.
        monitor_ifname = apdev[1]["ifname"]
        stop_monitor(monitor_ifname)
1080 
def run_ap_pmf_inject_data(dev, apdev):
    """Inject Data frames with various A2 values and check the association.

    The frames are sent through a monitor socket opened on apdev[1]; the
    caller is responsible for stopping that monitor.
    """
    sta = dev[0]
    ssid = "test-pmf"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, psk="12345678", ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()
    hwsim_utils.test_connectivity(sta, hapd)

    sock = start_monitor(apdev[1]["ifname"])
    radiotap = radiotap_build()

    bssid = hapd.own_addr().replace(':', '')
    addr = sta.own_addr().replace(':', '')

    # One Data frame per A2 value: broadcast, multicast, BSSID, the
    # associated STA, and an unknown unicast address.
    for a2 in (6*"ff", 6*"01", bssid, addr, "020102030405"):
        payload = binascii.unhexlify("48010000" + bssid + a2 + bssid + "0000")
        sock.send(radiotap + payload)

    time.sleep(0.1)
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.1):
        raise Exception("Unexpected disconnection reported on the STA")
    hwsim_utils.test_connectivity(sta, hapd)
1111 
def test_ap_pmf_inject_msg1(dev, apdev):
    """WPA2-PSK AP with PMF and EAPOL-Key msg 1/4 injection"""
    try:
        run_ap_pmf_inject_msg1(dev, apdev)
    finally:
        # Stop the monitor that the test body starts on apdev[1].
        monitor_ifname = apdev[1]["ifname"]
        stop_monitor(monitor_ifname)
1118 
def test_ap_pmf_inject_msg1_no_pmf(dev, apdev):
    """WPA2-PSK AP without PMF and EAPOL-Key msg 1/4 injection"""
    try:
        run_ap_pmf_inject_msg1(dev, apdev, pmf=False)
    finally:
        # Stop the monitor that the test body starts on apdev[1].
        monitor_ifname = apdev[1]["ifname"]
        stop_monitor(monitor_ifname)
1125 
def run_ap_pmf_inject_msg1(dev, apdev, pmf=True):
    """Inject an unprotected EAPOL-Key msg 1/4 and check the association.

    pmf: when true, the AP sets ieee80211w=2 and the station uses
    ieee80211w="2"; otherwise the AP parameter is left unset and the station
    uses "0".
    """
    sta = dev[0]
    ssid = "test-pmf"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    if pmf:
        params["ieee80211w"] = "2"
        sta_pmf = "2"
    else:
        sta_pmf = "0"
    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, psk="12345678", ieee80211w=sta_pmf,
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")
    hapd.wait_sta()

    sock = start_monitor(apdev[1]["ifname"])
    radiotap = radiotap_build()

    bssid = hapd.own_addr().replace(':', '')
    addr = sta.own_addr().replace(':', '')

    # Inject unprotected EAPOL-Key msg 1/4 with an invalid KDE. The frame is
    # assembled from the header addressed to the STA, the LLC/SNAP header
    # with the EAPOL ethertype (888e), and the EAPOL-Key frame followed by
    # the KDE.
    header = "88020000" + addr + bssid + bssid + "0000" + "0700"
    llc_snap = "aaaa03000000" + "888e"
    eapol_key = "0203006602008b00100000000000000005bcb714da6f98f817b88948485c26ef052922b795814819f1889ae01e11b486910000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000007" + "dd33000fac0400"
    sock.send(radiotap + binascii.unhexlify(header + llc_snap + eapol_key))

    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.5):
        raise Exception("Unexpected disconnection reported on the STA")
    hwsim_utils.test_connectivity(sta, hapd)
    state = sta.get_status_field("wpa_state")
    if state != "COMPLETED":
        raise Exception("Unexpected wpa_state: " + state)
1158 
def test_ap_pmf_inject_eap(dev, apdev):
    """WPA2-EAP AP with PMF and EAP frame injection"""
    try:
        run_ap_pmf_inject_eap(dev, apdev)
    finally:
        # Stop the monitor that the test body starts on apdev[1].
        monitor_ifname = apdev[1]["ifname"]
        stop_monitor(monitor_ifname)
1165 
def run_ap_pmf_inject_eap(dev, apdev, pmf=True):
    """Inject unprotected EAPOL frames to the STA and then to the AP.

    After each direction the test fails if the station reported a
    disconnection, an EAP start or an EAP failure, or if hostapd reported
    the station as disconnected; it then verifies connectivity and that the
    station is still in wpa_state COMPLETED.

    NOTE: the pmf argument is not referenced in this body; the AP and the
    station are always configured with ieee80211w="2".
    """
    sta = dev[0]
    ssid = "test-pmf-eap"
    params = hostapd.wpa2_eap_params(ssid=ssid)
    params["wpa_key_mgmt"] = "WPA-EAP-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, key_mgmt="WPA-EAP-SHA256",
                ieee80211w="2", eap="PSK", identity="psk.user@example.com",
                password_hex="0123456789abcdef0123456789abcdef",
                scan_freq="2412")
    hapd.wait_sta()
    sta.dump_monitor()
    hapd.dump_monitor()

    sock = start_monitor(apdev[1]["ifname"])
    radiotap = radiotap_build()

    bssid = hapd.own_addr().replace(':', '')
    addr = sta.own_addr().replace(':', '')

    # Unexpected-event flags, kept in reporting order. They accumulate over
    # both phases; a set flag aborts the test at the end of its phase.
    flag_names = ("disconnected", "eap_start", "eap_failure",
                  "ap_disconnected")
    seen = dict.fromkeys(flag_names, False)

    def inject_all(header, payloads, note):
        # Send each payload behind the given header and poll the station
        # for unexpected events (at most two per injected frame), then check
        # hostapd for a station disconnection event.
        for payload in payloads:
            note("Inject " + payload)
            sock.send(radiotap + binascii.unhexlify(header + payload))
            for _ in range(2):
                ev = sta.wait_event(["CTRL-EVENT-DISCONNECTED",
                                     "CTRL-EVENT-EAP-STARTED",
                                     "CTRL-EVENT-EAP-FAILURE"],
                                    timeout=0.0001)
                if ev is None:
                    break
                if "CTRL-EVENT-DISCONNECTED" in ev:
                    seen["disconnected"] = True
                if "CTRL-EVENT-EAP-START" in ev:
                    seen["eap_start"] = True
                if "CTRL-EVENT-EAP-FAILURE" in ev:
                    seen["eap_failure"] = True
            sta.dump_monitor(mon=False)
        sta.dump_monitor()
        if hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=0.1):
            seen["ap_disconnected"] = True

    def seen_summary():
        # " name" for every flag that is set, in reporting order.
        return "".join(" " + name for name in flag_names if seen[name])

    # Inject various unexpected unprotected EAPOL frames to the STA
    to_sta = "88020000" + addr + bssid + bssid + "0000" + "0700"
    to_sta += "aaaa03000000" + "888e"
    sta_payloads = ["02000005012d000501"] * 101  # EAP-Request/Identity
    sta_payloads += ["02000022012e00222f00862406a9b45782fee8a62e837457d1367365727665722e77312e6669"] * 101  # EAP-Request/PSK
    sta_payloads += [
        "0200000404780004",      # EAP-Failure
        "0200000403780004",      # EAP-Success
        "02000006057800060100",  # EAP-Initiate
        "0200000406780004",      # EAP-Finish
        "0200000400780004",      # EAP-?
        "02020000",              # EAPOL-Logoff
        "02010000",              # EAPOL-Start
    ]
    inject_all(to_sta, sta_payloads, sta.note)
    if any(seen.values()):
        raise Exception("Unexpected event:" + seen_summary())
    hwsim_utils.test_connectivity(sta, hapd)
    state = sta.get_status_field("wpa_state")
    if state != "COMPLETED":
        raise Exception("Unexpected wpa_state: " + state)

    sta.dump_monitor()
    hapd.dump_monitor()

    # Inject various unexpected unprotected EAPOL frames to the AP
    to_ap = "88010000" + bssid + addr + bssid + "0000" + "0700"
    to_ap += "aaaa03000000" + "888e"
    ap_payloads = ["02020000"]        # EAPOL-Logoff
    ap_payloads += ["02010000"] * 10  # EAPOL-Start
    inject_all(to_ap, ap_payloads, hapd.note)
    hapd.dump_monitor()
    if any(seen.values()):
        raise Exception("Unexpected event(2):" + seen_summary())
    hwsim_utils.test_connectivity(sta, hapd)
    state = sta.get_status_field("wpa_state")
    if state != "COMPLETED":
        raise Exception("Unexpected wpa_state(2): " + state)
1282 
def test_ap_pmf_tkip_reject(dev, apdev):
    """Mixed mode BSS and MFP-enabled AP rejecting TKIP"""
    skip_without_tkip(dev[0])
    ssid = "test-pmf"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params['wpa'] = '3'
    params["ieee80211w"] = "1"
    params["wpa_pairwise"] = "TKIP CCMP"
    params["rsn_pairwise"] = "TKIP CCMP"
    hostapd.add_ap(apdev[0], params)

    # CCMP with PMF required: expected to connect.
    dev[0].connect(ssid, psk="12345678", pairwise="CCMP", ieee80211w="2",
                   scan_freq="2412")
    dev[0].dump_monitor()

    # WPA/TKIP without PMF: expected to connect.
    dev[1].connect(ssid, psk="12345678", proto="WPA", pairwise="TKIP",
                   ieee80211w="0", scan_freq="2412")
    dev[1].dump_monitor()

    # TKIP combined with PMF required: the AP must reject the association
    # with status code 31.
    tkip_sta = dev[2]
    tkip_sta.connect(ssid, psk="12345678", pairwise="TKIP",
                     ieee80211w="2", scan_freq="2412", wait_connect=False)
    result = tkip_sta.wait_event(["CTRL-EVENT-CONNECTED",
                                  "CTRL-EVENT-ASSOC-REJECT"], timeout=10)
    if result is None:
        raise Exception("No connection result reported")
    if "CTRL-EVENT-ASSOC-REJECT" not in result:
        raise Exception("MFP + TKIP connection was not rejected")
    if "status_code=31" not in result:
        raise Exception("Unexpected status code in rejection: " + result)
    tkip_sta.request("DISCONNECT")
    tkip_sta.dump_monitor()
1313 
def test_ap_pmf_sa_query_timeout(dev, apdev):
    """SA Query timeout"""
    sta = dev[0]
    ssid = "test-pmf-required"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    params["ieee80211w"] = "2"
    hapd = hostapd.add_ap(apdev[0], params)
    sta.connect(ssid, psk="12345678", ieee80211w="2",
                key_mgmt="WPA-PSK-SHA256", proto="WPA2",
                scan_freq="2412")

    def trigger_sa_query():
        # Enable external management frame handling on the AP and ask the
        # station to start SA Query through UNPROT_DEAUTH.
        hapd.set("ext_mgmt_frame_handling", "1")
        if "OK" not in sta.request("UNPROT_DEAUTH"):
            raise Exception("Triggering SA Query from the STA failed")

    # Phase 1: the SA Query procedure is expected to time out and the
    # station to disconnect, then reconnect once normal handling is back.
    trigger_sa_query()
    if sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=2) is None:
        raise Exception("No disconnection on SA Query timeout seen")
    hapd.set("ext_mgmt_frame_handling", "0")
    sta.wait_connected()
    sta.dump_monitor()

    # Phase 2: start another SA Query, then disconnect and reconnect the
    # station right after the AP has received a management frame.
    trigger_sa_query()
    ev = hapd.mgmt_rx()
    hapd.set("ext_mgmt_frame_handling", "0")
    sta.request("DISCONNECT")
    sta.wait_disconnected()
    sta.request("RECONNECT")
    sta.wait_connected()

    # No disconnection may follow the reconnection, even with external
    # management frame handling enabled again on the AP.
    hapd.set("ext_mgmt_frame_handling", "1")
    ev = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=1.5)
    if ev is not None:
        raise Exception("Unexpected disconnection after reconnection seen")
1348 
def mac80211_read_key(keydir):
    """Return a dict mapping each entry name in keydir to its contents.

    Contents are read as text and stripped of surrounding whitespace.
    Entries that cannot be opened or read (OSError) are left out.
    """
    contents = {}
    for entry in os.listdir(keydir):
        path = os.path.join(keydir, entry)
        try:
            with open(path) as fh:
                raw = fh.read()
        except OSError:
            # Best effort: skip entries that are not readable.
            continue
        contents[entry] = raw.strip()
    return contents
1358 
def _mac80211_find_bigtk(ctrl, role):
    """Return the debugfs values of the first key with index 6 or 7.

    ctrl: object providing get_driver_status_field("phyname").
    role: "STA" or "AP"; only used in the skip message.

    Returns None if no such key exists. Raises HwsimSkip if the mac80211
    debugfs key directory cannot be read.
    """
    phy = ctrl.get_driver_status_field("phyname")
    keys = "/sys/kernel/debug/ieee80211/%s/keys" % phy
    try:
        for key in os.listdir(keys):
            vals = mac80211_read_key(os.path.join(keys, key))
            if int(vals['keyidx']) in (6, 7):
                return vals
    except OSError:
        raise HwsimSkip("debugfs not supported in mac80211 (%s)" % role)
    return None

def check_mac80211_bigtk(dev, hapd):
    """Compare the BIGTK state of a station and an AP through debugfs.

    dev: station control object; hapd: AP control object. Both are only
    used to look up the phy name.

    Raises HwsimSkip when the mac80211 debugfs key directory is not
    available, and Exception when a key is missing, the two sides disagree
    on key, index or algorithm, the station reports replays or ICV errors,
    or either side's counter is below 3.
    """
    # Look up both sides first so that missing debugfs support on either
    # side results in a skip before any failure is reported.
    sta_key = _mac80211_find_bigtk(dev, "STA")
    ap_key = _mac80211_find_bigtk(hapd, "AP")

    if not sta_key:
        raise Exception("Could not find STA key information from debugfs")
    logger.info("STA key: " + str(sta_key))

    if not ap_key:
        raise Exception("Could not find AP key information from debugfs")
    logger.info("AP key: " + str(ap_key))

    if sta_key['key'] != ap_key['key']:
        raise Exception("AP and STA BIGTK mismatch")

    if sta_key['keyidx'] != ap_key['keyidx']:
        raise Exception("AP and STA BIGTK keyidx mismatch")

    if sta_key['algorithm'] != ap_key['algorithm']:
        raise Exception("AP and STA BIGTK algorithm mismatch")

    replays = int(sta_key['replays'])
    icverrors = int(sta_key['icverrors'])
    if replays > 0 or icverrors > 0:
        # Both values must be passed as one tuple; without the parentheses
        # the % operator only receives replays and raises TypeError.
        raise Exception("STA reported errors: replays=%d icverrors=%d" % (replays, icverrors))

    # rx_spec and tx_spec are hexadecimal strings in debugfs.
    rx_spec = int(sta_key['rx_spec'], base=16)
    if rx_spec < 3:
        raise Exception("STA did not update BIGTK receive counter sufficiently")

    tx_spec = int(ap_key['tx_spec'], base=16)
    if tx_spec < 3:
        raise Exception("AP did not update BIGTK BIPN sufficiently")
1418 
def test_ap_pmf_beacon_protection_bip(dev, apdev):
    """WPA2-PSK Beacon protection (BIP)"""
    # Shared Beacon protection sequence with AES-128-CMAC as the group
    # management cipher
    run_ap_pmf_beacon_protection(dev, apdev, cipher="AES-128-CMAC")
1422 
def test_ap_pmf_beacon_protection_bip_cmac_256(dev, apdev):
    """WPA2-PSK Beacon protection (BIP-CMAC-256)"""
    # Shared Beacon protection sequence with BIP-CMAC-256 as the group
    # management cipher
    run_ap_pmf_beacon_protection(dev, apdev, cipher="BIP-CMAC-256")
1426 
def test_ap_pmf_beacon_protection_bip_gmac_128(dev, apdev):
    """WPA2-PSK Beacon protection (BIP-GMAC-128)"""
    # Shared Beacon protection sequence with BIP-GMAC-128 as the group
    # management cipher
    run_ap_pmf_beacon_protection(dev, apdev, cipher="BIP-GMAC-128")
1430 
def test_ap_pmf_beacon_protection_bip_gmac_256(dev, apdev):
    """WPA2-PSK Beacon protection (BIP-GMAC-256)"""
    # Shared Beacon protection sequence with BIP-GMAC-256 as the group
    # management cipher
    run_ap_pmf_beacon_protection(dev, apdev, cipher="BIP-GMAC-256")
1434 
def run_ap_pmf_beacon_protection(dev, apdev, cipher):
    """Run the Beacon protection sequence with one group management cipher.

    Starts a PMF-required WPA2-PSK AP with beacon_prot=1 and cipher as its
    group_mgmt_cipher, connects dev[0] with Beacon protection enabled and
    dev[1] without it, and then checks the bigtk_set/ssid_verified STATUS
    fields, the mac80211 debugfs BIGTK state, the wlantest BIP counters, and
    that dev[0] does not report beacon loss.

    Raises HwsimSkip if the AP interface cannot be enabled and Exception on
    any verification failure.
    """
    passphrase = "12345678"
    ssid = "test-beacon-prot"
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({"wpa_key_mgmt": "WPA-PSK-SHA256",
                   "ieee80211w": "2",
                   "beacon_prot": "1",
                   "group_mgmt_cipher": cipher})
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        # Only a failure to enable the interface is taken as missing support
        if "Failed to enable hostapd interface" not in str(e):
            raise
        raise HwsimSkip("Beacon protection not supported")

    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    dev[0].flush_scan_cache()

    # STA with Beacon protection enabled
    dev[0].connect(ssid, psk=passphrase, ieee80211w="2", beacon_prot="1",
                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
    bigtk_set = dev[0].get_status_field("bigtk_set")
    if bigtk_set != "1":
        raise Exception("bigtk_set=1 not indicated")

    # STA with Beacon protection disabled
    dev[1].connect(ssid, psk=passphrase, ieee80211w="2",
                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
    bigtk_set = dev[1].get_status_field("bigtk_set")
    if bigtk_set == "1":
        raise Exception("Unexpected bigtk_set=1 indication")

    # Wait a moment before inspecting the key counters in debugfs
    time.sleep(1)
    check_mac80211_bigtk(dev[0], hapd)

    counters = tuple(wt.get_bss_counter(name, bssid)
                     for name in ('valid_bip_mmie', 'invalid_bip_mmie',
                                  'missing_bip_mmie'))
    summary = "valid=%d invalid=%d missing=%d" % counters
    logger.info("wlantest BIP counters: " + summary)
    valid_bip, invalid_bip, missing_bip = counters
    # NOTE(review): valid_bip < 0 can only trigger on a negative counter
    # value, i.e., zero valid MMIEs is accepted - confirm this is intended.
    if valid_bip < 0 or invalid_bip > 0 or missing_bip > 0:
        raise Exception("Unexpected wlantest BIP counters: " + summary)

    beacon_loss = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=10)
    if beacon_loss is not None:
        raise Exception("Beacon loss detected")

    # Verify that the SSID has been successfully verified from a protected
    # Beacon frame.
    ssid_verified = dev[0].get_status_field("ssid_verified")
    if ssid_verified != "1":
        raise Exception("ssid_verified=1 not in STATUS")
1488 
def test_ap_pmf_beacon_protection_mismatch(dev, apdev):
    """WPA2-PSK Beacon protection MIC mismatch"""
    # clear=False: the AP side key is replaced instead of being removed
    run_ap_pmf_beacon_protection_mismatch(dev, apdev, clear=False)
1492 
def test_ap_pmf_beacon_protection_missing(dev, apdev):
    """WPA2-PSK Beacon protection MME missing"""
    # clear=True: the AP side key is removed
    run_ap_pmf_beacon_protection_mismatch(dev, apdev, clear=True)
1496 
def run_ap_pmf_beacon_protection_mismatch(dev, apdev, clear):
    """Break Beacon protection on the AP and check that this gets reported.

    After dev[0] has connected with Beacon protection enabled, a SET_KEY
    command is issued to hostapd for the broadcast address: with clear=True
    it uses WPA_ALG_NONE and an empty key, otherwise WPA_ALG_IGTK with an
    all-zero 16-octet key. The STA is then expected to report
    CTRL-EVENT-UNPROT-BEACON and CTRL-EVENT-BEACON-LOSS and the AP is expected
    to report CTRL-EVENT-UNPROT-BEACON.

    Raises HwsimSkip if the AP interface cannot be enabled and Exception if
    SET_KEY fails or an expected event is not seen.
    """
    passphrase = "12345678"
    ssid = "test-beacon-prot"
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({"wpa_key_mgmt": "WPA-PSK-SHA256",
                   "ieee80211w": "2",
                   "beacon_prot": "1",
                   "group_mgmt_cipher": "AES-128-CMAC"})
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if "Failed to enable hostapd interface" not in str(e):
            raise
        raise HwsimSkip("Beacon protection not supported")

    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase(passphrase)

    dev[0].connect(ssid, psk=passphrase, ieee80211w="2", beacon_prot="1",
                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")

    WPA_ALG_NONE = 0
    WPA_ALG_IGTK = 4
    KEY_FLAG_DEFAULT = 0x02
    KEY_FLAG_TX = 0x08
    KEY_FLAG_GROUP = 0x10
    KEY_FLAG_GROUP_TX_DEFAULT = KEY_FLAG_GROUP | KEY_FLAG_TX | KEY_FLAG_DEFAULT

    addr = "ff:ff:ff:ff:ff:ff"

    # Select algorithm, key material, and key flags for the SET_KEY command
    if clear:
        alg, key, flags = WPA_ALG_NONE, "", KEY_FLAG_GROUP
    else:
        alg, key, flags = WPA_ALG_IGTK, 16*"00", KEY_FLAG_GROUP_TX_DEFAULT
    # The literal 6 is presumably the BIGTK key index - TODO confirm
    cmd = "SET_KEY %d %s %d %d %s %s %d" % (alg, addr, 6, 1, 6*"00", key,
                                            flags)
    if "OK" not in hapd.request(cmd):
        raise Exception("SET_KEY failed")

    # (event source, event to wait for, failure message), checked in order
    expected = [
        (dev[0], "CTRL-EVENT-UNPROT-BEACON",
         "Unprotected Beacon frame not reported"),
        (dev[0], "CTRL-EVENT-BEACON-LOSS",
         "Beacon loss not reported"),
        (hapd, "CTRL-EVENT-UNPROT-BEACON",
         "WNM-Notification Request frame not reported"),
    ]
    for source, event, failure in expected:
        if source.wait_event([event], timeout=5) is None:
            raise Exception(failure)
1548 
def test_ap_pmf_beacon_protection_reconnect(dev, apdev):
    """Beacon protection and reconnection"""
    passphrase = "12345678"
    ssid = "test-beacon-prot"
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({"wpa_key_mgmt": "WPA-PSK-SHA256",
                   "ieee80211w": "2",
                   "beacon_prot": "1",
                   "group_mgmt_cipher": "AES-128-CMAC"})
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        # Only a failure to enable the interface is taken as missing support
        if "Failed to enable hostapd interface" not in str(e):
            raise
        raise HwsimSkip("Beacon protection not supported")

    dev[0].connect(ssid, psk=passphrase, ieee80211w="2", beacon_prot="1",
                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")

    # Drop the association and come back to the same network
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
    dev[0].request("RECONNECT")
    dev[0].wait_connected()

    # Wait a moment before inspecting the key counters in debugfs
    time.sleep(1)
    check_mac80211_bigtk(dev[0], hapd)

    beacon_loss = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=5)
    if beacon_loss is not None:
        raise Exception("Beacon loss detected")
1575 
def test_ap_pmf_beacon_protection_unicast(dev, apdev):
    """WPA2-PSK Beacon protection (BIP) and unicast Beacon frame"""
    try:
        run_ap_pmf_beacon_protection_unicast(dev, apdev)
    finally:
        # Always stop the monitor on the second AP interface, regardless of
        # whether the test body passed, failed, or was skipped.
        stop_monitor(apdev[1]["ifname"])
1582 
def run_ap_pmf_beacon_protection_unicast(dev, apdev):
    """Inject unicast Beacon frames at a STA that uses Beacon protection.

    Starts a PMF-required WPA2-PSK AP with beacon_prot=1, connects dev[0]
    with Beacon protection enabled, and then injects two Beacon frames through
    a monitor interface on apdev[1]. Both frames are addressed to the STA,
    use the AP's BSSID as the transmitter, and carry CSA/ECSA elements. The
    first frame has no MME; the second one has a hardcoded MME appended.

    After each injection, a reported CTRL-EVENT-STARTED-CHANNEL-SWITCH is a
    failure and a reported CTRL-EVENT-UNPROT-BEACON has to include the AP's
    BSSID. Not seeing either event is accepted.

    Raises HwsimSkip if the AP interface cannot be enabled.
    """
    cipher = "AES-128-CMAC"
    ssid = "test-beacon-prot"
    params = hostapd.wpa2_params(ssid=ssid, passphrase="12345678")
    params["wpa_key_mgmt"] = "WPA-PSK-SHA256"
    params["ieee80211w"] = "2"
    params["beacon_prot"] = "1"
    params["group_mgmt_cipher"] = cipher
    try:
        hapd = hostapd.add_ap(apdev[0], params)
    except Exception as e:
        if "Failed to enable hostapd interface" in str(e):
            raise HwsimSkip("Beacon protection not supported")
        raise

    bssid = hapd.own_addr()

    Wlantest.setup(hapd)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    # STA with Beacon protection enabled
    dev[0].connect(ssid, psk="12345678", ieee80211w="2", beacon_prot="1",
                   key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
    hapd.wait_sta()

    sock = start_monitor(apdev[1]["ifname"])
    radiotap = radiotap_build()

    # Addresses as plain hex strings for building the frame
    bssid_hex = bssid.replace(':', '')
    sta_hex = dev[0].own_addr().replace(':', '')

    # Frame Control (Beacon), Duration, DA=STA, SA=BSSID, BSSID, Seq Ctrl
    h = "80000000" + sta_hex + bssid_hex + bssid_hex + "0000"
    # Timestamp, Beacon Interval, Capability Information
    h += "c0a0260d27090600" + "6400" + "1104"
    h += "0010746573742d626561636f6e2d70726f74" # SSID "test-beacon-prot"
    h += "010882848b960c121824" # Supported Rates
    h += "030101" # DSSS Parameter Set
    h += "050400020000" # TIM
    h += "2a0104" # ERP
    h += "32043048606c" # Extended Supported Rates
    h += "30140100000fac040100000fac040100000fac06cc00" # RSN
    h += "3b025100" # Supported Operating Classes
    h += "2d1a0c001bffff000000000000000000000100000000000000000000" # HT Cap
    h += "3d1601000000000000000000000000000000000000000000" # HT Operation
    h += "7f0b0400000200000040000010" # Extended Capabilities
    h += "2503000b01" # CSA
    h += "3c0400510b01" # ECSA
    h += "dd180050f2020101010003a4000027a4000042435e0062322f00" # Vendor (WMM)

    # First frame: no MME at all
    frame = binascii.unhexlify(h)
    # Second frame: same contents followed by an MME with a fixed MIC value
    # NOTE(review): the MIC is hardcoded, so it is presumably not valid for
    # the BIGTK in use - confirm.
    h += "4c1006002100000000002b8fab24bcef3bb1" #MME
    frame2 = binascii.unhexlify(h)

    def inject_and_check(data):
        # Send one frame and validate whatever the STA reports for it
        sock.send(radiotap + data)
        ev = dev[0].wait_event(["CTRL-EVENT-UNPROT-BEACON",
                                "CTRL-EVENT-STARTED-CHANNEL-SWITCH"],
                               timeout=5)
        if not ev:
            return
        if "CTRL-EVENT-STARTED-CHANNEL-SWITCH" in ev:
            raise Exception("Unexpected channel switch reported")
        if bssid not in ev:
            raise Exception("Unexpected BSSID in unprotected beacon indication")

    inject_and_check(frame)

    # NOTE(review): delay between the two injections; presumably this lets a
    # rate limit on unprotected Beacon frame reports expire - confirm.
    time.sleep(10.1)
    inject_and_check(frame2)
1655 
def test_ap_pmf_sta_global_require(dev, apdev):
    """WPA2-PSK AP with PMF optional and wpa_supplicant pmf=2"""
    passphrase = "12345678"
    ssid = "test-pmf-optional"
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({"wpa_key_mgmt": "WPA-PSK", "ieee80211w": "1"})
    hapd = hostapd.add_ap(apdev[0], params)
    try:
        # No per-network ieee80211w value is given to connect(); only the
        # global pmf parameter is set.
        dev[0].set("pmf", "2")
        dev[0].connect(ssid, psk=passphrase,
                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                       scan_freq="2412")
        pmf_state = dev[0].get_status_field("pmf")
        if pmf_state != "1":
            raise Exception("Unexpected PMF state: " + str(pmf_state))
    finally:
        # Restore the global parameter for the following test cases
        dev[0].set("pmf", "0")
1673 
def test_ap_pmf_sta_global_require2(dev, apdev):
    """WPA2-PSK AP with PMF optional and wpa_supplicant pmf=2 (2)"""
    passphrase = "12345678"
    ssid = "test-pmf-optional"
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({"wpa_key_mgmt": "WPA-PSK", "ieee80211w": "0"})
    hapd = hostapd.add_ap(apdev[0], params)
    bssid = hapd.own_addr()
    try:
        dev[0].scan_for_bss(bssid, freq=2412)
        dev[0].set("pmf", "2")
        dev[0].connect(ssid, psk=passphrase,
                       key_mgmt="WPA-PSK WPA-PSK-SHA256", proto="WPA2",
                       scan_freq="2412", wait_connect=False)
        # The AP is configured with ieee80211w=0, so the connection attempt
        # must not succeed while the STA has pmf=2 set.
        result = dev[0].wait_event(["CTRL-EVENT-CONNECTED",
                                    "CTRL-EVENT-NETWORK-NOT-FOUND"],
                                   timeout=10)
        if result is None:
            raise Exception("Connection result not reported")
        if "CTRL-EVENT-CONNECTED" in result:
            raise Exception("Unexpected connection")
    finally:
        # Restore the global parameter for the following test cases
        dev[0].set("pmf", "0")
1696 
def test_ap_pmf_drop_robust_mgmt_prior_to_keys_installation(dev, apdev):
    """Drop non protected Robust Action frames prior to keys installation"""
    ssid = "test-pmf-required"
    passphrase = '12345678'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    params.update({'delay_eapol_tx': '1',
                   'ieee80211w': '2',
                   'wpa_pairwise_update_count': '5'})
    hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)

    # Spectrum management with Channel Switch element
    csa_frame = {
        'fc': 0x00d0,
        'sa': hapd.own_addr(),
        'da': dev[0].own_addr(),
        'bssid': hapd.own_addr(),
        'payload': binascii.unhexlify('00042503000608'),
    }

    dev[0].connect(ssid, psk=passphrase, scan_freq='2412', ieee80211w='1',
                   wait_connect=False)

    # wait for the first delay before sending the frame
    delayed = hapd.wait_event(['DELAY-EAPOL-TX-1'], timeout=10)
    if delayed is None:
        raise Exception("EAPOL is not delayed")

    # send the Action frame while connecting (prior to keys installation)
    hapd.mgmt_tx(csa_frame)

    dev[0].wait_connected(timeout=10, error="Timeout on connection")
    hapd.wait_sta()
    hwsim_utils.test_connectivity(dev[0], hapd)

    # Verify no channel switch event
    csa_event = dev[0].wait_event(['CTRL-EVENT-STARTED-CHANNEL-SWITCH'],
                                  timeout=5)
    if csa_event is not None:
        raise Exception("Unexpected CSA prior to keys installation")

    # Send the frame after keys installation and verify channel switch event
    hapd.mgmt_tx(csa_frame)
    csa_event = dev[0].wait_event(['CTRL-EVENT-STARTED-CHANNEL-SWITCH'],
                                  timeout=5)
    if csa_event is None:
        raise Exception("Expected CSA handling after keys installation")
1740 
def test_ap_pmf_eapol_logoff(dev, apdev):
    """WPA2-EAP AP with PMF required and EAPOL-Logoff"""
    ssid = "test-pmf-required-eap"
    logoff = "02020000"
    params = hostapd.wpa2_eap_params(ssid=ssid)
    params.update({"wpa_key_mgmt": "WPA-EAP-SHA256", "ieee80211w": "2"})
    hapd = hostapd.add_ap(apdev[0], params)

    # EAPOL frames are relayed by this test on both sides
    hapd.request("SET ext_eapol_frame_io 1")
    dev[0].set("ext_eapol_frame_io", "1")
    dev[0].connect(ssid, key_mgmt="WPA-EAP-SHA256",
                   ieee80211w="2", eap="PSK", identity="psk.user@example.com",
                   password_hex="0123456789abcdef0123456789abcdef",
                   scan_freq="2412", wait_connect=False)

    # EAP-Request/Identity
    proxy_msg(hapd, dev[0])

    # EAP-Response/Identity RX
    identity_resp = rx_msg(dev[0])
    # EAPOL-Logoff TX (inject)
    tx_msg(dev[0], hapd, logoff)
    # EAP-Response/Identity TX (proxy previously received)
    tx_msg(dev[0], hapd, identity_resp)

    # Verify that the 10 ms timeout for deauthenticating STA after EAP-Failure
    # is not used in this sequence with the EAPOL-Logoff message before the
    # successful authentication.
    if dev[0].wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=0.03):
        raise Exception("Unexpected disconnection")

    ap, sta = hapd, dev[0]
    exchange = [
        # EAP-Request/Identity
        (ap, sta),
        # EAP-Response/Identity
        (sta, ap),
        # EAP-PSK
        (ap, sta), (sta, ap), (ap, sta), (sta, ap), (ap, sta),
        # 4-way handshake
        (ap, sta), (sta, ap), (ap, sta), (sta, ap),
    ]
    for sender, receiver in exchange:
        proxy_msg(sender, receiver)

    if hapd.wait_event(["EAPOL-4WAY-HS-COMPLETED"], timeout=1) is None:
        raise Exception("4-way handshake did not complete successfully")
    dev[0].wait_connected(timeout=0.1)
    hapd.wait_sta()

    # Verify that the STA gets disconnected when the EAPOL-Logoff message is
    # sent after successful authentication.

    # EAPOL-Logoff TX (inject)
    tx_msg(dev[0], hapd, logoff)
    hapd.request("SET ext_eapol_frame_io 0")
    dev[0].set("ext_eapol_frame_io", "0")
    disconnect = dev[0].wait_disconnected(timeout=1)
    if "reason=23" not in disconnect:
        raise Exception("Unexpected disconnection reason: " + disconnect)
1807