1 # HE tests
2 # Copyright (c) 2019, The Linux Foundation
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import logging
8 logger = logging.getLogger()
9 import os
10 import subprocess, time
11 
12 import hwsim_utils
13 import hostapd
14 from wpasupplicant import WpaSupplicant
15 from utils import *
16 from test_dfs import wait_dfs_event
17 from test_ap_acs import wait_acs
18 
def test_he_open(dev, apdev):
    """HE AP with open mode configuration"""
    ap_config = {"ssid": "he",
                 "ieee80211ax": "1",
                 "he_bss_color": "42",
                 "he_mu_edca_ac_be_ecwmin": "7",
                 "he_mu_edca_ac_be_ecwmax": "15"}
    hapd = hostapd.add_ap(apdev[0], ap_config)

    # The AP itself has to report HE as enabled before a station is used.
    he_status = hapd.get_status_field("ieee80211ax")
    if he_status != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    station = dev[0]
    station.connect("he", key_mgmt="NONE", scan_freq="2412")

    # hostapd's entry for the associated station must carry the HE flag.
    sta_flags = hapd.get_sta(station.own_addr())['flags']
    if "[HE]" not in sta_flags:
        raise Exception("Missing STA flag: HE")
33 
def test_he_disabled_on_sta(dev, apdev):
    """HE AP and HE disabled on STA"""
    ap_config = {"ssid": "he",
                 "ieee80211ax": "1",
                 "he_bss_color": "42",
                 "he_mu_edca_ac_be_ecwmin": "7",
                 "he_mu_edca_ac_be_ecwmax": "15"}
    hapd = hostapd.add_ap(apdev[0], ap_config)

    # The station connects with disable_he="1", so the AP must not list it
    # as an HE station even though the AP has ieee80211ax enabled.
    station = dev[0]
    station.connect("he", key_mgmt="NONE", scan_freq="2412", disable_he="1")
    sta_flags = hapd.get_sta(station.own_addr())['flags']
    if "[HE]" in sta_flags:
        raise Exception("Unexpected STA flag: HE")
46 
def test_he_params(dev, apdev):
    """HE AP parameters"""
    # Every key is listed exactly once. The original literal repeated
    # he_mu_edca_ac_be_ecwmin/ecwmax; in a dict display the last value for a
    # repeated key wins, so the values that were actually applied ("15") are
    # kept here at the position of the first occurrence.
    params = {"ssid": "he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_mu_edca_ac_be_ecwmin": "15",
              "he_mu_edca_ac_be_ecwmax": "15",
              "he_su_beamformer": "0",
              "he_su_beamformee": "0",
              "he_default_pe_duration": "4",
              "he_twt_required": "1",
              "he_rts_threshold": "64",
              "he_basic_mcs_nss_set": "65535",
              "he_mu_edca_qos_info_param_count": "0",
              "he_mu_edca_qos_info_q_ack": "0",
              "he_mu_edca_qos_info_queue_request": "1",
              "he_mu_edca_qos_info_txop_request": "0",
              "he_mu_edca_ac_be_aifsn": "0",
              "he_mu_edca_ac_be_timer": "255",
              "he_mu_edca_ac_bk_aifsn": "0",
              "he_mu_edca_ac_bk_aci": "1",
              "he_mu_edca_ac_bk_ecwmin": "15",
              "he_mu_edca_ac_bk_ecwmax": "15",
              "he_mu_edca_ac_bk_timer": "255",
              "he_mu_edca_ac_vi_ecwmin": "15",
              "he_mu_edca_ac_vi_ecwmax": "15",
              "he_mu_edca_ac_vi_aifsn": "0",
              "he_mu_edca_ac_vi_aci": "2",
              "he_mu_edca_ac_vi_timer": "255",
              "he_mu_edca_ac_vo_aifsn": "0",
              "he_mu_edca_ac_vo_aci": "3",
              "he_mu_edca_ac_vo_ecwmin": "15",
              "he_mu_edca_ac_vo_ecwmax": "15",
              "he_mu_edca_ac_vo_timer": "255",
              "he_spr_sr_control": "0",
              "he_spr_non_srg_obss_pd_max_offset": "0",
              "he_spr_srg_obss_pd_min_offset": "0",
              "he_spr_srg_obss_pd_max_offset": "0",
              "he_spr_srg_bss_colors": "1 2 10 63",
              "he_spr_srg_partial_bssid": "0 1 3 63",
              "he_6ghz_max_ampdu_len_exp": "7",
              "he_6ghz_rx_ant_pat": "1",
              "he_6ghz_tx_ant_pat": "1",
              "he_6ghz_max_mpdu": "2",
              "he_oper_chwidth": "0",
              "he_oper_centr_freq_seg0_idx": "1",
              "he_oper_centr_freq_seg1_idx": "0"}
    hapd = hostapd.add_ap(apdev[0], params)
    # The AP must come up with HE enabled despite the long parameter list.
    if hapd.get_status_field("ieee80211ax") != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")
    # A plain open-mode association confirms the configuration is usable.
    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
100 
def test_he_spr_params(dev, apdev):
    """HE AP spatial reuse parameters"""
    ap_config = {"ssid": "he",
                 "ieee80211ax": "1",
                 "he_spr_sr_control": "12",
                 "he_spr_non_srg_obss_pd_max_offset": "1",
                 "he_spr_srg_obss_pd_min_offset": "2",
                 "he_spr_srg_obss_pd_max_offset": "3",
                 "he_spr_srg_bss_colors": "1 2 10 63",
                 "he_spr_srg_partial_bssid": "0 1 3 63",
                 "he_oper_chwidth": "0",
                 "he_oper_centr_freq_seg0_idx": "1",
                 "he_oper_centr_freq_seg1_idx": "0"}
    hapd = hostapd.add_ap(apdev[0], ap_config)

    # The AP has to accept the he_spr_* settings and still report HE.
    he_status = hapd.get_status_field("ieee80211ax")
    if he_status != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    # An open-mode association shows the resulting BSS is usable.
    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
118 
def he_supported():
    """Check the ``iw reg get`` output for wide-channel regulatory rules.

    Returns True when the command output contains the text "@ 80)" or
    "@ 160)", and False otherwise.
    """
    regdom_text = subprocess.Popen(["iw", "reg", "get"],
                                   stdout=subprocess.PIPE).communicate()[0].decode()
    return any(marker in regdom_text for marker in ("@ 80)", "@ 160)"))
126 
def test_he80(dev, apdev):
    """HE with 80 MHz channel width"""
    # Bound before the try block so the cleanup in finally can always use it.
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "1",
                     "vht_capab": "[MAX-MPDU-11454]",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "he_oper_chwidth": "1",
                     "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev[0], ap_config)
        bssid = apdev[0]['bssid']

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station side: operating frequency and channel width.
        poll = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5180", "WIDTH=80 MHz"), 1):
            if wanted not in poll:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx + str(poll))

        throughput = dev[0].get_bss(bssid)['est_throughput']
        if throughput != "600502":
            raise Exception("Unexpected BSS est_throughput: " + throughput)

        sta_status = dev[0].get_status()
        if sta_status["ieee80211ac"] != "1":
            raise Exception("Unexpected STATUS ieee80211ac value (STA)")

        # AP side: STATUS must echo the configured HT/VHT/HE settings.
        ap_status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(ap_status))
        for field, wanted in (("ieee80211n", "1"),
                              ("ieee80211ac", "1"),
                              ("ieee80211ax", "1"),
                              ("secondary_channel", "1"),
                              ("vht_oper_chwidth", "1"),
                              ("vht_oper_centr_freq_seg0_idx", "42")):
            if ap_status[field] != wanted:
                raise Exception("Unexpected STATUS %s value" % field)
        if "vht_caps_info" not in ap_status:
            raise Exception("Missing vht_caps_info")
        for field, wanted in (("he_oper_chwidth", "1"),
                              ("he_oper_centr_freq_seg0_idx", "42")):
            if ap_status[field] != wanted:
                raise Exception("Unexpected STATUS %s value" % field)

        # AP's view of the station: all three capability flags are expected.
        sta = hapd.get_sta(dev[0].own_addr())
        logger.info("hostapd STA: " + str(sta))
        for flag in ("HT", "VHT", "HE"):
            if "[%s]" % flag not in sta['flags']:
                raise Exception("Missing STA flag: " + flag)

    except Exception as e:
        # An AP startup failure is reported as a skip when the regulatory
        # information lacks wide channels; anything else is re-raised.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
