1# HE tests
2# Copyright (c) 2019, The Linux Foundation
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8logger = logging.getLogger()
9import os
10import subprocess, time
11
12import hwsim_utils
13import hostapd
14from wpasupplicant import WpaSupplicant
15from utils import *
16from test_dfs import wait_dfs_event
17from test_ap_acs import wait_acs
18
def test_he_open(dev, apdev):
    """HE AP with open mode configuration"""
    # Open network with ieee80211ax enabled plus a BSS color and AC_BE
    # MU EDCA contention window settings.
    ap_conf = dict(ssid="he",
                   ieee80211ax="1",
                   he_bss_color="42",
                   he_mu_edca_ac_be_ecwmin="7",
                   he_mu_edca_ac_be_ecwmax="15")
    hapd = hostapd.add_ap(apdev[0], ap_conf)

    # hostapd STATUS has to report HE as enabled.
    ax_enabled = hapd.get_status_field("ieee80211ax")
    if ax_enabled != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    station = dev[0]
    station.connect("he", key_mgmt="NONE", scan_freq="2412")

    # The AP-side STA entry has to carry the HE flag after association.
    sta_entry = hapd.get_sta(station.own_addr())
    if "[HE]" not in sta_entry['flags']:
        raise Exception("Missing STA flag: HE")
33
def test_he_disabled_on_sta(dev, apdev):
    """HE AP and HE disabled on STA"""
    ap_conf = dict(ssid="he",
                   ieee80211ax="1",
                   he_bss_color="42",
                   he_mu_edca_ac_be_ecwmin="7",
                   he_mu_edca_ac_be_ecwmax="15")
    hapd = hostapd.add_ap(apdev[0], ap_conf)

    # The station connects with disable_he=1, so the AP-side STA entry
    # must not carry the HE flag.
    station = dev[0]
    station.connect("he", key_mgmt="NONE", scan_freq="2412", disable_he="1")
    sta_entry = hapd.get_sta(station.own_addr())
    if "[HE]" in sta_entry['flags']:
        raise Exception("Unexpected STA flag: HE")
46
def test_he_params(dev, apdev):
    """HE AP parameters"""
    # NOTE: the original dict literal listed he_mu_edca_ac_be_ecwmin and
    # he_mu_edca_ac_be_ecwmax twice ("7"/"15" first, "15"/"15" later). In a
    # dict literal the last value wins while the key keeps its first
    # position, so the effective configuration was "15"/"15" at this spot.
    # The duplicates are removed and that effective result is spelled out.
    params = {"ssid": "he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_mu_edca_ac_be_ecwmin": "15",
              "he_mu_edca_ac_be_ecwmax": "15",
              "he_su_beamformer": "0",
              "he_su_beamformee": "0",
              "he_default_pe_duration": "4",
              "he_twt_required": "1",
              "he_rts_threshold": "64",
              "he_basic_mcs_nss_set": "65535",
              "he_mu_edca_qos_info_param_count": "0",
              "he_mu_edca_qos_info_q_ack": "0",
              "he_mu_edca_qos_info_queue_request": "1",
              "he_mu_edca_qos_info_txop_request": "0",
              "he_mu_edca_ac_be_aifsn": "0",
              "he_mu_edca_ac_be_timer": "255",
              "he_mu_edca_ac_bk_aifsn": "0",
              "he_mu_edca_ac_bk_aci": "1",
              "he_mu_edca_ac_bk_ecwmin": "15",
              "he_mu_edca_ac_bk_ecwmax": "15",
              "he_mu_edca_ac_bk_timer": "255",
              "he_mu_edca_ac_vi_ecwmin": "15",
              "he_mu_edca_ac_vi_ecwmax": "15",
              "he_mu_edca_ac_vi_aifsn": "0",
              "he_mu_edca_ac_vi_aci": "2",
              "he_mu_edca_ac_vi_timer": "255",
              "he_mu_edca_ac_vo_aifsn": "0",
              "he_mu_edca_ac_vo_aci": "3",
              "he_mu_edca_ac_vo_ecwmin": "15",
              "he_mu_edca_ac_vo_ecwmax": "15",
              "he_mu_edca_ac_vo_timer": "255",
              "he_spr_sr_control": "0",
              "he_spr_non_srg_obss_pd_max_offset": "0",
              "he_spr_srg_obss_pd_min_offset": "0",
              "he_spr_srg_obss_pd_max_offset": "0",
              "he_spr_srg_bss_colors": "1 2 10 63",
              "he_spr_srg_partial_bssid": "0 1 3 63",
              "he_6ghz_max_ampdu_len_exp": "7",
              "he_6ghz_rx_ant_pat": "1",
              "he_6ghz_tx_ant_pat": "1",
              "he_6ghz_max_mpdu": "2",
              "he_oper_chwidth": "0",
              "he_oper_centr_freq_seg0_idx": "1",
              "he_oper_centr_freq_seg1_idx": "0"}
    hapd = hostapd.add_ap(apdev[0], params)
    # The AP has to come up with HE enabled and accept an open association.
    if hapd.get_status_field("ieee80211ax") != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")
    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
100
def test_he_spr_params(dev, apdev):
    """HE AP spatial reuse parameters"""
    # Non-default he_spr_* values combined with the operating channel width
    # and center frequency segment indexes.
    ap_conf = dict(ssid="he",
                   ieee80211ax="1",
                   he_spr_sr_control="12",
                   he_spr_non_srg_obss_pd_max_offset="1",
                   he_spr_srg_obss_pd_min_offset="2",
                   he_spr_srg_obss_pd_max_offset="3",
                   he_spr_srg_bss_colors="1 2 10 63",
                   he_spr_srg_partial_bssid="0 1 3 63",
                   he_oper_chwidth="0",
                   he_oper_centr_freq_seg0_idx="1",
                   he_oper_centr_freq_seg1_idx="0")
    hapd = hostapd.add_ap(apdev[0], ap_conf)

    ax_enabled = hapd.get_status_field("ieee80211ax")
    if ax_enabled != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
118
def he_supported():
    """Report whether "iw reg get" output mentions an 80 or 160 MHz width.

    Runs "iw reg get", decodes its standard output and returns True if the
    text contains "@ 80)" or "@ 160)"; otherwise returns False.
    """
    proc = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
    stdout_data = proc.communicate()[0]
    reg_text = stdout_data.decode()
    return any(marker in reg_text for marker in ("@ 80)", "@ 160)"))
126
def test_he80(dev, apdev):
    """HE with 80 MHz channel width"""
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "1",
                   "vht_capab": "[MAX-MPDU-11454]",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "he_oper_chwidth": "1",
                   "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev[0], ap_conf)
        bssid = apdev[0]['bssid']

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Station side: SIGNAL_POLL lines, throughput estimate, STATUS.
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5180", "WIDTH=80 MHz"), 1):
            if wanted not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx +
                                str(sig))
        est = dev[0].get_bss(bssid)['est_throughput']
        if est != "600502":
            raise Exception("Unexpected BSS est_throughput: " + est)
        sta_status = dev[0].get_status()
        if sta_status["ieee80211ac"] != "1":
            raise Exception("Unexpected STATUS ieee80211ac value (STA)")

        # AP side: STATUS fields, checked in the listed order.
        ap_status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(ap_status))
        expected_before_caps = (("ieee80211n", "1"),
                                ("ieee80211ac", "1"),
                                ("ieee80211ax", "1"),
                                ("secondary_channel", "1"),
                                ("vht_oper_chwidth", "1"),
                                ("vht_oper_centr_freq_seg0_idx", "42"))
        for field, value in expected_before_caps:
            if ap_status[field] != value:
                raise Exception("Unexpected STATUS " + field + " value")
        if "vht_caps_info" not in ap_status:
            raise Exception("Missing vht_caps_info")
        expected_after_caps = (("he_oper_chwidth", "1"),
                               ("he_oper_centr_freq_seg0_idx", "42"))
        for field, value in expected_after_caps:
            if ap_status[field] != value:
                raise Exception("Unexpected STATUS " + field + " value")

        # AP-side STA entry has to show HT, VHT and HE flags.
        sta_entry = hapd.get_sta(dev[0].own_addr())
        logger.info("hostapd STA: " + str(sta_entry))
        for flag in ("HT", "VHT", "HE"):
            if "[" + flag + "]" not in sta_entry['flags']:
                raise Exception("Missing STA flag: " + flag)

    except Exception as e:
        # An AP startup failure becomes a skip when the regulatory
        # information lacks 80/160 MHz rules; anything else is re-raised.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