198 
def _test_he_wifi_generation(dev, apdev, conf, scan_freq):
    """Shared body for the wifi_generation test cases.

    conf is merged into the base AP parameters with dict.update() and
    scan_freq is passed to both station connect() calls. A station that
    does not report wifi_generation at all causes a skip; any reported
    value other than "6" is a failure.
    """
    def verify_generation(status, label):
        # A missing field is, for now, assumed to mean missing kernel
        # support, so it is reported as a skip rather than a failure.
        if 'wifi_generation' not in status:
            raise HwsimSkip("Association Request IE reporting not supported")
        generation = status['wifi_generation']
        if generation != "6":
            raise Exception("Unexpected wifi_generation value%s: " % label + generation)

    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "ieee80211n": "1",
                     "ieee80211ax": "1"}
        ap_config.update(conf)
        hapd = hostapd.add_ap(apdev[0], ap_config)
        bssid = apdev[0]['bssid']

        dev[0].connect("he", key_mgmt="NONE", scan_freq=scan_freq)
        verify_generation(dev[0].get_status(), "")

        # Repeat with a dynamically added interface that uses the
        # force_connect_cmd=1 driver parameter.
        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add("wlan5", drv_params="force_connect_cmd=1")
        wpas.connect("he", key_mgmt="NONE", scan_freq=scan_freq)
        verify_generation(wpas.get_status(), " (connect)")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
237 
def test_he_wifi_generation(dev, apdev):
    """HE and wifi_generation (5 GHz)"""
    # AP settings layered on top of the helper's base parameters.
    overrides = {
        "vht_oper_chwidth": "1",
        "hw_mode": "a",
        "channel": "36",
        "ht_capab": "[HT40+]",
        "vht_oper_centr_freq_seg0_idx": "42",
        "he_oper_chwidth": "1",
        "he_oper_centr_freq_seg0_idx": "42",
        "vht_capab": "[MAX-MPDU-11454]",
        "ieee80211ac": "1",
    }
    _test_he_wifi_generation(dev, apdev, conf=overrides, scan_freq="5180")
252 
def test_he_wifi_generation_24(dev, apdev):
    """HE and wifi_generation (2.4 GHz)"""
    # Only the band and channel differ from the helper's base parameters.
    overrides = {
        "hw_mode": "g",
        "channel": "1",
    }
    _test_he_wifi_generation(dev, apdev, conf=overrides, scan_freq="2412")
260 
def he80_test(apdev, dev, channel, ht_capab):
    """Shared body for the 80 MHz HE tests on a given primary channel.

    apdev is a single AP device entry (not the list), dev is the station
    list, channel is an integer channel number and ht_capab is the
    ht_capab string for the AP. The station scans on 5000 + 5 * channel.
    """
    clear_scan_cache(apdev)
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": str(channel),
                     "ht_capab": ht_capab,
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "1",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "he_oper_chwidth": "1",
                     "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev, ap_config)
        bssid = apdev['bssid']

        freq = str(5000 + 5 * channel)
        dev[0].connect("he", key_mgmt="NONE", scan_freq=freq)
        hwsim_utils.test_connectivity(dev[0], hapd)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
290 
def test_he80b(dev, apdev):
    """HE with 80 MHz channel width (HT40- channel 40)"""
    # Runs the shared he80_test scenario with channel 40 and [HT40-].
    he80_test(apdev[0], dev, channel=40, ht_capab="[HT40-]")
294 
def test_he80c(dev, apdev):
    """HE with 80 MHz channel width (HT40+ channel 44)"""
    # Runs the shared he80_test scenario with channel 44 and [HT40+].
    he80_test(apdev[0], dev, channel=44, ht_capab="[HT40+]")
298 
def test_he80d(dev, apdev):
    """HE with 80 MHz channel width (HT40- channel 48)"""
    # Runs the shared he80_test scenario with channel 48 and [HT40-].
    he80_test(apdev[0], dev, channel=48, ht_capab="[HT40-]")
302 
def test_he80_params(dev, apdev):
    """HE with 80 MHz channel width and number of optional features enabled"""
    # Mask applied to vht_caps_info; the checks below treat a non-zero
    # result as the station advertising SGI.
    sgi_mask = 0x60
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+][SHORT-GI-40][DSS_CCK-40]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "1",
                     "vht_capab": "[MAX-MPDU-11454][RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC-1][MAX-A-MPDU-LEN-EXP0]",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "require_vht": "1",
                     "require_he": "1",
                     "he_oper_chwidth": "1",
                     "he_oper_centr_freq_seg0_idx": "42",
                     "he_su_beamformer": "1",
                     "he_mu_beamformer": "1",
                     "he_bss_color": "1",
                     "he_default_pe_duration": "1",
                     "he_twt_required": "1",
                     "he_rts_threshold": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_config)

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add("wlan5",
                           drv_params="extra_bss_membership_selectors=126,122")

        # First attempt with VHT disabled on the extra interface, started
        # without waiting so the regular stations can associate meanwhile.
        wpas.connect("he", key_mgmt="NONE", scan_freq="5180",
                     disable_vht="1", wait_connect=False)
        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_sgi="1")
        rejection = wpas.wait_event(["CTRL-EVENT-ASSOC-REJECT"])
        if rejection is None:
            raise Exception("Association rejection timed out")
        if "status_code=104" not in rejection:
            raise Exception("Unexpected rejection status code")

        # Reset the extra interface and retry, this time with HE disabled.
        wpas.request("DISCONNECT")
        wpas.request("REMOVE_NETWORK all")
        wpas.dump_monitor()
        wpas.connect("he", key_mgmt="NONE", scan_freq="5180",
                     disable_he="1", wait_connect=False)

        hwsim_utils.test_connectivity(dev[0], hapd)
        sta0 = hapd.get_sta(dev[0].own_addr())
        sta2 = hapd.get_sta(dev[2].own_addr())
        caps0 = int(sta0['vht_caps_info'], 16)
        caps2 = int(sta2['vht_caps_info'], 16)
        if not caps0 & sgi_mask:
            raise Exception("dev[0] did not support SGI")
        if caps2 & sgi_mask:
            raise Exception("dev[2] claimed support for SGI")

        rejection = wpas.wait_event(["CTRL-EVENT-ASSOC-REJECT"])
        if rejection is None:
            raise Exception("Association rejection timed out (2)")
        if "status_code=124" not in rejection:
            raise Exception("Unexpected rejection status code (2): " + rejection)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev, count=3)
370 
def test_he80_invalid(dev, apdev):
    """HE with invalid 80 MHz channel configuration (seg1)"""
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "US",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "1",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "vht_oper_centr_freq_seg1_idx": "159",
                     "he_oper_chwidth": "1",
                     "he_oper_centr_freq_seg0_idx": "42",
                     "he_oper_centr_freq_seg1_idx": "155",
                     'ieee80211d': '1',
                     'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], ap_config, wait_enabled=False)
        # The unexpected seg1 configuration is supposed to make the AP give
        # up, so AP-DISABLED is the expected outcome here.
        if hapd.wait_event(["AP-DISABLED"], timeout=5) is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
403 
def test_he80_invalid2(dev, apdev):
    """HE with invalid 80 MHz channel configuration (seg0)"""
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "US",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "1",
                     "vht_oper_centr_freq_seg0_idx": "42",
                     "he_oper_chwidth": "1",
                     "he_oper_centr_freq_seg0_idx": "46",
                     'ieee80211d': '1',
                     'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], ap_config, wait_enabled=False)
        # The invalid seg0 configuration is supposed to make the AP give up,
        # so AP-DISABLED is the expected outcome here.
        if hapd.wait_event(["AP-DISABLED"], timeout=5) is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