198
def _test_he_wifi_generation(dev, apdev, conf, scan_freq):
    """Verify that stations report wifi_generation=6 on an HE AP.

    conf is merged over the base AP parameters and scan_freq is used for
    both connections: dev[0], and a wlan5 interface added with
    force_connect_cmd=1.
    """
    def check_generation(sta_status, tag):
        # For now, a missing field is assumed to mean missing kernel
        # support, so the test is skipped instead of failed.
        if 'wifi_generation' not in sta_status:
            raise HwsimSkip("Association Request IE reporting not supported")
        generation = sta_status['wifi_generation']
        if generation != "6":
            raise Exception("Unexpected wifi_generation value" + tag + ": " +
                            generation)

    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "ieee80211n": "1",
                   "ieee80211ax": "1"}
        ap_conf.update(conf)
        hapd = hostapd.add_ap(apdev[0], ap_conf)
        bssid = apdev[0]['bssid']

        dev[0].connect("he", key_mgmt="NONE", scan_freq=scan_freq)
        check_generation(dev[0].get_status(), "")

        extra = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        extra.interface_add("wlan5", drv_params="force_connect_cmd=1")
        extra.connect("he", key_mgmt="NONE", scan_freq=scan_freq)
        check_generation(extra.get_status(), " (connect)")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
237
def test_he_wifi_generation(dev, apdev):
    """HE and wifi_generation (5 GHz)"""
    # Channel 36 with HT40+ and matching VHT/HE 80 MHz operation settings.
    overrides = dict(vht_oper_chwidth="1",
                     hw_mode="a",
                     channel="36",
                     ht_capab="[HT40+]",
                     vht_oper_centr_freq_seg0_idx="42",
                     he_oper_chwidth="1",
                     he_oper_centr_freq_seg0_idx="42",
                     vht_capab="[MAX-MPDU-11454]",
                     ieee80211ac="1")
    _test_he_wifi_generation(dev, apdev, overrides, scan_freq="5180")
252
def test_he_wifi_generation_24(dev, apdev):
    """HE and wifi_generation (2.4 GHz)"""
    # Channel 1 in hw_mode g, scanned at 2412 MHz.
    overrides = dict(hw_mode="g", channel="1")
    _test_he_wifi_generation(dev, apdev, overrides, scan_freq="2412")
260
def he80_test(apdev, dev, channel, ht_capab):
    """Run an 80 MHz HE connectivity test on the given primary channel.

    apdev is a single AP device entry, dev the station list, channel an
    integer channel number and ht_capab the HT capability string to use.
    The center frequency segment 0 index is fixed at 42.
    """
    clear_scan_cache(apdev)
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "hw_mode": "a",
                   "channel": str(channel),
                   "ht_capab": ht_capab,
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "1",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "he_oper_chwidth": "1",
                   "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev, ap_conf)
        bssid = apdev['bssid']

        # Scan frequency in MHz is derived from the channel number.
        freq = str(5000 + 5 * channel)
        dev[0].connect("he", key_mgmt="NONE", scan_freq=freq)
        hwsim_utils.test_connectivity(dev[0], hapd)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
290
def test_he80b(dev, apdev):
    """HE with 80 MHz channel width (HT40- channel 40)"""
    # Primary channel 40 with the secondary channel below it.
    he80_test(apdev[0], dev, channel=40, ht_capab="[HT40-]")
294
def test_he80c(dev, apdev):
    """HE with 80 MHz channel width (HT40+ channel 44)"""
    # Primary channel 44 with the secondary channel above it.
    he80_test(apdev[0], dev, channel=44, ht_capab="[HT40+]")
298
def test_he80d(dev, apdev):
    """HE with 80 MHz channel width (HT40- channel 48)"""
    # Primary channel 48 with the secondary channel below it.
    he80_test(apdev[0], dev, channel=48, ht_capab="[HT40-]")
302
def test_he80_params(dev, apdev):
    """HE with 80 MHz channel width and number of optional features enabled"""
    # Bits of vht_caps_info tested for short GI support.
    sgi_mask = 0x60
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+][SHORT-GI-40][DSS_CCK-40]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "1",
                   "vht_capab": "[MAX-MPDU-11454][RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC-1][MAX-A-MPDU-LEN-EXP0]",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "require_vht": "1",
                   "require_he": "1",
                   "he_oper_chwidth": "1",
                   "he_oper_centr_freq_seg0_idx": "42",
                   "he_su_beamformer": "1",
                   "he_mu_beamformer": "1",
                   "he_bss_color": "1",
                   "he_default_pe_duration": "1",
                   "he_twt_required": "1",
                   "he_rts_threshold": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_conf)

        extra = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        extra.interface_add(
            "wlan5", drv_params="extra_bss_membership_selectors=126,122")

        # First attempt: VHT disabled on the extra interface. The AP sets
        # require_vht=1 and status code 104 is expected in the rejection.
        extra.connect("he", key_mgmt="NONE", scan_freq="5180",
                      disable_vht="1", wait_connect=False)
        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_sgi="1")
        reject = extra.wait_event(["CTRL-EVENT-ASSOC-REJECT"])
        if reject is None:
            raise Exception("Association rejection timed out")
        if "status_code=104" not in reject:
            raise Exception("Unexpected rejection status code")
        for command in ("DISCONNECT", "REMOVE_NETWORK all"):
            extra.request(command)
        extra.dump_monitor()

        # Second attempt: HE disabled. The AP sets require_he=1 and status
        # code 124 is expected in the rejection.
        extra.connect("he", key_mgmt="NONE", scan_freq="5180",
                      disable_he="1", wait_connect=False)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # dev[0] must advertise SGI; dev[2] (disable_sgi=1) must not.
        sta_entries = [hapd.get_sta(d.own_addr()) for d in (dev[0], dev[2])]
        capab0, capab2 = [int(entry['vht_caps_info'], base=16)
                          for entry in sta_entries]
        if not (capab0 & sgi_mask):
            raise Exception("dev[0] did not support SGI")
        if capab2 & sgi_mask:
            raise Exception("dev[2] claimed support for SGI")

        reject = extra.wait_event(["CTRL-EVENT-ASSOC-REJECT"])
        if reject is None:
            raise Exception("Association rejection timed out (2)")
        if "status_code=124" not in reject:
            raise Exception("Unexpected rejection status code (2): " + reject)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev, count=3)
370
def test_he80_invalid(dev, apdev):
    """HE with invalid 80 MHz channel configuration (seg1)"""
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "US",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "1",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "vht_oper_centr_freq_seg1_idx": "159",
                   "he_oper_chwidth": "1",
                   "he_oper_centr_freq_seg0_idx": "42",
                   "he_oper_centr_freq_seg1_idx": "155",
                   "ieee80211d": "1",
                   "ieee80211h": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_conf, wait_enabled=False)
        # This fails due to unexpected seg1 configuration
        disabled = hapd.wait_event(["AP-DISABLED"], timeout=5)
        if disabled is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
403
def test_he80_invalid2(dev, apdev):
    """HE with invalid 80 MHz channel configuration (seg0)"""
    hapd = None
    try:
        # The HE seg0 index (46) differs from the VHT seg0 index (42).
        ap_conf = {"ssid": "he",
                   "country_code": "US",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "1",
                   "vht_oper_centr_freq_seg0_idx": "42",
                   "he_oper_chwidth": "1",
                   "he_oper_centr_freq_seg0_idx": "46",
                   "ieee80211d": "1",
                   "ieee80211h": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_conf, wait_enabled=False)
        # This fails due to invalid seg0 configuration
        disabled = hapd.wait_event(["AP-DISABLED"], timeout=5)
        if disabled is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
434
def test_he_20(devs, apdevs):
    """HE and 20 MHz channel"""
    station = devs[0]
    ap_entry = apdevs[0]
    hapd = None
    try:
        # Empty HT/VHT capability strings and zeroed width/segment values.
        ap_conf = dict(ssid="test-he20",
                       country_code="DE",
                       hw_mode="a",
                       channel="36",
                       ieee80211n="1",
                       ieee80211ac="1",
                       ieee80211ax="1",
                       ht_capab="",
                       vht_capab="",
                       vht_oper_chwidth="0",
                       vht_oper_centr_freq_seg0_idx="0",
                       supported_rates="60 120 240 360 480 540",
                       require_vht="1",
                       he_oper_chwidth="0",
                       he_oper_centr_freq_seg0_idx="0")
        hapd = hostapd.add_ap(ap_entry, ap_conf)
        station.connect("test-he20", scan_freq="5180", key_mgmt="NONE")
        hwsim_utils.test_connectivity(station, hapd)
    finally:
        station.request("DISCONNECT")
        clear_regdom(hapd, devs)
462
def test_he_40(devs, apdevs):
    """HE and 40 MHz channel"""
    station = devs[0]
    ap_entry = apdevs[0]
    hapd = None
    try:
        # HT40+ on channel 36 with segment 0 index 38 and both HE
        # beamformer options enabled.
        ap_conf = dict(ssid="test-he40",
                       country_code="DE",
                       hw_mode="a",
                       channel="36",
                       ieee80211n="1",
                       ieee80211ac="1",
                       ieee80211ax="1",
                       ht_capab="[HT40+]",
                       vht_capab="",
                       vht_oper_chwidth="0",
                       vht_oper_centr_freq_seg0_idx="38",
                       he_oper_chwidth="0",
                       he_oper_centr_freq_seg0_idx="38",
                       he_su_beamformer="1",
                       he_mu_beamformer="1")
        hapd = hostapd.add_ap(ap_entry, ap_conf)
        station.connect("test-he40", scan_freq="5180", key_mgmt="NONE")
        hwsim_utils.test_connectivity(station, hapd)
    finally:
        station.request("DISCONNECT")
        clear_regdom(hapd, devs)
490
@long_duration_test
def test_he160(dev, apdev):
    """HE with 160 MHz channel width (1)"""
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "hw_mode": "a",
                   "channel": "36",
                   "ht_capab": "[HT40+]",
                   "vht_capab": "[VHT160]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "2",
                   "vht_oper_centr_freq_seg0_idx": "50",
                   "he_oper_chwidth": "2",
                   "he_oper_centr_freq_seg0_idx": "50",
                   "ieee80211d": "1",
                   "ieee80211h": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_conf, wait_enabled=False)
        bssid = apdev[0]['bssid']

        event = wait_dfs_event(hapd, "DFS-CAC-START", 5)
        if "DFS-CAC-START" not in event:
            raise Exception("Unexpected DFS event")

        iface_state = hapd.get_status_field("state")
        if iface_state != "DFS":
            # Not all systems have recent enough CRDA version and
            # wireless-regdb changes to support 160 MHz and DFS. For now,
            # do not report failures for this test case.
            no_regdb_support = (iface_state == "DISABLED" and
                                not os.path.exists("dfs"))
            if no_regdb_support:
                raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
            raise Exception("Unexpected interface state: " + iface_state)

        logger.info("Waiting for CAC to complete")

        event = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
        if "success=1" not in event:
            raise Exception("CAC failed")
        if "freq=5180" not in event:
            raise Exception("Unexpected DFS freq result")

        event = hapd.wait_event(["AP-ENABLED"], timeout=5)
        if not event:
            raise Exception("AP setup timed out")

        iface_state = hapd.get_status_field("state")
        if iface_state != "ENABLED":
            raise Exception("Unexpected interface state")

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        sig = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5180", "WIDTH=160 MHz"), 1):
            if wanted not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx +
                                str(sig))
        est = dev[0].get_bss(bssid)['est_throughput']
        if est != "1201002":
            raise Exception("Unexpected BSS est_throughput: " + est)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        # Stop the AP and station, then set the regulatory domain to 00.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
566
@long_duration_test
def test_he160b(dev, apdev):
    """HE with 160 MHz channel width (2)"""
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "FI",
                   "hw_mode": "a",
                   "channel": "104",
                   "ht_capab": "[HT40-]",
                   "vht_capab": "[VHT160]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "2",
                   "vht_oper_centr_freq_seg0_idx": "114",
                   "he_oper_chwidth": "2",
                   "he_oper_centr_freq_seg0_idx": "114",
                   "ieee80211d": "1",
                   "ieee80211h": "1"}
        hapd = hostapd.add_ap(apdev[1], ap_conf, wait_enabled=False)

        event = wait_dfs_event(hapd, "DFS-CAC-START", 5)
        if "DFS-CAC-START" not in event:
            raise Exception("Unexpected DFS event(2)")

        iface_state = hapd.get_status_field("state")
        if iface_state != "DFS":
            # Not all systems have recent enough CRDA version and
            # wireless-regdb changes to support 160 MHz and DFS. For now,
            # do not report failures for this test case.
            no_regdb_support = (iface_state == "DISABLED" and
                                not os.path.exists("dfs"))
            if no_regdb_support:
                raise HwsimSkip("CRDA or wireless-regdb did not support 160 MHz")
            raise Exception("Unexpected interface state: " + iface_state)

        logger.info("Waiting for CAC to complete")

        event = wait_dfs_event(hapd, "DFS-CAC-COMPLETED", 70)
        if "success=1" not in event:
            raise Exception("CAC failed(2)")
        if "freq=5520" not in event:
            raise Exception("Unexpected DFS freq result(2)")

        event = hapd.wait_event(["AP-ENABLED"], timeout=5)
        if not event:
            raise Exception("AP setup timed out(2)")

        iface_state = hapd.get_status_field("state")
        if iface_state != "ENABLED":
            raise Exception("Unexpected interface state(2)")

        oper_freq = hapd.get_status_field("freq")
        if oper_freq != "5520":
            raise Exception("Unexpected frequency(2)")

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5520")
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        sig = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=5520", "WIDTH=160 MHz"), 1):
            if wanted not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx +
                                str(sig))
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        # Stop the AP and station, then set the regulatory domain to 00.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
643
def test_he160_no_dfs_100_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (100 plus)"""
    # Primary channel 100 with HT40+.
    run_ap_he160_no_dfs(dev, apdev, channel="100", ht_capab="[HT40+]")
647
def test_he160_no_dfs(dev, apdev):
    """HE with 160 MHz channel width and no DFS (104 minus)"""
    # Primary channel 104 with HT40-.
    run_ap_he160_no_dfs(dev, apdev, channel="104", ht_capab="[HT40-]")
651
def test_he160_no_dfs_108_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (108 plus)"""
    # Primary channel 108 with HT40+.
    run_ap_he160_no_dfs(dev, apdev, channel="108", ht_capab="[HT40+]")
655
def test_he160_no_dfs_112_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (112 minus)"""
    # Primary channel 112 with HT40-.
    run_ap_he160_no_dfs(dev, apdev, channel="112", ht_capab="[HT40-]")
659
def test_he160_no_dfs_116_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (116 plus)"""
    # Primary channel 116 with HT40+.
    run_ap_he160_no_dfs(dev, apdev, channel="116", ht_capab="[HT40+]")
663
def test_he160_no_dfs_120_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (120 minus)"""
    # Primary channel 120 with HT40-.
    run_ap_he160_no_dfs(dev, apdev, channel="120", ht_capab="[HT40-]")
667
def test_he160_no_dfs_124_plus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (124 plus)"""
    # Primary channel 124 with HT40+.
    run_ap_he160_no_dfs(dev, apdev, channel="124", ht_capab="[HT40+]")
671
def test_he160_no_dfs_128_minus(dev, apdev):
    """HE with 160 MHz channel width and no DFS (128 minus)"""
    # Primary channel 128 with HT40-.
    run_ap_he160_no_dfs(dev, apdev, channel="128", ht_capab="[HT40-]")
675
def run_ap_he160_no_dfs(dev, apdev, channel, ht_capab):
    """Run a 160 MHz HE test with country code ZA and no CAC wait.

    channel is the primary channel number as a string and ht_capab the HT
    capability string; the segment 0 index is fixed at 114. The test is
    skipped if the AP does not start and "iw reg get" shows a line
    containing both "5490" and "DFS".
    """
    hapd = None
    try:
        ap_conf = {"ssid": "he",
                   "country_code": "ZA",
                   "hw_mode": "a",
                   "channel": channel,
                   "ht_capab": ht_capab,
                   "vht_capab": "[VHT160]",
                   "ieee80211n": "1",
                   "ieee80211ac": "1",
                   "ieee80211ax": "1",
                   "vht_oper_chwidth": "2",
                   "vht_oper_centr_freq_seg0_idx": "114",
                   "he_oper_chwidth": "2",
                   "he_oper_centr_freq_seg0_idx": "114",
                   "ieee80211d": "1",
                   "ieee80211h": "1"}
        hapd = hostapd.add_ap(apdev[0], ap_conf, wait_enabled=False)
        enabled = hapd.wait_event(["AP-ENABLED"], timeout=2)
        if not enabled:
            proc = subprocess.Popen(["iw", "reg", "get"],
                                    stdout=subprocess.PIPE)
            stdout_data, _ = proc.communicate()
            # The pipe output is bytes, hence the bytes literals.
            if any(b"5490" in line and b"DFS" in line
                   for line in stdout_data.splitlines()):
                raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
            raise Exception("AP setup timed out")

        freq = str(5000 + 5 * int(channel))
        dev[0].connect("he", key_mgmt="NONE", scan_freq=freq)
        dev[0].wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(dev[0], hapd)

        sig = dev[0].request("SIGNAL_POLL").splitlines()
        for idx, wanted in enumerate(("FREQUENCY=" + freq, "WIDTH=160 MHz"),
                                     1):
            if wanted not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx +
                                str(sig))
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
721
def test_he160_no_ht40(dev, apdev):
    """HE with 160 MHz channel width and HT40 disabled"""
    try:
        hapd = None
        # ht_capab is empty, so no HT40 secondary channel is configured
        # while 160 MHz VHT/HE operation is requested.
        params = {"ssid": "he",
                  "country_code": "ZA",
                  "hw_mode": "a",
                  "channel": "108",
                  "ht_capab": "",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "vht_oper_chwidth": "2",
                  "vht_oper_centr_freq_seg0_idx": "114",
                  "he_oper_chwidth": "2",
                  "he_oper_centr_freq_seg0_idx": "114",
                  'ieee80211d': '1',
                  'ieee80211h': '1'}
        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
        ev = hapd.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=2)
        if not ev:
            cmd = subprocess.Popen(["iw", "reg", "get"], stdout=subprocess.PIPE)
            out, err = cmd.communicate()
            reg = out.splitlines()
            for r in reg:
                # communicate() returns bytes for a pipe opened without
                # text mode, so the needles must be bytes too. The str
                # literals used before raised TypeError on Python 3 instead
                # of reaching the skip below.
                if b"5490" in r and b"DFS" in r:
                    raise HwsimSkip("ZA regulatory rule did not have DFS requirement removed")
            raise Exception("AP setup timed out")
        if "AP-ENABLED" in ev:
            # This was supposed to fail due to sec_channel_offset == 0
            raise Exception("Unexpected AP-ENABLED")
    except Exception as e:
        # An AP startup failure becomes a skip when the regulatory
        # information lacks 80/160 MHz rules; anything else is re-raised.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