434 
def test_he_20(devs, apdevs):
    """HE and 20 MHz channel"""
    station = devs[0]
    ap_entry = apdevs[0]
    # Bound before the try block so the cleanup in finally can always use it.
    hapd = None
    try:
        ap_config = {"ssid": "test-he20",
                     "country_code": "DE",
                     "hw_mode": "a",
                     "channel": "36",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "ht_capab": "",
                     "vht_capab": "",
                     "vht_oper_chwidth": "0",
                     "vht_oper_centr_freq_seg0_idx": "0",
                     "supported_rates": "60 120 240 360 480 540",
                     "require_vht": "1",
                     "he_oper_chwidth": "0",
                     "he_oper_centr_freq_seg0_idx": "0"}
        hapd = hostapd.add_ap(ap_entry, ap_config)
        station.connect("test-he20", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(station, hapd)
    finally:
        station.request("DISCONNECT")
        clear_regdom(hapd, devs)
462 
def test_he_40(devs, apdevs):
    """HE and 40 MHz channel"""
    station = devs[0]
    ap_entry = apdevs[0]
    # Bound before the try block so the cleanup in finally can always use it.
    hapd = None
    try:
        ap_config = {"ssid": "test-he40",
                     "country_code": "DE",
                     "hw_mode": "a",
                     "channel": "36",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "ht_capab": "[HT40+]",
                     "vht_capab": "",
                     "vht_oper_chwidth": "0",
                     "vht_oper_centr_freq_seg0_idx": "38",
                     "he_oper_chwidth": "0",
                     "he_oper_centr_freq_seg0_idx": "38",
                     "he_su_beamformer": "1",
                     "he_mu_beamformer": "1"}
        hapd = hostapd.add_ap(ap_entry, ap_config)
        station.connect("test-he40", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(station, hapd)
    finally:
        station.request("DISCONNECT")
        clear_regdom(hapd, devs)
490 
@long_duration_test
def test_he160(dev, apdev):
    """HE with 160 MHz channel width (1)"""
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": "36",
                     "ht_capab": "[HT40+]",
                     "vht_capab": "[VHT160]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "2",
                     "vht_oper_centr_freq_seg0_idx": "50",
                     "he_oper_chwidth": "2",
                     "he_oper_centr_freq_seg0_idx": "50",
                     'ieee80211d': '1',
                     'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], ap_config, wait_enabled=False)
        bssid = apdev[0]['bssid']

        cac_start = wait_dfs_event(hapd, "DFS-CAC-START", 5)
        if "DFS-CAC-START" not in cac_start:
            raise Exception("Unexpected DFS event")

        state = hapd.get_status_field("state")
        # Not all systems have recent enough CRDA version and
        # wireless-regdb changes to support 160 MHz and DFS. For now,
        # do not report failures for this test case.
        if state == "DISABLED" and not os.path.exists("dfs"):
            raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
        if state != "DFS":
            raise Exception("Unexpected interface state: " + state)

        logger.info("Waiting for CAC to complete")

        cac_done = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
        if "success=1" not in cac_done:
            raise Exception("CAC failed")
        if "freq=5180" not in cac_done:
            raise Exception("Unexpected DFS freq result")

        if not hapd.wait_event(["AP-ENABLED"], timeout=5):
            raise Exception("AP setup timed out")

        if hapd.get_status_field("state") != "ENABLED":
            raise Exception("Unexpected interface state")

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station side: operating frequency and channel width.
        poll = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5180", "WIDTH=160 MHz"), 1):
            if wanted not in poll:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx + str(poll))

        throughput = dev[0].get_bss(bssid)['est_throughput']
        if throughput != "1201002":
            raise Exception("Unexpected BSS est_throughput: " + throughput)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        # Stop the AP (if it was created), then reset the regulatory domain
        # and drop cached scan results.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
566 
@long_duration_test
def test_he160b(dev, apdev):
    """HE with 160 MHz channel width (2)"""
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "FI",
                     "hw_mode": "a",
                     "channel": "104",
                     "ht_capab": "[HT40-]",
                     "vht_capab": "[VHT160]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "2",
                     "vht_oper_centr_freq_seg0_idx": "114",
                     "he_oper_chwidth": "2",
                     "he_oper_centr_freq_seg0_idx": "114",
                     'ieee80211d': '1',
                     'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[1], ap_config, wait_enabled=False)

        cac_start = wait_dfs_event(hapd, "DFS-CAC-START", 5)
        if "DFS-CAC-START" not in cac_start:
            raise Exception("Unexpected DFS event(2)")

        state = hapd.get_status_field("state")
        # Not all systems have recent enough CRDA version and
        # wireless-regdb changes to support 160 MHz and DFS. For now,
        # do not report failures for this test case.
        if state == "DISABLED" and not os.path.exists("dfs"):
            raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
        if state != "DFS":
            raise Exception("Unexpected interface state: " + state)

        logger.info("Waiting for CAC to complete")

        cac_done = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
        if "success=1" not in cac_done:
            raise Exception("CAC failed(2)")
        if "freq=5520" not in cac_done:
            raise Exception("Unexpected DFS freq result(2)")

        if not hapd.wait_event(["AP-ENABLED"], timeout=5):
            raise Exception("AP setup timed out(2)")

        if hapd.get_status_field("state") != "ENABLED":
            raise Exception("Unexpected interface state(2)")

        if hapd.get_status_field("freq") != "5520":
            raise Exception("Unexpected frequency(2)")

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5520")
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station side: operating frequency and channel width.
        poll = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5520", "WIDTH=160 MHz"), 1):
            if wanted not in poll:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx + str(poll))
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        # Stop the AP (if it was created), then reset the regulatory domain
        # and drop cached scan results.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
643 
def test_he160_no_dfs_100_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (100 plus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 100 with [HT40+].
    run_ap_he160_no_dfs(dev, apdev, channel="100", ht_capab="[HT40+]")
647 
def test_he160_no_dfs(dev, apdev):
    """HE with 160 MHz channel width and no DFS (104 minus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 104 with [HT40-].
    run_ap_he160_no_dfs(dev, apdev, channel="104", ht_capab="[HT40-]")
651 
def test_he160_no_dfs_108_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (108 plus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 108 with [HT40+].
    run_ap_he160_no_dfs(dev, apdev, channel="108", ht_capab="[HT40+]")
655 
def test_he160_no_dfs_112_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (112 minus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 112 with [HT40-].
    run_ap_he160_no_dfs(dev, apdev, channel="112", ht_capab="[HT40-]")
659 
def test_he160_no_dfs_116_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (116 plus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 116 with [HT40+].
    run_ap_he160_no_dfs(dev, apdev, channel="116", ht_capab="[HT40+]")
663 
def test_he160_no_dfs_120_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (120 minus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 120 with [HT40-].
    run_ap_he160_no_dfs(dev, apdev, channel="120", ht_capab="[HT40-]")
667 
def test_he160_no_dfs_124_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (124 plus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 124 with [HT40+].
    run_ap_he160_no_dfs(dev, apdev, channel="124", ht_capab="[HT40+]")
671 
def test_he160_no_dfs_128_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (128 minus)"""
    # Runs the shared run_ap_he160_no_dfs scenario on channel 128 with [HT40-].
    run_ap_he160_no_dfs(dev, apdev, channel="128", ht_capab="[HT40-]")
675 
def run_ap_he160_no_dfs(dev, apdev, channel, ht_capab):
    """Shared body for the 160 MHz tests that expect no DFS CAC.

    channel is the primary channel as a string and ht_capab the matching
    ht_capab value. The AP is expected to report AP-ENABLED within two
    seconds; the station then connects on 5000 + 5 * channel MHz and must
    report a 160 MHz width.
    """
    hapd = None
    try:
        ap_config = {"ssid": "he",
                     "country_code": "ZA",
                     "hw_mode": "a",
                     "channel": channel,
                     "ht_capab": ht_capab,
                     "vht_capab": "[VHT160]",
                     "ieee80211n": "1",
                     "ieee80211ac": "1",
                     "ieee80211ax": "1",
                     "vht_oper_chwidth": "2",
                     "vht_oper_centr_freq_seg0_idx": "114",
                     "he_oper_chwidth": "2",
                     "he_oper_centr_freq_seg0_idx": "114",
                     'ieee80211d': '1',
                     'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], ap_config, wait_enabled=False)
        if not hapd.wait_event(["AP-ENABLED"], timeout=2):
            # No AP-ENABLED: look at the regulatory rules. A line mentioning
            # both 5490 and DFS turns the timeout into a skip.
            regdom = subprocess.Popen(["iw", "reg", "get"],
                                      stdout=subprocess.PIPE).communicate()[0]
            if any(b"5490" in rule and b"DFS" in rule
                   for rule in regdom.splitlines()):
                raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
            raise Exception("AP setup timed out")

        freq = str(5000 + 5 * int(channel))
        dev[0].connect("he", key_mgmt="NONE", scan_freq=freq)
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station side: operating frequency and channel width.
        poll = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=" + freq, "WIDTH=160 MHz"), 1):
            if wanted not in poll:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx + str(poll))
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
721 
def test_he160_no_ht40(dev, apdev):
    """HE with 160 MHz channel width and HT40 disabled"""
    # The AP is configured for 160 MHz with an empty ht_capab, i.e. without
    # a secondary channel. The expected outcome is AP-DISABLED; seeing
    # AP-ENABLED is a test failure.
    hapd = None
    try:
        params = {"ssid": "he",
                  "country_code": "ZA",
                  "hw_mode": "a",
                  "channel": "108",
                  "ht_capab": "",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "vht_oper_chwidth": "2",
                  "vht_oper_centr_freq_seg0_idx": "114",
                  "he_oper_chwidth": "2",
                  "he_oper_centr_freq_seg0_idx": "114",
                  'ieee80211d': '1',
                  'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
        ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=2)
        if not ev:
            # Neither event arrived; check whether the regulatory rules
            # still list a DFS requirement before reporting a failure.
            cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
            out, _ = cmd.communicate()
            # The pipe is not opened in text mode, so the output is bytes
            # and must be matched with bytes literals.
            for r in out.splitlines():
                if b"5490" in r and b"DFS" in r:
                    raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
            raise Exception("AP setup timed out")
        if "AP-ENABLED" in ev:
            # This was supposed to fail due to sec_channel_offset == 0
            raise Exception("Unexpected AP-ENABLED")
    except Exception as e:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
760 
def test_he80plus80(dev, apdev):
    """HE with 80+80 MHz channel width"""
    hapd = None
    hapd2 = None
    try:
        dfs_config = {
            "ssid": "he",
            "country_code": "US",
            "hw_mode": "a",
            "channel": "52",
            "ht_capab": "[HT40+]",
            "vht_capab": "[VHT160-80PLUS80]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "3",
            "vht_oper_centr_freq_seg0_idx": "58",
            "vht_oper_centr_freq_seg1_idx": "155",
            "he_oper_chwidth": "3",
            "he_oper_centr_freq_seg0_idx": "58",
            "he_oper_centr_freq_seg1_idx": "155",
            'ieee80211d': '1',
            'ieee80211h': '1',
        }
        hapd = hostapd.add_ap(apdev[0], dfs_config, wait_enabled=False)
        # This will actually fail since DFS on 80+80 is not yet supported.
        # The result is deliberately ignored so the test keeps working once
        # 80+80 DFS gets enabled.
        hapd.wait_event(["AP-DISABLED"], timeout=5)

        non_dfs_config = {
            "ssid": "he2",
            "country_code": "US",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "vht_capab": "[VHT160-80PLUS80]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "3",
            "vht_oper_centr_freq_seg0_idx": "42",
            "vht_oper_centr_freq_seg1_idx": "155",
            "he_oper_chwidth": "3",
            "he_oper_centr_freq_seg0_idx": "42",
            "he_oper_centr_freq_seg1_idx": "155",
        }
        hapd2 = hostapd.add_ap(apdev[1], non_dfs_config, wait_enabled=False)

        outcome = hapd2.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=5)
        if not outcome:
            raise Exception("AP setup timed out(2)")
        if "AP-DISABLED" in outcome:
            # Assume this failed due to missing regulatory update for now
            raise HwsimSkip("80+80 MHz channel not supported in regulatory information")

        if hapd2.get_status_field("state") != "ENABLED":
            raise Exception("Unexpected interface state(2)")

        dev[1].connect("he2", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[1], hapd2)
        poll_lines = dev[1].request("SIGNAL_POLL").splitlines()
        # Each expected SIGNAL_POLL line is paired with its own error prefix.
        expectations = (
            ("FREQUENCY=5180", "Unexpected SIGNAL_POLL value(1): "),
            ("WIDTH=80+80 MHz", "Unexpected SIGNAL_POLL value(2): "),
            ("CENTER_FRQ1=5210", "Unexpected SIGNAL_POLL value(3): "),
            ("CENTER_FRQ2=5775", "Unexpected SIGNAL_POLL value(4): "),
        )
        for wanted, prefix in expectations:
            if wanted not in poll_lines:
                raise Exception(prefix + str(poll_lines))
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        for sta in (dev[0], dev[1]):
            sta.request("DISCONNECT")
        for ap in (hapd, hapd2):
            if ap:
                ap.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        for sta in (dev[0], dev[1]):
            sta.flush_scan_cache()
842 
def test_he80plus80_invalid(dev, apdev):
    """HE with invalid 80+80 MHz channel"""
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "country_code": "US",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "3",
            "vht_oper_centr_freq_seg0_idx": "42",
            "vht_oper_centr_freq_seg1_idx": "0",
            "he_oper_chwidth": "3",
            "he_oper_centr_freq_seg0_idx": "42",
            "he_oper_centr_freq_seg1_idx": "0",
            'ieee80211d': '1',
            'ieee80211h': '1',
        }
        hapd = hostapd.add_ap(apdev[0], ap_config, wait_enabled=False)
        # This fails due to missing(invalid) seg1 configuration, so the
        # AP-DISABLED event is the expected outcome.
        if hapd.wait_event(["AP-DISABLED"], timeout=5) is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
875 
def test_he80_csa(dev, apdev):
    """HE with 80 MHz channel width and CSA"""
    csa_supported(dev[0])
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "country_code": "US",
            "hw_mode": "a",
            "channel": "149",
            "ht_capab": "[HT40+]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "1",
            "vht_oper_centr_freq_seg0_idx": "155",
            "he_oper_chwidth": "1",
            "he_oper_centr_freq_seg0_idx": "155",
        }
        hapd = hostapd.add_ap(apdev[0], ap_config)

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5745")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # First switch: move to 5180 MHz and require all three events, in
        # order, each reporting the new frequency.
        hapd.request("CHAN_SWITCH 5 5180 ht vht he blocktx center_freq1=5210 sec_channel_offset=1 bandwidth=80")
        first_switch_events = (
            ("CTRL-EVENT-STARTED-CHANNEL-SWITCH",
             "Channel switch start event not seen",
             "Unexpected channel in CS started"),
            ("CTRL-EVENT-CHANNEL-SWITCH",
             "Channel switch completion event not seen",
             "Unexpected channel in CS completed"),
            ("AP-CSA-FINISHED",
             "CSA finished event timed out",
             "Unexpected channel in CSA finished event"),
        )
        for event_name, missing_msg, wrong_freq_msg in first_switch_events:
            seen = hapd.wait_event([event_name], timeout=10)
            if seen is None:
                raise Exception(missing_msg)
            if "freq=5180" not in seen:
                raise Exception(wrong_freq_msg)
        time.sleep(0.5)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Second switch: back to the original 5745 MHz channel.
        hapd.request("CHAN_SWITCH 5 5745")
        finished = hapd.wait_event(["AP-CSA-FINISHED"], timeout=10)
        if finished is None:
            raise Exception("CSA finished event timed out")
        if "freq=5745" not in finished:
            raise Exception("Unexpected channel in CSA finished event")
        time.sleep(0.5)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # This CSA to same channel will fail in kernel, so use this only for
        # extra code coverage.
        hapd.request("CHAN_SWITCH 5 5745")
        hapd.wait_event(["AP-CSA-FINISHED"], timeout=1)
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
938 
def test_he_on_24ghz(dev, apdev):
    """Subset of HE features on 2.4 GHz"""
    ap_config = {
        "ssid": "test-he-2g",
        "hw_mode": "g",
        "channel": "1",
        "ieee80211n": "1",
        "ieee80211ax": "1",
        "vht_oper_chwidth": "0",
        "vht_oper_centr_freq_seg0_idx": "1",
        "he_oper_chwidth": "0",
        "he_oper_centr_freq_seg0_idx": "1",
    }
    hapd = hostapd.add_ap(apdev[0], ap_config)
    try:
        # Two stations associate in turn. The STA entries are fetched from
        # hostapd, but their contents are not inspected.
        dev[0].connect("test-he-2g", scan_freq="2412", key_mgmt="NONE")
        hwsim_utils.test_connectivity(dev[0], hapd)
        hapd.get_sta(dev[0].own_addr())

        dev[1].connect("test-he-2g", scan_freq="2412", key_mgmt="NONE")
        hapd.get_sta(dev[1].own_addr())
    finally:
        for sta in (dev[0], dev[1]):
            sta.request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        for sta in (dev[0], dev[1]):
            sta.flush_scan_cache()
968 
def test_he80_pwr_constraint(dev, apdev):
    """HE with 80 MHz channel width and local power constraint"""
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "country_code": "FI",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "ieee80211d": "1",
            "local_pwr_constraint": "3",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "1",
            "vht_oper_centr_freq_seg0_idx": "42",
            "he_oper_chwidth": "1",
            "he_oper_centr_freq_seg0_idx": "42",
        }
        hapd = hostapd.add_ap(apdev[0], ap_config)

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[0].wait_regdom(country_ie=True)
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        # Manual cleanup: stop the AP, drop the connection, restore the
        # world regulatory domain, then clear cached scan results.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