760
def test_he80plus80(dev, apdev):
    """HE with 80+80 MHz channel width"""
    def ap_params(ssid, channel, seg0):
        # Both APs share this configuration; only the SSID, the primary
        # channel and the first segment center index differ.
        return {"ssid": ssid,
                "country_code": "US",
                "hw_mode": "a",
                "channel": channel,
                "ht_capab": "[HT40+]",
                "vht_capab": "[VHT160-80PLUS80]",
                "ieee80211n": "1",
                "ieee80211ac": "1",
                "ieee80211ax": "1",
                "vht_oper_chwidth": "3",
                "vht_oper_centr_freq_seg0_idx": seg0,
                "vht_oper_centr_freq_seg1_idx": "155",
                "he_oper_chwidth": "3",
                "he_oper_centr_freq_seg0_idx": seg0,
                "he_oper_centr_freq_seg1_idx": "155"}

    hapd = None
    hapd2 = None
    try:
        first = ap_params("he", "52", "58")
        first['ieee80211d'] = '1'
        first['ieee80211h'] = '1'
        hapd = hostapd.add_ap(apdev[0], first, wait_enabled=False)
        # This will actually fail since DFS on 80+80 is not yet supported.
        # The result is ignored to avoid breaking the test once 80+80 DFS
        # gets enabled.
        hapd.wait_event(["AP-DISABLED"], timeout=5)

        second = ap_params("he2", "36", "42")
        hapd2 = hostapd.add_ap(apdev[1], second, wait_enabled=False)

        ev = hapd2.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=5)
        if not ev:
            raise Exception("AP setup timed out(2)")
        if "AP-DISABLED" in ev:
            # Assume this failed due to missing regulatory update for now
            raise HwsimSkip("80+80 MHz channel not supported in regulatory information")

        if hapd2.get_status_field("state") != "ENABLED":
            raise Exception("Unexpected interface state(2)")

        dev[1].connect("he2", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[1], hapd2)
        sig = dev[1].request("SIGNAL_POLL").splitlines()
        # Each expected line must be present in the SIGNAL_POLL output; the
        # number in the error message identifies which one was missing.
        expected = ["FREQUENCY=5180",
                    "WIDTH=80+80 MHz",
                    "CENTER_FRQ1=5210",
                    "CENTER_FRQ2=5775"]
        for num, item in enumerate(expected, start=1):
            if item not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % num + str(sig))
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        for idx in (0, 1):
            dev[idx].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        if hapd2:
            hapd2.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        for idx in (0, 1):
            dev[idx].flush_scan_cache()
842
def test_he80plus80_invalid(dev, apdev):
    """HE with invalid 80+80 MHz channel"""
    hapd = None
    try:
        # 80+80 MHz is requested, but the second segment index is left at 0.
        params = dict(ssid="he",
                      country_code="US",
                      hw_mode="a",
                      channel="36",
                      ht_capab="[HT40+]",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="3",
                      vht_oper_centr_freq_seg0_idx="42",
                      vht_oper_centr_freq_seg1_idx="0",
                      he_oper_chwidth="3",
                      he_oper_centr_freq_seg0_idx="42",
                      he_oper_centr_freq_seg1_idx="0",
                      ieee80211d='1',
                      ieee80211h='1')
        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
        # This fails due to missing(invalid) seg1 configuration
        if hapd.wait_event(["AP-DISABLED"], timeout=5) is None:
            raise Exception("AP-DISABLED not reported")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80/160 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
875
def test_he80_csa(dev, apdev):
    """HE with 80 MHz channel width and CSA"""
    def expect_freq_event(ap, name, freq, missing_msg, mismatch_msg):
        # Wait up to 10 seconds for the named event and require it to
        # mention the expected frequency.
        ev = ap.wait_event([name], timeout=10)
        if ev is None:
            raise Exception(missing_msg)
        if "freq=" + freq not in ev:
            raise Exception(mismatch_msg)

    csa_supported(dev[0])
    hapd = None
    try:
        params = dict(ssid="he",
                      country_code="US",
                      hw_mode="a",
                      channel="149",
                      ht_capab="[HT40+]",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="1",
                      vht_oper_centr_freq_seg0_idx="155",
                      he_oper_chwidth="1",
                      he_oper_centr_freq_seg0_idx="155")
        hapd = hostapd.add_ap(apdev[0], params)

        dev[0].connect("he", key_mgmt="NONE", scan_freq="5745")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # First switch: move to 5180 MHz with an 80 MHz channel and check
        # all three events reported by hostapd.
        hapd.request("CHAN_SWITCH 5 5180 ht vht he blocktx center_freq1=5210 sec_channel_offset=1 bandwidth=80")
        expect_freq_event(hapd, "CTRL-EVENT-STARTED-CHANNEL-SWITCH", "5180",
                          "Channel switch start event not seen",
                          "Unexpected channel in CS started")
        expect_freq_event(hapd, "CTRL-EVENT-CHANNEL-SWITCH", "5180",
                          "Channel switch completion event not seen",
                          "Unexpected channel in CS completed")
        expect_freq_event(hapd, "AP-CSA-FINISHED", "5180",
                          "CSA finished event timed out",
                          "Unexpected channel in CSA finished event")
        time.sleep(0.5)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Second switch: back to the original frequency.
        hapd.request("CHAN_SWITCH 5 5745")
        expect_freq_event(hapd, "AP-CSA-FINISHED", "5745",
                          "CSA finished event timed out",
                          "Unexpected channel in CSA finished event")
        time.sleep(0.5)
        hwsim_utils.test_connectivity(dev[0], hapd)

        # This CSA to same channel will fail in kernel, so use this only for
        # extra code coverage.
        hapd.request("CHAN_SWITCH 5 5745")
        hapd.wait_event(["AP-CSA-FINISHED"], timeout=1)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
938
def test_he_on_24ghz(dev, apdev):
    """Subset of HE features on 2.4 GHz"""
    ssid = "test-he-2g"
    params = dict(ssid=ssid,
                  hw_mode="g",
                  channel="1",
                  ieee80211n="1",
                  ieee80211ax="1",
                  vht_oper_chwidth="0",
                  vht_oper_centr_freq_seg0_idx="1",
                  he_oper_chwidth="0",
                  he_oper_centr_freq_seg0_idx="1")
    hapd = hostapd.add_ap(apdev[0], params)
    try:
        # First station: associate, check data connectivity and fetch the
        # STA entry from the AP (the entry itself is not inspected).
        dev[0].connect(ssid, scan_freq="2412", key_mgmt="NONE")
        hwsim_utils.test_connectivity(dev[0], hapd)
        hapd.get_sta(dev[0].own_addr())

        # Second station: associate and fetch its STA entry as well.
        dev[1].connect(ssid, scan_freq="2412", key_mgmt="NONE")
        hapd.get_sta(dev[1].own_addr())

    finally:
        for idx in (0, 1):
            dev[idx].request("DISCONNECT")
        if hapd:
            hapd.request("DISABLE")
        subprocess.call(['iw', 'reg', 'set', '00'])
        for idx in (0, 1):
            dev[idx].flush_scan_cache()
968
def test_he80_pwr_constraint(dev, apdev):
    """HE with 80 MHz channel width and local power constraint"""
    hapd = None
    try:
        params = dict(ssid="he",
                      country_code="FI",
                      hw_mode="a",
                      channel="36",
                      ht_capab="[HT40+]",
                      ieee80211d="1",
                      local_pwr_constraint="3",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="1",
                      vht_oper_centr_freq_seg0_idx="42",
                      he_oper_chwidth="1",
                      he_oper_centr_freq_seg0_idx="42")
        hapd = hostapd.add_ap(apdev[0], params)

        sta = dev[0]
        sta.connect("he", key_mgmt="NONE", scan_freq="5180")
        sta.wait_regdom(country_ie=True)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        # Stop the AP first, then restore the world regulatory domain and
        # give the station a short moment to report the change.
        if hapd:
            hapd.request("DISABLE")
        dev[0].disconnect_and_stop_scan()
        subprocess.call(['iw', 'reg', 'set', '00'])
        dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=0.5)
        dev[0].flush_scan_cache()
1003
def test_he_use_sta_nsts(dev, apdev):
    """HE with 80 MHz channel width and use_sta_nsts=1"""
    hapd = None
    try:
        params = dict(ssid="he",
                      country_code="FI",
                      hw_mode="a",
                      channel="36",
                      ht_capab="[HT40+]",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="1",
                      vht_oper_centr_freq_seg0_idx="42",
                      he_oper_chwidth="1",
                      he_oper_centr_freq_seg0_idx="42",
                      use_sta_nsts="1")
        hapd = hostapd.add_ap(apdev[0], params)
        bssid = apdev[0]['bssid']

        # Association plus a data connectivity check is all that is verified.
        sta = dev[0]
        sta.connect("he", key_mgmt="NONE", scan_freq="5180")
        hwsim_utils.test_connectivity(sta, hapd)
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        clear_regdom(hapd, dev)
1033
def test_he_tkip(dev, apdev):
    """HE and TKIP"""
    skip_without_tkip(dev[0])
    hapd = None
    try:
        params = dict(ssid="he",
                      wpa="1",
                      wpa_key_mgmt="WPA-PSK",
                      wpa_pairwise="TKIP",
                      wpa_passphrase="12345678",
                      country_code="FI",
                      hw_mode="a",
                      channel="36",
                      ht_capab="[HT40+]",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="1",
                      vht_oper_centr_freq_seg0_idx="42",
                      he_oper_chwidth="1",
                      he_oper_centr_freq_seg0_idx="42")
        hapd = hostapd.add_ap(apdev[0], params)
        bssid = apdev[0]['bssid']

        dev[0].connect("he", psk="12345678", scan_freq="5180")
        hwsim_utils.test_connectivity(dev[0], hapd)

        # The station is expected to end up on a plain 20 MHz (no HT) link.
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        for num, item in enumerate(["FREQUENCY=5180", "WIDTH=20 MHz (no HT)"],
                                   start=1):
            if item not in sig:
                raise Exception("Unexpected SIGNAL_POLL value(%d): " % num + str(sig))

        # hostapd STATUS must report each of these fields as "0" even though
        # HT/VHT/HE and a secondary channel were requested in the
        # configuration above.
        status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(status))
        for field in ("ieee80211n", "ieee80211ac", "ieee80211ax",
                      "secondary_channel"):
            if status[field] != "0":
                raise Exception("Unexpected STATUS " + field + " value")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
1083
def test_he_40_fallback_to_20(devs, apdevs):
    """HE and 40 MHz channel configuration falling back to 20 MHz"""
    sta = devs[0]
    ap_dev = apdevs[0]
    hapd = None
    try:
        # HT40+ is requested on channel 161 together with a center frequency
        # index of 155.
        params = dict(
            ssid="test-he40",
            country_code="US",
            hw_mode="a",
            basic_rates="60 120 240",
            channel="161",
            ieee80211d="1",
            ieee80211h="1",
            ieee80211n="1",
            ieee80211ac="1",
            ieee80211ax="1",
            ht_capab="[HT40+][SHORT-GI-20][SHORT-GI-40][DSSS_CCK-40]",
            vht_capab="[RXLDPC][SHORT-GI-80][TX-STBC-2BY1][RX-STBC1][MAX-MPDU-11454][MAX-A-MPDU-LEN-EXP7]",
            vht_oper_chwidth="0",
            vht_oper_centr_freq_seg0_idx="155",
            he_oper_chwidth="0",
            he_oper_centr_freq_seg0_idx="155")
        hapd = hostapd.add_ap(ap_dev, params)
        sta.connect("test-he40", scan_freq="5805", key_mgmt="NONE")
        sta.wait_regdom(country_ie=True)
        hwsim_utils.test_connectivity(sta, hapd)
    finally:
        clear_regdom(hapd, devs)
1112
def test_he80_to_24g_he(dev, apdev):
    """HE with 80 MHz channel width reconfigured to 2.4 GHz HE"""
    hapd = None
    try:
        params = dict(ssid="he",
                      country_code="FI",
                      hw_mode="a",
                      channel="36",
                      ht_capab="[HT40+]",
                      ieee80211n="1",
                      ieee80211ac="1",
                      ieee80211ax="1",
                      vht_oper_chwidth="1",
                      vht_capab="[MAX-MPDU-11454]",
                      vht_oper_centr_freq_seg0_idx="42",
                      he_oper_chwidth="1",
                      he_oper_centr_freq_seg0_idx="42")
        hapd = hostapd.add_ap(apdev[0], params)
        bssid = apdev[0]['bssid']

        # Take the AP down, rewrite the band-specific settings for
        # 2.4 GHz channel 1 (in this exact order) and bring it back up.
        reconfig = [("ieee80211ac", "0"),
                    ("hw_mode", "g"),
                    ("channel", "1"),
                    ("ht_capab", ""),
                    ("vht_capab", ""),
                    ("he_oper_chwidth", ""),
                    ("he_oper_centr_freq_seg0_idx", ""),
                    ("vht_oper_chwidth", ""),
                    ("vht_oper_centr_freq_seg0_idx", "")]
        hapd.disable()
        for field, value in reconfig:
            hapd.set(field, value)
        hapd.enable()

        dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
1154
def test_he_twt(dev, apdev):
    """HE and TWT"""
    params = {"ssid": "he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_twt_required":"1"}
    hapd = hostapd.add_ap(apdev[0], params)

    dev[0].connect("he", key_mgmt="NONE", scan_freq="2412")
    # Setup and teardown with default arguments first, then with explicit
    # values for every argument. Each failure names the command that
    # actually failed so a teardown problem is not reported as a setup
    # problem.
    if "OK" not in dev[0].request("TWT_SETUP"):
        raise Exception("TWT_SETUP failed")
    if "OK" not in dev[0].request("TWT_TEARDOWN"):
        raise Exception("TWT_TEARDOWN failed")
    if "OK" not in dev[0].request("TWT_SETUP dialog=123 exponent=9 mantissa=10 min_twt=254 setup_cmd=1 twt=1234567890 requestor=1 trigger=0 implicit=0 flow_type=0 flow_id=2 protection=1 twt_channel=3 control=16"):
        raise Exception("TWT_SETUP failed")
    if "OK" not in dev[0].request("TWT_TEARDOWN flags=255"):
        raise Exception("TWT_TEARDOWN failed")
1172
def test_he_6ghz(dev, apdev):
    """HE with 20 MHz channel width on 6 GHz"""
    check_sae_capab(dev[0])

    # Bind before the try block: the finally clause references hapd, and a
    # failure in the first statement of the try body would otherwise surface
    # as UnboundLocalError and hide the real error.
    hapd = None
    try:
        dev[0].set("sae_pwe", "1")
        params = {"ssid": "he",
                  "country_code": "DE",
                  "op_class": "131",
                  "channel": "5",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2"}
        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)

        dev[0].set("sae_groups", "")
        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq="5975")
        hwsim_utils.test_connectivity(dev[0], hapd)
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        if "FREQUENCY=5975" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=20 MHz" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
        status = dev[0].get_status()
        if 'wifi_generation' not in status:
            # For now, assume this is because of missing kernel support
            raise HwsimSkip("Association Request IE reporting not supported")
        if status['wifi_generation'] != "6":
            raise Exception("Unexpected wifi_generation value: " + status['wifi_generation'])
        status = hapd.get_status()
        logger.info("hostapd STATUS: " + str(status))
        if status["ieee80211ax"] != "1":
            raise Exception("Unexpected STATUS ieee80211ax value")
        if status["he_oper_chwidth"] != "0":
            raise Exception("Unexpected STATUS he_oper_chwidth value")

        sta = hapd.get_sta(dev[0].own_addr())
        logger.info("hostapd STA: " + str(sta))
        if "[HE]" not in sta['flags']:
            raise Exception("Missing STA flag: HE")

    except Exception as e:
        # Convert an AP startup failure into a skip when he_supported()
        # reports no support; everything else is re-raised unchanged.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        # Undo the sae_pwe change made at the start of the test.
        dev[0].set("sae_pwe", "0")
        clear_regdom(hapd, dev)
1231
def test_he_6ghz_auto_security(dev, apdev):
    """HE on 6 GHz and automatic security settings on STA"""
    check_sae_capab(dev[0])
    hapd = None
    try:
        params = dict(ssid="he",
                      country_code="DE",
                      op_class="131",
                      channel="5",
                      ieee80211ax="1",
                      wpa="2",
                      ieee80211w="2",
                      rsn_pairwise="CCMP",
                      wpa_key_mgmt="SAE",
                      sae_password="password")
        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)
        bssid = apdev[0]['bssid']

        # The network profile allows both SAE and WPA-PSK with optional PMF
        # (ieee80211w=1); the status must still report pmf=2 afterwards.
        dev[0].set("sae_groups", "")
        dev[0].connect("he", psk="password", key_mgmt="SAE WPA-PSK",
                       ieee80211w="1", scan_freq="5975")
        sta_status = dev[0].get_status()
        if "pmf" not in sta_status:
            raise Exception("pmf missing from status")
        pmf = sta_status["pmf"]
        if pmf != "2":
            raise Exception("Unexpected pmf status value: " + pmf)

        hapd.wait_sta()
        entry = hapd.get_sta(dev[0].own_addr())
        if entry["hostapdMFPR"] != "1":
            raise Exception("STA did not indicate MFPR=1")
    except Exception as e:
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)

    # Second part: reconnect the same profile to a WPA2 AP with the same SSID
    # on the second AP device and require that no PMF is reported this time.
    params = hostapd.wpa2_params(ssid="he", passphrase="password")
    hapd = hostapd.add_ap(apdev[1], params)
    bssid = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    dev[0].request("RECONNECT")
    dev[0].wait_connected()
    sta_status = dev[0].get_status()
    if "pmf" in sta_status:
        raise Exception("Unexpected pmf status value(2): " + sta_status["pmf"])
    hapd.wait_sta()
    entry = hapd.get_sta(dev[0].own_addr())
    if "[MFP]" in entry["flags"]:
        raise Exception("MFP reported unexpectedly(2)")