1003 
def test_he_use_sta_nsts(dev, apdev):
    """HE with 80 MHz channel width and use_sta_nsts=1"""
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "country_code": "FI",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "1",
            "vht_oper_centr_freq_seg0_idx": "42",
            "he_oper_chwidth": "1",
            "he_oper_centr_freq_seg0_idx": "42",
            "use_sta_nsts": "1",
        }
        hapd = hostapd.add_ap(apdev[0], ap_config)

        # Association plus a data connectivity check is all that is verified.
        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
1033 
def test_he_tkip(dev, apdev):
    """HE and TKIP"""
    skip_without_tkip(dev[0])
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "wpa": "1",
            "wpa_key_mgmt": "WPA-PSK",
            "wpa_pairwise": "TKIP",
            "wpa_passphrase": "12345678",
            "country_code": "FI",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "1",
            "vht_oper_centr_freq_seg0_idx": "42",
            "he_oper_chwidth": "1",
            "he_oper_centr_freq_seg0_idx": "42",
        }
        hapd = hostapd.add_ap(apdev[0], ap_config)

        dev[0].connect("he", psk="12345678", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)
        poll_lines = dev[0].request("SIGNAL_POLL").splitlines()
        if "FREQUENCY=5180" not in poll_lines:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(poll_lines))
        if "WIDTH=20 MHz (no HT)" not in poll_lines:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(poll_lines))

        ap_status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(ap_status))
        # Although HT/VHT/HE were requested in the configuration, the test
        # expects hostapd STATUS to report every one of them as "0" and no
        # secondary channel.
        zero_fields = (
            ("ieee80211n", "Unexpected STATUS ieee80211n value"),
            ("ieee80211ac", "Unexpected STATUS ieee80211ac value"),
            ("ieee80211ax", "Unexpected STATUS ieee80211ax value"),
            ("secondary_channel", "Unexpected STATUS secondary_channel value"),
        )
        for field, failure in zero_fields:
            if ap_status[field] != "0":
                raise Exception(failure)
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
1083 
def test_he_40_fallback_to_20(devs, apdevs):
    """HE and 40 MHz channel configuration falling back to 20 MHz"""
    sta = devs[0]
    ap_iface = apdevs[0]
    hapd = None
    try:
        ap_config = {
            "ssid": "test-he40",
            "country_code": "US",
            "hw_mode": "a",
            "basic_rates": "60 120 240",
            "channel": "161",
            "ieee80211d": "1",
            "ieee80211h": "1",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "ht_capab": "[HT40+][SHORT-GI-20][SHORT-GI-40][DSSS_CCK-40]",
            "vht_capab": "[RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC1][MAX-MPDU-11454][MAX-A-MPDU-LEN-EXP7]",
            "vht_oper_chwidth": "0",
            "vht_oper_centr_freq_seg0_idx": "155",
            "he_oper_chwidth": "0",
            "he_oper_centr_freq_seg0_idx": "155",
        }
        hapd = hostapd.add_ap(ap_iface, ap_config)
        # Success is defined as: association on 5805 MHz, regulatory domain
        # update from the country IE, and working data connectivity.
        sta.connect("test-he40", scan_freq="5805", key_mgmt="NONE")
        sta.wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(sta, hapd)
    finally:
        clear_regdom(hapd, devs)
1112 
def test_he80_to_24g_he(dev, apdev):
    """HE with 80 MHz channel width reconfigured to 2.4 GHz HE"""
    hapd = None
    try:
        ap_config = {
            "ssid": "he",
            "country_code": "FI",
            "hw_mode": "a",
            "channel": "36",
            "ht_capab": "[HT40+]",
            "ieee80211n": "1",
            "ieee80211ac": "1",
            "ieee80211ax": "1",
            "vht_oper_chwidth": "1",
            "vht_capab": "[MAX-MPDU-11454]",
            "vht_oper_centr_freq_seg0_idx": "42",
            "he_oper_chwidth": "1",
            "he_oper_centr_freq_seg0_idx": "42",
        }
        hapd = hostapd.add_ap(apdev[0], ap_config)

        # Take the AP down, rewrite the configuration for 2.4 GHz channel 1
        # (clearing the VHT/HE width settings) and bring it back up. The
        # settings are applied in the listed order.
        reconfiguration = (
            ("ieee80211ac", "0"),
            ("hw_mode", "g"),
            ("channel", "1"),
            ("ht_capab", ""),
            ("vht_capab", ""),
            ("he_oper_chwidth", ""),
            ("he_oper_centr_freq_seg0_idx", ""),
            ("vht_oper_chwidth", ""),
            ("vht_oper_centr_freq_seg0_idx", ""),
        )
        hapd.disable()
        for field, value in reconfiguration:
            hapd.set(field, value)
        hapd.enable()

        dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
1154 
def test_he_twt(dev, apdev):
    """HE and TWT"""
    params = {"ssid": "he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_twt_required":"1"}
    hapd = hostapd.add_ap(apdev[0], params)

    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")

    # Each control interface command must answer with "OK". The exception
    # text names the command that actually failed so a teardown failure is
    # not reported as a setup failure.

    # Setup and teardown with default parameters
    if "OK" not in dev[0].request("TWT_SETUP"):
        raise Exception("TWT_SETUP failed")
    if "OK" not in dev[0].request("TWT_TEARDOWN"):
        raise Exception("TWT_TEARDOWN failed")

    # Setup and teardown with explicit parameters
    if "OK" not in dev[0].request("TWT_SETUP dialog=123 exponent=9 mantissa=10 min_twt=254 setup_cmd=1 twt=1234567890 requestor=1 trigger=0 implicit=0 flow_type=0 flow_id=2 protection=1 twt_channel=3 control=16"):
        raise Exception("TWT_SETUP failed")
    if "OK" not in dev[0].request("TWT_TEARDOWN flags=255"):
        raise Exception("TWT_TEARDOWN failed")
1172 
def test_he_6ghz(dev, apdev):
    """HE with 20 MHz channel width on 6 GHz"""
    check_sae_capab(dev[0])

    # Bind hapd before entering the try block so that the cleanup in the
    # finally clause can always reference it, even when the very first
    # statement inside the try block raises.
    hapd = None
    try:
        dev[0].set("sae_pwe", "1")
        params = {"ssid": "he",
                  "country_code": "DE",
                  "op_class": "131",
                  "channel": "5",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2"}
        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)

        dev[0].set("sae_groups", "")
        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq="5975")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station view: operating frequency and channel width
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        if "FREQUENCY=5975" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=20 MHz" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
        status = dev[0].get_status()
        if 'wifi_generation' not in status:
            # For now, assume this is because of missing kernel support
            raise HwsimSkip("Association Request IE reporting not supported")
        if status['wifi_generation'] != "6":
            raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])

        # AP view: HE enabled with he_oper_chwidth reported as "0"
        status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(status))
        if status["ieee80211ax"] != "1":
            raise Exception("Unexpected STATUS ieee80211ax value")
        if status["he_oper_chwidth"] != "0":
            raise Exception("Unexpected STATUS he_oper_chwidth value")

        sta = hapd.get_sta(dev[0].own_addr())
        logger.info("hostapd STA: " + str(sta))
        if "[HE]" not in sta['flags']:
            raise Exception("Missing STA flag: HE")

    except Exception as e:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        dev[0].set("sae_pwe", "0")
        clear_regdom(hapd, dev)
1231 
def test_he_6ghz_auto_security(dev, apdev):
    """HE on 6 GHz and automatic security settings on STA"""
    check_sae_capab(dev[0])
    hapd = None
    try:
        ap_6ghz = {
            "ssid": "he",
            "country_code": "DE",
            "op_class": "131",
            "channel": "5",
            "ieee80211ax": "1",
            "wpa": "2",
            "ieee80211w": "2",
            "rsn_pairwise": "CCMP",
            "wpa_key_mgmt": "SAE",
            "sae_password": "password",
        }
        hapd = hostapd.add_ap(apdev[0], ap_6ghz, set_channel=False)

        # The network profile asks for "SAE WPA-PSK" with ieee80211w=1; the
        # checks below require the resulting connection to report pmf=2
        # and the AP to see MFPR=1 from the STA.
        dev[0].set("sae_groups", "")
        dev[0].connect("he", psk="password", key_mgmt="SAE WPA-PSK",
                       ieee80211w="1", scan_freq="5975")
        sta_status = dev[0].get_status()
        if "pmf" not in sta_status:
            raise Exception("pmf missing from status")
        if sta_status["pmf"] != "2":
            raise Exception("Unexpected pmf status value: " + sta_status["pmf"])

        hapd.wait_sta()
        sta_entry = hapd.get_sta(dev[0].own_addr())
        if sta_entry["hostapdMFPR"] != "1":
            raise Exception("STA did not indicate MFPR=1")
    except Exception as exc:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(exc) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)

    # Second phase: the same profile reconnects to a WPA2-PSK AP found on
    # 2412 MHz, where no PMF/MFP is expected to be reported.
    wpa2_config = hostapd.wpa2_params(ssid="he", passphrase="password")
    hapd = hostapd.add_ap(apdev[1], wpa2_config)
    wpa2_bssid = apdev[1]['bssid']
    dev[0].scan_for_bss(wpa2_bssid, freq=2412)
    dev[0].request("RECONNECT")
    dev[0].wait_connected()
    sta_status = dev[0].get_status()
    if "pmf" in sta_status:
        raise Exception("Unexpected pmf status value(2): " + sta_status["pmf"])
    hapd.wait_sta()
    sta_entry = hapd.get_sta(dev[0].own_addr())
    if "[MFP]" in sta_entry["flags"]:
        raise Exception("MFP reported unexpectedly(2)")