1285
def he_6ghz_acs(dev, apdev, op_class, bw):
    """Run an HE ACS test for one operating class and expected bandwidth.

    Starts an SAE AP with channel 0 in the given operating class, waits for
    ACS to complete, and checks the selected frequency, the secondary
    channel setting and the channel width reported by the station.

    op_class -- operating class number written to the AP configuration
    bw -- expected channel width in MHz (compared against SIGNAL_POLL)
    """
    check_sae_capab(dev[0])

    # Bind before the try block: the finally clause references hapd, and a
    # failure in the first statement of the try body would otherwise surface
    # as UnboundLocalError and hide the real error.
    hapd = None
    try:
        dev[0].set("sae_pwe", "1")
        params = {"ssid": "he",
                  "country_code": "DE",
                  "op_class": str(op_class),
                  "hw_mode": "a",
                  "channel": "0",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2"}
        hapd = hostapd.add_ap(apdev[0], params, wait_enabled=False)
        wait_acs(hapd)

        # Frequencies below 5955 MHz are rejected as an unexpected ACS result.
        freq = hapd.get_status_field("freq")
        if int(freq) < 5955:
            raise Exception("Unexpected frequency: " + freq)

        # A secondary channel must be set for anything wider than 20 MHz and
        # must not be set for 20 MHz.
        sec = hapd.get_status_field("secondary_channel")
        if bw > 20 and int(sec) == 0:
            raise Exception("Secondary channel not set")
        if bw == 20 and int(sec) != 0:
            raise Exception("Secondary channel set")

        dev[0].set("sae_groups", "")
        dev[0].connect("he", sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq=freq)
        hwsim_utils.test_connectivity(dev[0], hapd)
        sig = dev[0].request("SIGNAL_POLL").splitlines()
        if "FREQUENCY=" + freq not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(1): " + str(sig))
        if "WIDTH=" + str(bw) + " MHz" not in sig:
            raise Exception("Unexpected SIGNAL_POLL value(2): " + str(sig))
    except Exception as e:
        # Convert an AP startup failure into a skip when he_supported()
        # reports no support; everything else is re-raised unchanged.
        if str(e) == "AP startup failed" and not he_supported():
            raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        # Undo the sae_pwe change made at the start of the test.
        dev[0].set("sae_pwe", "0")
        clear_regdom(hapd, dev)
1336
def test_he_6ghz_acs_20mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 20 MHz channel"""
    # Operating class 131 is paired with an expected width of 20 MHz.
    he_6ghz_acs(dev, apdev, op_class=131, bw=20)
1340
def test_he_6ghz_acs_40mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 40 MHz channel"""
    # Operating class 132 is paired with an expected width of 40 MHz.
    he_6ghz_acs(dev, apdev, op_class=132, bw=40)
1344
def test_he_6ghz_acs_80mhz(dev, apdev):
    """HE with ACS on 6 GHz using an 80 MHz channel"""
    # Operating class 133 is paired with an expected width of 80 MHz.
    he_6ghz_acs(dev, apdev, op_class=133, bw=80)
1348
def test_he_6ghz_acs_160mhz(dev, apdev):
    """HE with ACS on 6 GHz using a 160 MHz channel"""
    # Operating class 134 is paired with an expected width of 160 MHz.
    he_6ghz_acs(dev, apdev, op_class=134, bw=160)
1352
def test_he_6ghz_security(dev, apdev):
    """HE AP and 6 GHz security parameter validation"""
    params = dict(ssid="he",
                  ieee80211ax="1",
                  op_class="131",
                  channel="1")
    hapd = hostapd.add_ap(apdev[0], params, no_enable=True)

    # Each step applies its settings on top of everything set so far and
    # then requires ENABLE to be rejected. The step number ends up in the
    # error message.
    steps = [
        # Pre-RSNA security methods are not allowed in 6 GHz
        [],
        # Management frame protection is required in 6 GHz
        [("wpa", "2"),
         ("wpa_passphrase", "12345678"),
         ("wpa_key_mgmt", "SAE"),
         ("rsn_pairwise", "CCMP"),
         ("ieee80211w", "1")],
        # Invalid AKM suite for 6 GHz
        [("ieee80211w", "2"),
         ("wpa_key_mgmt", "SAE WPA-PSK")],
        # Invalid pairwise cipher suite for 6 GHz
        [("wpa_key_mgmt", "SAE"),
         ("rsn_pairwise", "CCMP TKIP")],
        # Invalid group cipher suite for 6 GHz
        [("wpa_key_mgmt", "SAE"),
         ("rsn_pairwise", "CCMP"),
         ("group_cipher", "TKIP")],
    ]
    for num, settings in enumerate(steps, start=1):
        for field, value in settings:
            hapd.set(field, value)
        if "FAIL" not in hapd.request("ENABLE"):
            raise Exception("Invalid configuration accepted(%d)" % num)
1392
def test_he_prefer_he20(dev, apdev):
    """Preference on HE20 over HT20"""
    # First AP: HT only (HE disabled), SSID "he".
    ht_params = dict(ssid="he",
                     channel="1",
                     ieee80211ax="0",
                     ieee80211n="1")
    hapd = hostapd.add_ap(apdev[0], ht_params)
    bssid = apdev[0]['bssid']
    # Second AP: HE enabled on the same channel, SSID "test".
    he_params = dict(ssid="test",
                     channel="1",
                     ieee80211ax="1",
                     ieee80211n="1")
    hapd2 = hostapd.add_ap(apdev[1], he_params)
    bssid2 = apdev[1]['bssid']

    sta = dev[0]
    for addr in (bssid, bssid2):
        sta.scan_for_bss(addr, freq=2412)
    sta.connect("test", key_mgmt="NONE", scan_freq="2412")
    if sta.get_status_field('bssid') != bssid2:
        raise Exception("Unexpected BSS selected")

    # Compare the estimated throughput stored for each BSS entry against the
    # expected value; the index in the message identifies the BSS.
    for idx, (addr, expected) in enumerate([(bssid, "65000"),
                                            (bssid2, "143402")]):
        est = sta.get_bss(addr)['est_throughput']
        if est != expected:
            raise Exception("Unexpected BSS%d est_throughput: " % idx + est)
1421
1422def test_he_capab_parsing(dev, apdev):
1423    """HE AP and capability parsing"""
1424    params = {"ssid": "he",
1425              "ieee80211ax": "1",
1426              "he_bss_color": "42",
1427              "he_mu_edca_ac_be_ecwmin": "7",
1428              "he_mu_edca_ac_be_ecwmax": "15"}
1429    hapd = hostapd.add_ap(apdev[0], params)
1430
1431    hapd.set("ext_mgmt_frame_handling", "1")
1432    bssid = hapd.own_addr().replace(':', '')
1433    addr = "020304050607"
1434    addr_ = "02:03:04:05:06:07"
1435
1436    tests = []
1437    mac_capa = binascii.unhexlify("0178c81a4000")
1438    phy_capa = binascii.unhexlify("00bfce0000000000000000")
1439    mcs_nss = binascii.unhexlify("faff")
1440    payload = mac_capa + phy_capa + 2*mcs_nss
1441    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1442    tests += [ (hdr + payload, True) ]
1443
1444    phy_capa = binascii.unhexlify("08bfce0000000000000000")
1445    payload = mac_capa + phy_capa + 4*mcs_nss
1446    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1447    tests += [ (hdr + payload, True) ]
1448
1449    phy_capa = binascii.unhexlify("10bfce0000000000000000")
1450    payload = mac_capa + phy_capa + 4*mcs_nss
1451    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1452    tests += [ (hdr + payload, True) ]
1453
1454    phy_capa = binascii.unhexlify("18bfce0000000000000000")
1455    payload = mac_capa + phy_capa + 6*mcs_nss
1456    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1457    tests += [ (hdr + payload, True) ]
1458
1459    # Missing PPE Threshold field
1460    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1461    payload = mac_capa + phy_capa + 2*mcs_nss
1462    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1463    tests += [ (hdr + payload, False) ]
1464
1465    # Truncated PPE Threshold field
1466    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1467    payload = mac_capa + phy_capa + 2*mcs_nss + struct.pack('B', 0x79)
1468    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1469    tests += [ (hdr + payload, False) ]
1470
1471    # Extra field at the end (without PPE Threshold field)
1472    phy_capa = binascii.unhexlify("00bfce0000000000000000")
1473    payload = mac_capa + phy_capa + 2*mcs_nss
1474    payload += struct.pack('B', 0)
1475    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1476    tests += [ (hdr + payload, True) ]
1477
1478    # Extra field at the end (with PPE Threshold field)
1479    phy_capa = binascii.unhexlify("00bfce0000008000000000")
1480    payload = mac_capa + phy_capa + 2*mcs_nss
1481    payload += binascii.unhexlify("79000000000000")
1482    payload += struct.pack('B', 0)
1483    hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1484    tests += [ (hdr + payload, True) ]
1485
1486    ppet = []
1487    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x0
1488    # --> 3 + 4 + 0 * 6 * 1 = 7 bits --> 1 octet
1489    ppet += ["00"]
1490    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x1
1491    # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
1492    ppet += ["08" + "00"]
1493    # NSTS=1 (i.e., NSTS field value 0), RU Index Bitmask=0x8
1494    # --> 3 + 4 + 1 * 6 * 1 = 13 bits --> 2 octets
1495    ppet += ["40" + "00"]
1496    # NSTS=2 (i.e., NSTS field value 1), RU Index Bitmask=0xf
1497    # --> 3 + 4 + 4 * 6 * 2 = 55 bits --> 7 octets
1498    ppet += ["79" + 6*"00"]
1499    # NSTS=3 (i.e., NSTS field value 2), RU Index Bitmask=0xf
1500    # --> 3 + 4 + 4 * 6 * 3 = 79 bits --> 10 octets
1501    ppet += ["7a" + 9*"00"]
1502    # NSTS=4 (i.e., NSTS field value 3), RU Index Bitmask=0x5
1503    # --> 3 + 4 + 2 * 6 * 4 = 55 bits --> 7 octets
1504    ppet += ["2b" + 6*"00"]
1505    # NSTS=8 (i.e., NSTS field value 7), RU Index Bitmask=0xf
1506    # --> 3 + 4 + 4 * 6 * 8 = 199 bits --> 25 octets
1507    ppet += ["ff" + 24*"00"]
1508    for p in ppet:
1509        phy_capa = binascii.unhexlify("00bfce0000008000000000")
1510        payload = mac_capa + phy_capa + 2*mcs_nss + binascii.unhexlify(p)
1511        hdr = struct.pack('BBB', 255, 1 + len(payload), 35)
1512        tests += [ (hdr + payload, True) ]
1513
1514    for capab, result in tests:
1515        auth = "b0003a01" + bssid + addr + bssid + '1000000001000000'
1516        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % auth):
1517            raise Exception("MGMT_RX_PROCESS failed")
1518
1519        he_capab = binascii.hexlify(capab).decode()
1520
1521        ies = "00026865" # SSID
1522        ies += "010802040b160c121824" # Supp Rates
1523        ies += "32043048606c" # Ext Supp Rates
1524        ies += "2d1afe131bffff000000000000000000000100000000000000000000" # HT Capab
1525        ies += "7f0b04004a0201404040000120" # Ext Capab
1526        ies += he_capab
1527        ies += "3b155151525354737475767778797a7b7c7d7e7f808182" # Supp Op Classes
1528        ies += "dd070050f202000100" # WMM
1529
1530        assoc_req = "00003a01" + bssid + addr + bssid + "2000" + "21040500" + ies
1531        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % assoc_req):
1532            raise Exception("MGMT_RX_PROCESS failed")
1533
1534        sta = hapd.get_sta(addr_)
1535        if result:
1536            if "[HE]" not in sta['flags']:
1537                raise Exception("Missing STA flag: HE (HE Capab: %s)" % he_capab)
1538        else:
1539            if "[HE]" in sta['flags']:
1540                raise Exception("Unexpected STA flag: HE (HE Capab: %s)" % he_capab)
1541
1542        deauth = "c0003a01" + bssid + addr + bssid + "3000" + "0300"
1543        if "OK" not in hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % deauth):
1544            raise Exception("MGMT_RX_PROCESS failed")
1545
1546        hapd.dump_monitor()
1547
def test_he_cw_change_notification(dev, apdev):
    """HE AP on 80 MHz channel and CW change notification"""
    # hapd must be bound before the try block so that the cleanup in the
    # finally clause can run even if AP startup fails.
    hapd = None
    try:
        params = {"ssid": "he",
                  "country_code": "FI",
                  "hw_mode": "a",
                  "channel": "36",
                  "ht_capab": "[HT40+]",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "vht_oper_chwidth": "1",
                  "vht_oper_centr_freq_seg0_idx": "42",
                  "he_oper_chwidth": "1",
                  "he_oper_centr_freq_seg0_idx": "42"}
        hapd = hostapd.add_ap(apdev[0], params)

        # Three stations with decreasing capabilities: everything enabled,
        # HE disabled, and both HE and VHT disabled.
        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")
        dev[1].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_he="1")
        dev[2].connect("he", key_mgmt="NONE", scan_freq="5180",
                       disable_he="1", disable_vht="1")

        # (label, station, flags that must be set, flags that must not be set)
        # The flags are verified in the listed order, required ones first.
        expectations = [("STA0", dev[0], ["HT", "VHT", "HE"], []),
                        ("STA1", dev[1], ["HT", "VHT"], ["HE"]),
                        ("STA2", dev[2], ["HT"], ["VHT", "HE"])]
        for label, sta_dev, required, forbidden in expectations:
            sta = hapd.get_sta(sta_dev.own_addr())
            logger.info("hostapd %s: %s" % (label, str(sta)))
            for flag in required:
                if "[%s]" % flag not in sta['flags']:
                    raise Exception("Missing %s flag: %s" % (label, flag))
            for flag in forbidden:
                if "[%s]" % flag in sta['flags']:
                    raise Exception("Unexpected %s flag: %s" % (label, flag))

        # Step through the channel width notifications and verify that all
        # three stations can still exchange data after each change.
        for i in [2, 1, 0]:
            if "OK" not in hapd.request("NOTIFY_CW_CHANGE %d" % i):
                raise Exception("NOTIFY_CW_CHANGE %d failed" % i)

            time.sleep(1)
            hwsim_utils.test_connectivity(dev[0], hapd)
            hwsim_utils.test_connectivity(dev[1], hapd)
            hwsim_utils.test_connectivity(dev[2], hapd)
    except Exception as e:
        # Convert an AP startup failure into a skip when he_supported()
        # reports that the needed channel is not available.
        if str(e) == "AP startup failed":
            if not he_supported():
                raise HwsimSkip("80 MHz channel not supported in regulatory information")
        raise
    finally:
        for i in range(3):
            dev[i].request("DISCONNECT")
        clear_regdom(hapd, dev)