1285 
def he_6ghz_acs(dev, apdev, op_class, bw):
    """Run an HE AP with ACS on 6 GHz and verify the selected channel.

    op_class is converted with str() and used as the hostapd op_class
    value; channel is set to "0" and wait_acs() is used to wait for the
    AP. bw is the expected channel width in MHz as an integer: it decides
    whether a secondary channel must be reported (bw > 20) or must not be
    (bw == 20), and it is matched against the WIDTH line of the station's
    SIGNAL_POLL output.
    """
    check_sae_capab(dev[0])

    # Bind hapd before entering the try block so that the cleanup in the
    # finally clause can always reference it, even when the very first
    # statement inside the try block raises.
    hapd = None
    try:
        dev[0].set("sae_pwe", "1")
        params = {"ssid": "he",
                  "country_code": "DE",
                  "op_class": str(op_class),
                  "hw_mode": "a",
                  "channel": "0",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2"}
        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
        wait_acs(hapd)

        # The selected frequency must not be below 5955 MHz.
        freq = hapd.get_status_field("freq")
        if int(freq) < 5955:
            raise Exception("Unexpected frequency: " + freq)

        sec = hapd.get_status_field("secondary_channel")
        if bw > 20 and int(sec) == 0:
            raise Exception("Secondary channel not set")
        if bw == 20 and int(sec) != 0:
            raise Exception("Secondary channel set")

        dev[0].set("sae_groups", "")
        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq=freq)
        hwsim_utils.test_connectivity(dev[0], hapd)
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        if "FREQUENCY=" + freq not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=" + str(bw) + " MHz" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
    except Exception as e:
        # Only the generic startup failure is turned into a skip, and only
        # when he_supported() returns a false value.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        dev[0].set("sae_pwe", "0")
        clear_regdom(hapd, dev)
1336 
def test_he_6ghz_acs_20mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 20 MHz channel"""
    # Operating class 131 paired with an expected width of 20 MHz.
    he_6ghz_acs(dev, apdev, op_class=131, bw=20)
1340 
def test_he_6ghz_acs_40mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 40 MHz channel"""
    # Operating class 132 paired with an expected width of 40 MHz.
    he_6ghz_acs(dev, apdev, op_class=132, bw=40)
1344 
def test_he_6ghz_acs_80mhz(dev, apdev):
    """HE with ACS on 6 GHz using an 80 MHz channel"""
    # Operating class 133 paired with an expected width of 80 MHz.
    he_6ghz_acs(dev, apdev, op_class=133, bw=80)
1348 
def test_he_6ghz_acs_160mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 160 MHz channel"""
    # Operating class 134 paired with an expected width of 160 MHz.
    he_6ghz_acs(dev, apdev, op_class=134, bw=160)
1352 
def test_he_6ghz_security(dev, apdev):
    """HE AP and 6 GHz security parameter validation"""
    base_config = {
        "ssid": "he",
        "ieee80211ax": "1",
        "op_class": "131",
        "channel": "1",
    }
    hapd = hostapd.add_ap(apdev[0], base_config, no_enable=True)

    def expect_enable_rejected(failure):
        # ENABLE has to answer with FAIL for the current configuration.
        if "FAIL" not in hapd.request("ENABLE"):
            raise Exception(failure)

    def apply(settings):
        # Apply the (field, value) pairs in the given order.
        for field, value in settings:
            hapd.set(field, value)

    # Pre-RSNA security methods are not allowed in 6 GHz
    expect_enable_rejected("Invalid configuration accepted(1)")

    # Management frame protection is required in 6 GHz
    apply((("wpa", "2"),
           ("wpa_passphrase", "12345678"),
           ("wpa_key_mgmt", "SAE"),
           ("rsn_pairwise", "CCMP"),
           ("ieee80211w", "1")))
    expect_enable_rejected("Invalid configuration accepted(2)")

    # Invalid AKM suite for 6 GHz
    apply((("ieee80211w", "2"),
           ("wpa_key_mgmt", "SAE WPA-PSK")))
    expect_enable_rejected("Invalid configuration accepted(3)")

    # Invalid pairwise cipher suite for 6 GHz
    apply((("wpa_key_mgmt", "SAE"),
           ("rsn_pairwise", "CCMP TKIP")))
    expect_enable_rejected("Invalid configuration accepted(4)")

    # Invalid group cipher suite for 6 GHz
    apply((("wpa_key_mgmt", "SAE"),
           ("rsn_pairwise", "CCMP"),
           ("group_cipher", "TKIP")))
    expect_enable_rejected("Invalid configuration accepted(5)")
1392 
def test_he_prefer_he20(dev, apdev):
    """Preference on HE20 over HT20"""
    # First AP: HT only (ieee80211ax disabled)
    ht_config = {
        "ssid": "he",
        "channel": "1",
        "ieee80211ax": "0",
        "ieee80211n": "1",
    }
    hapd = hostapd.add_ap(apdev[0], ht_config)
    ht_bssid = apdev[0]['bssid']

    # Second AP: HE enabled, same channel
    he_config = {
        "ssid": "test",
        "channel": "1",
        "ieee80211ax": "1",
        "ieee80211n": "1",
    }
    hapd2 = hostapd.add_ap(apdev[1], he_config)
    he_bssid = apdev[1]['bssid']

    for target in (ht_bssid, he_bssid):
        dev[0].scan_for_bss(target, freq=2412)
    dev[0].connect("test", key_mgmt="NONE", scan_freq="2412")
    if dev[0].get_status_field('bssid') != he_bssid:
        raise Exception("Unexpected BSS selected")

    # Compare the estimated throughput stored in each BSS entry with the
    # exact value expected for that AP.
    expectations = (
        (ht_bssid, "65000", "Unexpected BSS0 est_throughput: "),
        (he_bssid, "143402", "Unexpected BSS1 est_throughput: "),
    )
    for target, wanted, prefix in expectations:
        est = dev[0].get_bss(target)['est_throughput']
        if est != wanted:
            raise Exception(prefix + est)
1421 
def test_he_capab_parsing(dev, apdev):
    """HE AP and capability parsing"""
    # Inject Authentication/Association Request/Deauthentication frames with
    # different HE capability elements and check from the hostapd STA entry
    # whether the station got the [HE] flag.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "he",
                                     "ieee80211ax": "1",
                                     "he_bss_color": "42",
                                     "he_mu_edca_ac_be_ecwmin": "7",
                                     "he_mu_edca_ac_be_ecwmax": "15"})

    hapd.set("ext_mgmt_frame_handling", "1")
    ap_hex = hapd.own_addr().replace(':', '')
    sta_hex = "020304050607"
    sta_addr = "02:03:04:05:06:07"

    mac_capa = binascii.unhexlify("0178c81a4000")
    mcs_nss = binascii.unhexlify("faff")

    def he_capab_elem(phy_capa_hex, mcs_count, trailer=b''):
        # Build one element: three-octet header (255, length, 35) followed by
        # MAC capabilities, PHY capabilities, mcs_count copies of the MCS/NSS
        # map and optional trailing octets (PPE Thresholds and/or extra data).
        body = mac_capa + binascii.unhexlify(phy_capa_hex)
        body += mcs_count*mcs_nss + trailer
        return struct.pack('BBB', 255, 1 + len(body), 35) + body

    phy_plain = "00bfce0000000000000000"
    # Same PHY capabilities, but with the bit that makes the parser expect a
    # PPE Thresholds field (see the "Missing PPE Threshold field" case).
    phy_ppet = "00bfce0000008000000000"

    # (element, True if the STA is expected to end up with the [HE] flag)
    cases = [
        # First PHY capability octet 0x00/0x08/0x10/0x18 with 2/4/4/6 copies
        # of the MCS/NSS map
        (he_capab_elem(phy_plain, 2), True),
        (he_capab_elem("08bfce0000000000000000", 4), True),
        (he_capab_elem("10bfce0000000000000000", 4), True),
        (he_capab_elem("18bfce0000000000000000", 6), True),
        # Missing PPE Threshold field
        (he_capab_elem(phy_ppet, 2), False),
        # Truncated PPE Threshold field
        (he_capab_elem(phy_ppet, 2, struct.pack('B', 0x79)), False),
        # Extra field at the end (without PPE Threshold field)
        (he_capab_elem(phy_plain, 2, struct.pack('B', 0)), True),
        # Extra field at the end (with PPE Threshold field)
        (he_capab_elem(phy_ppet, 2,
                       binascii.unhexlify("79000000000000") +
                       struct.pack('B', 0)), True),
    ]

    ppet_fields = [
        # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x0
        # --> 3 + 4 + 0 * 6 * 1 = 7 bits --> 1 octet
        "00",
        # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x1
        # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
        "08" + "00",
        # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x8
        # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
        "40" + "00",
        # NSTS=2 (i.e., NSTS field value 1), RU Index Bitmask=0xf
        # --> 3 + 4 + 4 * 6 * 2 = 55 bits --> 7 octets
        "79" + 6*"00",
        # NSTS=3 (i.e., NSTS field value 2), RU Index Bitmask=0xf
        # --> 3 + 4 + 4 * 6 * 3 = 79 bits --> 10 octets
        "7a" + 9*"00",
        # NSTS=4 (i.e., NSTS field value 3), RU Index Bitmask=0x5
        # --> 3 + 4 + 2 * 6 * 4 = 55 bits --> 7 octets
        "2b" + 6*"00",
        # NSTS=8 (i.e., NSTS field value 7), RU Index Bitmask=0xf
        # --> 3 + 4 + 4 * 6 * 8 = 199 bits --> 25 octets
        "ff" + 24*"00",
    ]
    cases += [(he_capab_elem(phy_ppet, 2, binascii.unhexlify(field)), True)
              for field in ppet_fields]

    rx_cmd = "MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s"

    def rx_mgmt(frame_hex):
        # Feed one hex-encoded management frame to hostapd.
        if "OK" not in hapd.request(rx_cmd % frame_hex):
            raise Exception("MGMT_RX_PROCESS failed")

    # The frames differ only in the HE capability element, so build the
    # constant parts once.
    addrs = ap_hex + sta_hex + ap_hex
    auth = "b0003a01" + addrs + '1000000001000000'
    assoc_start = "00003a01" + addrs + "2000" + "21040500"
    deauth = "c0003a01" + addrs + "3000" + "0300"

    ies_head = "".join([
        "00026865", # SSID
        "010802040b160c121824", # Supp Rates
        "32043048606c", # Ext Supp Rates
        "2d1afe131bffff000000000000000000000100000000000000000000", # HT Capab
        "7f0b04004a0201404040000120", # Ext Capab
    ])
    ies_tail = "".join([
        "3b155151525354737475767778797a7b7c7d7e7f808182", # Supp Op Classes
        "dd070050f202000100", # WMM
    ])

    for elem, expect_he in cases:
        rx_mgmt(auth)

        he_capab = binascii.hexlify(elem).decode()
        rx_mgmt(assoc_start + ies_head + he_capab + ies_tail)

        sta = hapd.get_sta(sta_addr)
        has_he = "[HE]" in sta['flags']
        if expect_he and not has_he:
            raise Exception("Missing STA flag: HE (HE Capab: %s)" % he_capab)
        if has_he and not expect_he:
            raise Exception("Unexpected STA flag: HE (HE Capab: %s)" % he_capab)

        # Remove the STA entry before the next round.
        rx_mgmt(deauth)

        hapd.dump_monitor()
1547 
def test_he_cw_change_notification(dev, apdev):
    """HE AP on 80 MHz channel and CW change notification"""
    # Three stations associate with different capabilities (HE, VHT-only,
    # HT-only); NOTIFY_CW_CHANGE is then issued for values 2, 1 and 0 and data
    # connectivity is verified after each change.
    # Raises HwsimSkip if AP startup fails and he_supported() returns False.
    hapd = None
    try:
        params = {"ssid": "he",
                  "country_code": "FI",
                  "hw_mode": "a",
                  "channel": "36",
                  "ht_capab": "[HT40+]",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "vht_oper_chwidth": "1",
                  "vht_oper_centr_freq_seg0_idx": "42",
                  "he_oper_chwidth": "1",
                  "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev[0], params)

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[1].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_he="1")
        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_he="1", disable_vht="1")

        # Per station: (flag name, whether hostapd is expected to report it)
        expected_flags = [
            [("HT", True), ("VHT", True), ("HE", True)],
            [("HT", True), ("VHT", True), ("HE", False)],
            [("HT", True), ("VHT", False), ("HE", False)],
        ]
        for idx, expectations in enumerate(expected_flags):
            sta = hapd.get_sta(dev[idx].own_addr())
            # The label uses the real station index (the third station used to
            # be logged as "STA1").
            logger.info("hostapd STA%d: %s", idx, sta)
            for name, expected in expectations:
                present = ("[%s]" % name) in sta['flags']
                if expected and not present:
                    raise Exception("Missing STA%d flag: %s" % (idx, name))
                if present and not expected:
                    raise Exception("Unexpected STA%d flag: %s" % (idx, name))

        for i in [2, 1, 0]:
            if "OK" not in hapd.request("NOTIFY_CW_CHANGE %d" % i):
                raise Exception("NOTIFY_CW_CHANGE %d failed" % i)

            # Give the stations time to process the notification before
            # testing data connectivity.
            time.sleep(1)
            for sta_dev in dev[0:3]:
                hwsim_utils.test_connectivity(sta_dev, hapd)
    except Exception as e:
        # Convert an AP startup failure into a skip when HE is not supported;
        # everything else is re-raised unchanged.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        dev[1].request("DISCONNECT")
        dev[2].request("DISCONNECT")
        clear_regdom(hapd, dev)
1618 
def he_verify_status(wpas, hapd, freq, bw, is_6ghz=True):
    """Verify HE association state on both the AP and the station.

    wpas: station object (own_addr() and request() are used)
    hapd: AP object (get_status() and get_sta() are used)
    freq: frequency expected in the station's SIGNAL_POLL output
    bw: channel width expected in the station's SIGNAL_POLL output
    is_6ghz: also require the [6GHZ] flag in the hostapd STA entry

    Raises Exception on the first mismatch.
    """
    ap_status = hapd.get_status()
    logger.info("hostapd STATUS: %s", ap_status)

    # HT, VHT and HE must all be reported as enabled by the AP.
    for field in ("ieee80211n", "ieee80211ac", "ieee80211ax"):
        if ap_status[field] != "1":
            raise Exception("Unexpected STATUS %s value" % field)

    flags = hapd.get_sta(wpas.own_addr())['flags']
    if "[HE]" not in flags:
        raise Exception("Missing STA flag: HE")
    if is_6ghz and "[6GHZ]" not in flags:
        raise Exception("Missing STA flag: 6GHZ")

    # SIGNAL_POLL output is compared line by line (exact match of a line).
    poll_lines = wpas.request("SIGNAL_POLL").splitlines()
    wanted_freq = "FREQUENCY=%s" % freq
    wanted_width = "WIDTH=%s MHz" % bw
    if wanted_freq not in poll_lines:
        raise Exception("Unexpected SIGNAL_POLL value(1): " + str(poll_lines))
    if wanted_width not in poll_lines:
        raise Exception("Unexpected SIGNAL_POLL value(2): " + str(poll_lines))
1641 
def he_verify_wifi_version(dev):
    """Verify that the station reports wifi_generation=6 in its status.

    Raises HwsimSkip if the status has no wifi_generation entry and Exception
    if the reported value is something other than "6".
    """
    sta_status = dev.get_status()
    logger.info("station status: %s", sta_status)

    if 'wifi_generation' in sta_status:
        generation = sta_status['wifi_generation']
        if generation != "6":
            raise Exception("Unexpected wifi_generation value: " + generation)
        return

    # For now, assume this is because of missing kernel support and skip the
    # test case instead of reporting a failure.
    raise HwsimSkip("Association Request IE reporting not supported")
1653 
def test_he_6ghz_reg(dev, apdev):
    """TX power control on 6 GHz"""
    # Start an SAE-protected HE AP with 6 GHz parameters, associate a station
    # and then update the Beacon frame contents with different regulatory
    # power settings.
    # Raises HwsimSkip if AP startup fails and he_supported() returns False.
    check_sae_capab(dev[0])
    hapd = None
    try:
        ssid = "HE_6GHz_regulatory"
        freq = 5975
        bw = "20"
        params = {"ssid": ssid,
                  "country_code": "DE",
                  "hw_mode": "a",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2",
                  "channel": "5",
                  "op_class": "131",
                  "he_oper_centr_freq_seg0_idx": "5",
                  "ieee80211d": "1",
                  "ieee80211h": "1",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "local_pwr_constraint": "4",
                  # Set the 6 GHz regulatory power configuration
                  "he_6ghz_reg_pwr_type": "0",
                  # Note: hostapd uses "Maximum Transmit Power Interpretation"
                  # set to "Regulatory client EIRP PSD", so the values should
                  # be set accordingly.
                  "reg_def_cli_eirp_psd": "3",
                  "reg_sub_cli_eirp_psd": "2"}

        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)

        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        dev[0].connect(ssid, sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq=str(freq))
        hapd.wait_sta()

        he_verify_status(dev[0], hapd, freq, bw)
        he_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Configure different values related to power constraints and update
        # the Beacon frame contents.
        hapd.set("local_pwr_constraint", "2")
        hapd.set("he_6ghz_reg_pwr_type", "2")
        hapd.set("reg_def_cli_eirp_psd", "2")
        hapd.set("reg_sub_cli_eirp_psd", "1")

        # In addition, inject a Transmit Power Envelope as a vendor element
        hapd.set("vendor_elements", "c303190202")

        if "OK" not in hapd.request("UPDATE_BEACON"):
            raise Exception("UPDATE_BEACON failed")

        # Allow few more Beacon frames
        time.sleep(0.5)

        # Modify the regulatory power type to SP and provide the client EIRP
        # limit
        # EIRP = PSD + 10 * log(channel width)
        # 16 = 3 + 10 * log(20)
        hapd.set("vendor_elements", "")
        hapd.set("he_6ghz_reg_pwr_type", "1")
        # Use the value derived above (previously "14", which did not match
        # the calculation).
        hapd.set("reg_def_cli_eirp", "16")

        if "OK" not in hapd.request("UPDATE_BEACON"):
            raise Exception("UPDATE_BEACON failed")

        # Allow few more Beacon frames
        time.sleep(0.5)
    except Exception as e:
        # Convert an AP startup failure into a skip when HE is not supported;
        # everything else is re-raised unchanged.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        try:
            dev[0].request("DISCONNECT")
            dev[0].set("sae_pwe", "0")
            dev[0].wait_disconnected()
        finally:
            # Restore the regulatory domain even if the station cleanup above
            # raises (presumably possible when the station never got
            # connected), so that the country code does not leak into
            # following test cases.
            clear_regdom(hapd, dev)
1739 
def test_he_downgrade_40mhz_to_20mhz(dev, apdev):
    """HE AP and downgrade from 40 MHz to 20 MHz due to regulatory constraints"""
    # Request a 40 MHz channel (HT40+) although the regdb limits this
    # frequency to 20 MHz; the shared helper checks that the station ends up
    # reporting a 20 MHz channel width.
    run_he_downgrade_to_20_mhz(dev, apdev,
                               dict(ssid="he",
                                    country_code="AM",
                                    channel="36",
                                    op_class="116",
                                    ieee80211n="1",
                                    ieee80211ac="1",
                                    ieee80211ax="1",
                                    hw_mode="a",
                                    ht_capab="[HT40+]",
                                    vht_oper_chwidth="0",
                                    he_oper_chwidth="0"))
1756 
def test_he_downgrade_40mhz_plus_minus_to_20mhz(dev, apdev):
    """HE AP and downgrade from 40 MHz (+/-) to 20 MHz due to regulatory constraints"""
    # Request a 40 MHz channel with both HT40+ and HT40- allowed although the
    # regdb limits this frequency to 20 MHz; the shared helper checks that the
    # station ends up reporting a 20 MHz channel width.
    run_he_downgrade_to_20_mhz(dev, apdev,
                               dict(ssid="he",
                                    country_code="AM",
                                    channel="36",
                                    op_class="116",
                                    ieee80211n="1",
                                    ieee80211ac="1",
                                    ieee80211ax="1",
                                    hw_mode="a",
                                    ht_capab="[HT40+][HT40-]",
                                    vht_oper_chwidth="0",
                                    he_oper_chwidth="0"))
1773 
def test_he_downgrade_80mhz_to_20mhz(dev, apdev):
    """HE AP and downgrade from 80 MHz to 20 MHz due to regulatory constraints"""
    # Request an 80 MHz channel (center frequency index 42) although the regdb
    # limits this frequency to 20 MHz; the shared helper checks that the
    # station ends up reporting a 20 MHz channel width.
    run_he_downgrade_to_20_mhz(dev, apdev,
                               dict(ssid="he",
                                    country_code="AM",
                                    channel="36",
                                    op_class="128",
                                    ieee80211n="1",
                                    ieee80211ac="1",
                                    ieee80211ax="1",
                                    hw_mode="a",
                                    ht_capab="[HT40+]",
                                    vht_oper_centr_freq_seg0_idx="42",
                                    he_oper_centr_freq_seg0_idx="42",
                                    vht_oper_chwidth="1",
                                    he_oper_chwidth="1"))
1792 
def run_he_downgrade_to_20_mhz(dev, apdev, params):
    """Start an AP with params and verify the station reports 20 MHz width.

    dev: list of station objects; dev[0] is used for the association
    apdev: list of AP device descriptions; apdev[0] is used for the AP
    params: hostapd configuration parameters for the AP

    Raises Exception if "WIDTH=20 MHz" is not a line in the SIGNAL_POLL
    output. The regulatory domain is cleared on exit in all cases.
    """
    sta = dev[0]
    hapd = None
    try:
        hapd = hostapd.add_ap(apdev[0], params)
        sta.connect("he", key_mgmt="NONE", scan_freq="5180")

        poll_lines = sta.request("SIGNAL_POLL").splitlines()
        logger.info("SIGNAL_POLL: %s", poll_lines)
        if "WIDTH=20 MHz" not in poll_lines:
            raise Exception("20 MHz channel width not reported")

        # Clean disconnection, confirmed on both the station and the AP.
        sta.request("DISCONNECT")
        sta.wait_disconnected()
        hapd.wait_sta_disconnect()
    finally:
        # Also covers the failure paths where the station may still be
        # connected (or trying to connect).
        sta.request("DISCONNECT")
        clear_regdom(hapd, dev)
1808 
def test_he_bss_color_change(dev, apdev):
    """HE AP with color change"""
    # Start an HE AP with BSS color 42, then use COLOR_CHANGE to switch to
    # color 20, disable the color (value 0) and enable color 20 again. The
    # he_bss_color STATUS field is checked after each step.
    params = {"ssid": "test_he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_mu_edca_ac_be_ecwmin": "7",
              "he_mu_edca_ac_be_ecwmax": "15"}
    hapd = hostapd.add_ap(apdev[0], params)
    if hapd.get_status_field("ieee80211ax") != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    color = hapd.get_status_field("he_bss_color")
    if color != "42":
        # The field can be missing (None; see the disabled-color check below),
        # so convert explicitly instead of failing with a TypeError here.
        raise Exception("Expected current he_bss_color to be 42; was " +
                        str(color))

    # Small sleep to capture Beacon frames before the change
    time.sleep(0.5)

    def change_color(value):
        # Request a color change, wait for it to take effect and return the
        # he_bss_color STATUS field (None if not present).
        if "OK" not in hapd.request("COLOR_CHANGE %d" % value):
            raise Exception("COLOR_CHANGE failed")
        time.sleep(1.5)
        return hapd.get_status_field("he_bss_color")

    # Change color by doing CCA
    if change_color(20) != "20":
        raise Exception("Expected current he_bss_color to be 20")

    # Disable color by setting value to 0
    color = change_color(0)
    if color is not None:
        raise Exception("Expected he_bss_color to get disabled but found " + color)

    # Enable color back by setting same previous color value
    if change_color(20) != "20":
        raise Exception("Expected current he_bss_color to be 20")

    hapd.dump_monitor()
1855