1618
def he_verify_status(wpas, hapd, freq, bw, is_6ghz=True):
    """Verify AP status, STA flags, and STA signal info for an HE association

    wpas: station object (own_addr() and request() are used)
    hapd: AP object (get_status() and get_sta() are used)
    freq: expected FREQUENCY value in the station's SIGNAL_POLL output
    bw: expected WIDTH value (in MHz) in the station's SIGNAL_POLL output
    is_6ghz: whether the [6GHZ] STA flag is required in addition to [HE]

    Raises Exception on the first mismatch that is found.
    """
    ap_status = hapd.get_status()
    logger.info("hostapd STATUS: " + str(ap_status))

    # HT, VHT, and HE all have to be reported as enabled by the AP.
    for field in ("ieee80211n", "ieee80211ac", "ieee80211ax"):
        if ap_status[field] != "1":
            raise Exception("Unexpected STATUS %s value" % field)

    sta_flags = hapd.get_sta(wpas.own_addr())['flags']
    if "[HE]" not in sta_flags:
        raise Exception("Missing STA flag: HE")
    if is_6ghz and "[6GHZ]" not in sta_flags:
        raise Exception("Missing STA flag: 6GHZ")

    # Each expected entry has to match one full line of SIGNAL_POLL output.
    poll_lines = wpas.request("SIGNAL_POLL").splitlines()
    checks = ((1, "FREQUENCY=%s", freq),
              (2, "WIDTH=%s MHz", bw))
    for idx, template, value in checks:
        if template % value not in poll_lines:
            raise Exception("Unexpected SIGNAL_POLL value(%d): " % idx +
                            str(poll_lines))
1641
def he_verify_wifi_version(dev):
    """Verify that the station status reports wifi_generation=6

    Raises HwsimSkip if the status has no wifi_generation entry at all and
    Exception if the entry has any value other than "6".
    """
    sta_status = dev.get_status()
    logger.info("station status: " + str(sta_status))

    # For now, assume a missing entry is because of missing kernel support
    # and skip the test instead of reporting a failure.
    if 'wifi_generation' not in sta_status:
        raise HwsimSkip("Association Request IE reporting not supported")

    generation = sta_status['wifi_generation']
    if generation == "6":
        return
    raise Exception("Unexpected wifi_generation value: " + generation)
1653
def test_he_6ghz_reg(dev, apdev):
    """TX power control on 6 GHz"""
    check_sae_capab(dev[0])
    # Both need to be bound before the try block since the finally clause
    # reads them regardless of how far the test got.
    hapd = None
    connected = False
    try:
        ssid = "HE_6GHz_regulatory"
        freq = 5975
        bw = "20"
        params = {"ssid": ssid,
                  "country_code": "DE",
                  "hw_mode": "a",
                  "ieee80211ax": "1",
                  "wpa": "2",
                  "rsn_pairwise": "CCMP",
                  "wpa_key_mgmt": "SAE",
                  "sae_pwe": "1",
                  "sae_password": "password",
                  "ieee80211w": "2",
                  "channel": "5",
                  "op_class": "131",
                  "he_oper_centr_freq_seg0_idx": "5",
                  "ieee80211d": "1",
                  "ieee80211h": "1",
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "local_pwr_constraint": "4",
                  # Set the 6 GHz regulatory power configuration
                  "he_6ghz_reg_pwr_type": "0",
                  # Note: hostapd uses "Maximum Transmit Power Interpretation"
                  # set to "Regulatory client EIRP PSD", so the values should
                  # be set accordingly.
                  "reg_def_cli_eirp_psd": "3",
                  "reg_sub_cli_eirp_psd": "2"}

        hapd = hostapd.add_ap(apdev[0], params, set_channel=False)

        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        dev[0].connect(ssid, sae_password="password", key_mgmt="SAE",
                       ieee80211w="2", scan_freq=str(freq))
        connected = True
        hapd.wait_sta()

        he_verify_status(dev[0], hapd, freq, bw)
        he_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)

        # Configure different values related to power constraints and update
        # the Beacon frame contents.
        hapd.set("local_pwr_constraint", "2")
        hapd.set("he_6ghz_reg_pwr_type", "2")
        hapd.set("reg_def_cli_eirp_psd", "2")
        hapd.set("reg_sub_cli_eirp_psd", "1")

        # In addition, inject a Transmit Power Envelope as a vendor element
        hapd.set("vendor_elements", "c303190202")

        if "OK" not in hapd.request("UPDATE_BEACON"):
            raise Exception("UPDATE_BEACON failed")

        # Allow few more Beacon frames
        time.sleep(0.5)

        # Modify the regulatory power type to SP and provide the client EIRP
        # limit
        # EIRP = PSD + 10 * log10(channel width)
        # 16 = 3 + 10 * log10(20)
        hapd.set("vendor_elements", "")
        hapd.set("he_6ghz_reg_pwr_type", "1")
        hapd.set("reg_def_cli_eirp", "16")

        if "OK" not in hapd.request("UPDATE_BEACON"):
            raise Exception("UPDATE_BEACON failed")

        # Allow few more Beacon frames
        time.sleep(0.5)
    except Exception as e:
        # Convert an AP startup failure into a skip when he_supported()
        # reports that the needed channel is not available.
        if str(e) == "AP startup failed":
            if not he_supported():
                raise HwsimSkip("HE 6 GHz channel not supported in regulatory information")
        raise
    finally:
        dev[0].request("DISCONNECT")
        dev[0].set("sae_pwe", "0")
        # Only wait for the disconnection if the connection was established.
        # Otherwise the wait would time out and the resulting exception would
        # replace the skip/failure that is already being propagated.
        if connected:
            dev[0].wait_disconnected()
        clear_regdom(hapd, dev)
1739
def test_he_downgrade_40mhz_to_20mhz(dev, apdev):
    """HE AP and downgrade from 40 MHz to 20 MHz due to regulatory constraints"""
    # Ask for a 40 MHz (HT40+) channel even though the regdb limits this
    # frequency to 20 MHz for the configured country. The keyword order is
    # kept as-is since it determines the order of the resulting dictionary.
    ap_config = dict(ssid="he",
                     country_code="AM",
                     channel="36",
                     op_class="116",
                     ieee80211n="1",
                     ieee80211ac="1",
                     ieee80211ax="1",
                     hw_mode="a",
                     ht_capab="[HT40+]",
                     vht_oper_chwidth="0",
                     he_oper_chwidth="0")
    run_he_downgrade_to_20_mhz(dev, apdev, ap_config)
1756
def test_he_downgrade_40mhz_plus_minus_to_20mhz(dev, apdev):
    """HE AP and downgrade from 40 MHz (+/-) to 20 MHz due to regulatory constraints"""
    # Ask for a 40 MHz channel with both secondary channel directions allowed
    # even though the regdb limits this frequency to 20 MHz for the
    # configured country. The pairs are listed in the order in which they end
    # up in the dictionary.
    settings = [("ssid", "he"),
                ("country_code", "AM"),
                ("channel", "36"),
                ("op_class", "116"),
                ("ieee80211n", "1"),
                ("ieee80211ac", "1"),
                ("ieee80211ax", "1"),
                ("hw_mode", "a"),
                ("ht_capab", "[HT40+][HT40-]"),
                ("vht_oper_chwidth", "0"),
                ("he_oper_chwidth", "0")]
    run_he_downgrade_to_20_mhz(dev, apdev, dict(settings))
1773
def test_he_downgrade_80mhz_to_20mhz(dev, apdev):
    """HE AP and downgrade from 80 MHz to 20 MHz due to regulatory constraints"""
    # Ask for an 80 MHz channel even though the regdb limits this frequency
    # to 20 MHz for the configured country.
    ap_config = {"ssid": "he",
                 "country_code": "AM",
                 "channel": "36",
                 "op_class": "128",
                 "ieee80211n": "1",
                 "ieee80211ac": "1",
                 "ieee80211ax": "1",
                 "hw_mode": "a",
                 "ht_capab": "[HT40+]"}
    # 80 MHz specific VHT/HE operating parameters; these are added after the
    # common entries so that the overall key order remains unchanged.
    ap_config.update([("vht_oper_centr_freq_seg0_idx", "42"),
                      ("he_oper_centr_freq_seg0_idx", "42"),
                      ("vht_oper_chwidth", "1"),
                      ("he_oper_chwidth", "1")])
    run_he_downgrade_to_20_mhz(dev, apdev, ap_config)
1792
def run_he_downgrade_to_20_mhz(dev, apdev, params):
    """Start an AP with params and verify that dev[0] ends up on 20 MHz

    The AP is started on apdev[0], dev[0] is connected to SSID "he" on
    5180 MHz, and the SIGNAL_POLL output of the station is required to
    contain the line "WIDTH=20 MHz". The regulatory domain is cleared at the
    end regardless of the outcome.
    """
    # Bound before the try block so that the cleanup can always use it.
    hapd = None
    try:
        hapd = hostapd.add_ap(apdev[0], params)
        dev[0].connect("he", key_mgmt="NONE", scan_freq="5180")

        poll_lines = dev[0].request("SIGNAL_POLL").splitlines()
        logger.info("SIGNAL_POLL: " + str(poll_lines))
        width_is_20 = "WIDTH=20 MHz" in poll_lines
        if not width_is_20:
            raise Exception("20 MHz channel width not reported")

        # Clean disconnection, confirmed on both the station and the AP.
        dev[0].request("DISCONNECT")
        dev[0].wait_disconnected()
        hapd.wait_sta_disconnect()
    finally:
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
1808
def test_he_bss_color_change(dev, apdev):
    """HE AP with color change"""
    params = {"ssid": "test_he",
              "ieee80211ax": "1",
              "he_bss_color": "42",
              "he_mu_edca_ac_be_ecwmin": "7",
              "he_mu_edca_ac_be_ecwmax": "15"}
    hapd = hostapd.add_ap(apdev[0], params)
    if hapd.get_status_field("ieee80211ax") != "1":
        raise Exception("STATUS did not indicate ieee80211ax=1")

    color = hapd.get_status_field("he_bss_color")
    if color != "42":
        # The field can be None (this is what the disabled-color check below
        # relies on), so convert explicitly to avoid a TypeError hiding the
        # actual failure.
        raise Exception("Expected current he_bss_color to be 42; was " +
                        str(color))

    # Small sleep to capture Beacon frames before the change
    time.sleep(0.5)

    # Change color by doing CCA
    if "OK" not in hapd.request("COLOR_CHANGE 20"):
        raise Exception("COLOR_CHANGE failed")
    # Give the change time to complete before checking the status
    time.sleep(1.5)

    color = hapd.get_status_field("he_bss_color")
    if color != "20":
        raise Exception("Expected current he_bss_color to be 20")

    # Disable color by setting value to 0
    if "OK" not in hapd.request("COLOR_CHANGE 0"):
        raise Exception("COLOR_CHANGE failed")
    time.sleep(1.5)

    # The status field is expected to be absent while the color is disabled
    color = hapd.get_status_field("he_bss_color")
    if color is not None:
        raise Exception("Expected he_bss_color to get disabled but found " + color)

    # Enable color back by setting same previous color value
    if "OK" not in hapd.request("COLOR_CHANGE 20"):
        raise Exception("COLOR_CHANGE failed")
    time.sleep(1.5)

    color = hapd.get_status_field("he_bss_color")
    if color != "20":
        raise Exception("Expected current he_bss_color to be 20")

    hapd.dump_monitor()
1855