1# EHT tests
2# Copyright (c) 2022-2024, Qualcomm Innovation Center, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import subprocess
9import tempfile
10
11import hostapd
12from utils import *
13from hwsim import HWSimRadio
14import hwsim_utils
15from wpasupplicant import WpaSupplicant
16import re
17import mld
18from tshark import run_tshark
19from test_gas import hs20_ap_params
20from test_dpp import check_dpp_capab, wait_auth_success
21from test_rrm import build_beacon_request, run_req_beacon, BeaconReport
22
def eht_verify_wifi_version(dev):
    """Check that the station status reports Wi-Fi generation 7.

    Raises Exception if dev.get_status() has no wifi_generation entry or if
    that entry is anything other than "7".
    """
    sta_status = dev.get_status()
    logger.info("station status: " + str(sta_status))

    if 'wifi_generation' in sta_status:
        generation = sta_status['wifi_generation']
        if generation == "7":
            return
        raise Exception("Unexpected wifi_generation value: " + generation)
    raise Exception("Missing wifi_generation information")
31
32def _eht_get_links_bitmap(wpas, name):
33    vfile = "/sys/kernel/debug/ieee80211/%s/netdev:%s/%s" % \
34        (wpas.get_driver_status_field("phyname"), wpas.ifname, name)
35
36    if wpas.cmd_execute(["ls", vfile])[0] != 0:
37        logger_info("%s not supported in mac80211: %s" % (name, vfile))
38        return 0
39
40    res, out = wpas.cmd_execute(["cat", vfile], shell=True)
41    if res != 0:
42        raise Exception("Failed to read %s" % fname)
43
44    logger.info("%s=%s" % (name, out))
45    return int(out, 16)
46
def _eht_valid_links(wpas):
    """Return the "valid_links" bitmap read for wpas."""
    bitmap = _eht_get_links_bitmap(wpas, "valid_links")
    return bitmap
49
def _eht_active_links(wpas):
    """Return the "active_links" bitmap read for wpas."""
    bitmap = _eht_get_links_bitmap(wpas, "active_links")
    return bitmap
52
def _eht_dormant_links(wpas):
    """Return the "dormant_links" bitmap read for wpas."""
    bitmap = _eht_get_links_bitmap(wpas, "dormant_links")
    return bitmap
55
def _eht_verify_links(wpas, valid_links=0, active_links=0):
    """Compare the valid and active link bitmaps of wpas with expectations.

    The valid links are read and checked first, then the active links.
    Raises Exception on the first bitmap that differs from the expected
    value.
    """
    checks = (("valid", _eht_valid_links, valid_links),
              ("active", _eht_active_links, active_links))
    for label, read_bitmap, expected in checks:
        current = read_bitmap(wpas)
        if current == expected:
            continue
        raise Exception("Unexpected %s links (0x%04x != 0x%04x)" %
                        (label, current, expected))
64
def eht_verify_status(wpas, hapd, freq, bw, is_ht=False, is_vht=False,
                      mld=False, valid_links=0, active_links=0):
    """Verify AP and STA state for an EHT association.

    Checks, in this order:
    - hostapd STATUS fields (ieee80211n/ieee80211ac only when is_ht/is_vht
      is set; ieee80211ax and ieee80211be always)
    - the hostapd STA entry for wpas and its HT/VHT/HE/EHT flags
    - SIGNAL_POLL frequency and width on wpas (skipped when mld is set)
    - the valid/active link bitmaps of wpas

    Raises Exception on the first check that fails.
    """
    ap_status = hapd.get_status()
    logger.info("hostapd STATUS: " + str(ap_status))

    # HT and VHT are verified only on request; HE and EHT are mandatory.
    status_checks = ((is_ht, "ieee80211n"), (is_vht, "ieee80211ac"),
                     (True, "ieee80211ax"), (True, "ieee80211be"))
    for required, field in status_checks:
        if required and ap_status[field] != "1":
            raise Exception("Unexpected STATUS %s value" % field)

    sta = hapd.get_sta(wpas.own_addr())
    logger.info("hostapd STA: " + str(sta))
    if sta['addr'] == 'FAIL':
        raise Exception("hostapd " + hapd.ifname + " did not have a STA entry for the STA " + wpas.own_addr())

    flag_checks = ((is_ht, "HT"), (is_vht, "VHT"), (True, "HE"),
                   (True, "EHT"))
    for required, flag in flag_checks:
        if required and ("[%s]" % flag) not in sta['flags']:
            raise Exception("Missing STA flag: " + flag)

    poll = wpas.request("SIGNAL_POLL").splitlines()

    # TODO: With MLD connection, signal poll logic is still not implemented.
    # While mac80211 maintains the station using the MLD address, the
    # information is maintained in the link stations, but it is not sent to
    # user space yet.
    if not mld:
        expected_lines = (("FREQUENCY=%s" % freq, 1),
                          ("WIDTH=%s MHz" % bw, 2))
        for line, idx in expected_lines:
            if line not in poll:
                raise Exception(("Unexpected SIGNAL_POLL value(%d): " % idx) +
                                str(poll))

    # Active links are updated in async work after the connection.
    # Sleep a bit to allow it to run.
    time.sleep(0.1)
    _eht_verify_links(wpas, valid_links, active_links)
108
def traffic_test(wpas, hapd, success=True, ifname2=None):
    """Run hwsim_utils.test_connectivity() between wpas and hapd.

    success is passed as success_expected and ifname2 is passed through
    unchanged.
    """
    options = {"success_expected": success, "ifname2": ifname2}
    hwsim_utils.test_connectivity(wpas, hapd, **options)
112
def test_eht_open(dev, apdev):
    """EHT AP with open mode configuration"""
    ap_params = {"ssid": "eht"}
    ap_params["ieee80211ax"] = "1"
    ap_params["ieee80211be"] = "1"
    try:
        hapd = hostapd.add_ap(apdev[0], ap_params)
    except Exception as err:
        # Failure to set the ieee80211be parameter is treated as missing EHT
        # support and skips the test case; anything else is a real failure.
        if str(err) != "Failed to set hostapd parameter ieee80211be":
            raise
        raise HwsimSkip("EHT not supported")

    ap_eht = hapd.get_status_field("ieee80211be")
    if ap_eht != "1":
        raise Exception("AP STATUS did not indicate ieee80211be=1")

    dev[0].connect("eht", key_mgmt="NONE", scan_freq="2412")

    # The AP side has to see the station as EHT capable ...
    sta_entry = hapd.get_sta(dev[0].own_addr())
    if "[EHT]" not in sta_entry['flags']:
        raise Exception("Missing STA flag: EHT")

    # ... and the station has to report Wi-Fi generation 7.
    sta_status = dev[0].request("STATUS")
    if "wifi_generation=7" not in sta_status:
        raise Exception("STA STATUS did not indicate wifi_generation=7")
134
def test_prefer_eht_20(dev, apdev):
    """EHT AP on a 20 MHz channel"""
    params = dict(ssid="eht", channel="1", ieee80211ax="1", ieee80211be="1",
                  ieee80211n="1")
    no_eht_error = "Failed to set hostapd parameter ieee80211be"
    try:
        hapd0 = hostapd.add_ap(apdev[0], params)

        # Second AP uses the same configuration with EHT disabled.
        params["ieee80211be"] = "0"
        hapd1 = hostapd.add_ap(apdev[1], params)
    except Exception as err:
        if str(err) == no_eht_error:
            raise HwsimSkip("EHT not supported")
        raise

    dev[0].connect("eht", key_mgmt="NONE")

    # The station is expected to select the first (EHT) AP.
    eht_bssid = apdev[0]['bssid']
    if dev[0].get_status_field('bssid') != eht_bssid:
        raise Exception("dev[0] connected to unexpected AP")

    throughput = dev[0].get_bss(eht_bssid)['est_throughput']
    if throughput != "172103":
        raise Exception("Unexpected BSS1 est_throughput: " + throughput)
160
def start_eht_sae_ap(apdev, ml=False, transition_mode=False,
                     anti_clogging_token=False):
    """Start an EHT AP with SSID "eht" using SAE and return it.

    apdev: AP device entry passed to hostapd.add_ap()
    ml: add a hardcoded Multi-Link element through vendor_elements
    transition_mode: select the more permissive set of AKMs, ciphers, SAE
        groups, PMF and sae_pwe values instead of the SAE-EXT-KEY only
        configuration
    anti_clogging_token: set sae_anti_clogging_threshold to 0

    Returns the object returned by hostapd.add_ap(). Raises HwsimSkip if
    hostapd rejects the ieee80211be parameter; any other exception from
    hostapd.add_ap() is propagated unchanged.
    """
    params = hostapd.wpa2_params(ssid="eht", passphrase="12345678")
    params["ieee80211ax"] = "1"
    params["ieee80211be"] = "1"
    params['ieee80211w'] = '1' if transition_mode else '2'
    params['rsn_pairwise'] = "CCMP GCMP-256" if transition_mode else "GCMP-256"
    params['group_cipher'] = "CCMP" if transition_mode else "GCMP-256"
    params["group_mgmt_cipher"] = "AES-128-CMAC" if transition_mode else "BIP-GMAC-256"
    params['beacon_prot'] = '1'
    params['wpa_key_mgmt'] = "SAE SAE-EXT-KEY WPA-PSK WPA-PSK-SHA256" if transition_mode else 'SAE-EXT-KEY'
    params['sae_groups'] = "19 20" if transition_mode else "20"
    params['sae_pwe'] = "2" if transition_mode else "1"
    if anti_clogging_token:
        params['sae_anti_clogging_threshold'] = '0'
    if ml:
        # Hex dump of the element added to the AP's frames as-is
        ml_elem = "ff0d6b" + "3001" + "0a" + "021122334455" + "01" + "00" + "00"
        params['vendor_elements'] = ml_elem
    try:
        hapd = hostapd.add_ap(apdev, params)
    except Exception as e:
        # Failure to set ieee80211be is treated as missing EHT support.
        if str(e) == "Failed to set hostapd parameter ieee80211be":
            raise HwsimSkip("EHT not supported")
        raise

    # Callers assign the result, so hand the AP object back to them.
    return hapd
186
def test_eht_sae(dev, apdev):
    """EHT AP with SAE"""
    sta = dev[0]
    check_sae_capab(sta)

    hapd = start_eht_sae_ap(apdev[0])
    connect_args = {"key_mgmt": "SAE-EXT-KEY", "psk": "12345678",
                    "ieee80211w": "2", "beacon_prot": "1",
                    "pairwise": "GCMP-256", "group": "GCMP-256",
                    "group_mgmt": "BIP-GMAC-256", "scan_freq": "2412"}
    try:
        sta.set("sae_groups", "20")
        sta.set("sae_pwe", "2")
        sta.connect("eht", **connect_args)
    finally:
        # Reset the SAE parameters that were changed above even if the
        # connection failed.
        sta.set("sae_groups", "")
        sta.set("sae_pwe", "0")
202
def test_eht_sae_mlo(dev, apdev):
    """EHT+MLO AP with SAE"""
    sta = dev[0]
    check_sae_capab(sta)

    hapd = start_eht_sae_ap(apdev[0], ml=True)
    connect_args = {"key_mgmt": "SAE-EXT-KEY", "psk": "12345678",
                    "ieee80211w": "2", "beacon_prot": "1",
                    "pairwise": "GCMP-256", "group": "GCMP-256",
                    "group_mgmt": "BIP-GMAC-256", "scan_freq": "2412"}
    try:
        sta.set("sae_groups", "20")
        sta.set("sae_pwe", "2")
        sta.connect("eht", **connect_args)
    finally:
        # Reset the SAE parameters that were changed above even if the
        # connection failed.
        sta.set("sae_groups", "")
        sta.set("sae_pwe", "0")
218
def test_eht_sae_mlo_tm(dev, apdev):
    """EHT+MLO AP with SAE and transition mode"""
    for sta in dev[0:2]:
        check_sae_capab(sta)

    hapd = start_eht_sae_ap(apdev[0], ml=True, transition_mode=True)

    # Parameters shared by all three station connections
    shared = {"psk": "12345678", "group": "CCMP",
              "group_mgmt": "AES-128-CMAC", "scan_freq": "2412"}
    try:
        # dev[0]: SAE-EXT-KEY with GCMP-256 pairwise cipher
        dev[0].set("sae_groups", "20")
        dev[0].set("sae_pwe", "2")
        dev[0].connect("eht", key_mgmt="SAE-EXT-KEY", ieee80211w="2",
                       beacon_prot="1", pairwise="GCMP-256", **shared)

        # dev[1]: SAE-EXT-KEY with CCMP and EHT disabled
        dev[1].set("sae_groups", "19")
        dev[1].connect("eht", key_mgmt="SAE-EXT-KEY", ieee80211w="2",
                       beacon_prot="1", pairwise="CCMP", disable_eht="1",
                       **shared)

        # dev[2]: WPA-PSK with CCMP and EHT disabled
        dev[2].connect("eht", key_mgmt="WPA-PSK", pairwise="CCMP",
                       disable_eht="1", **shared)
    finally:
        dev[0].set("sae_groups", "")
        dev[0].set("sae_pwe", "0")
        dev[1].set("sae_groups", "")
246
def test_eht_sae_mlo_anti_clogging_token(dev, apdev):
    """EHT+MLO AP with SAE and anti-clogging token"""
    sta = dev[0]
    check_sae_capab(sta)

    hapd = start_eht_sae_ap(apdev[0], ml=True, anti_clogging_token=True)
    connect_args = {"key_mgmt": "SAE-EXT-KEY", "psk": "12345678",
                    "ieee80211w": "2", "beacon_prot": "1",
                    "pairwise": "GCMP-256", "group": "GCMP-256",
                    "group_mgmt": "BIP-GMAC-256", "scan_freq": "2412"}
    try:
        sta.set("sae_groups", "20")
        sta.set("sae_pwe", "2")
        sta.connect("eht", **connect_args)
    finally:
        # Reset the SAE parameters that were changed above even if the
        # connection failed.
        sta.set("sae_groups", "")
        sta.set("sae_pwe", "0")
262
def eht_mld_enable_ap(iface, link_id, params):
    """Add an AP MLD link on iface, enable it and wait for it to start.

    Returns the object from hostapd.add_mld_link(). Raises Exception if
    neither AP-ENABLED nor AP-DISABLED is seen within one second, or if the
    received event is not AP-ENABLED.
    """
    link = hostapd.add_mld_link(iface, link_id, params)
    link.enable()

    event = link.wait_event(["AP-ENABLED", "AP-DISABLED"], timeout=1)
    if event is None:
        raise Exception("AP startup timed out")
    if "AP-ENABLED" in event:
        return link
    raise Exception("AP startup failed")
274
def eht_mld_ap_wpa2_params(ssid, passphrase=None, key_mgmt="WPA-PSK-SHA256",
                           mfp="2", pwe=None, beacon_prot="1", bridge=False):
    """Build hostapd parameters for an EHT AP MLD link on channel 1.

    Starts from hostapd.wpa2_params() and adds HT/HE/EHT, 2.4 GHz channel,
    group management cipher and beacon protection settings. bridge adds the
    'ap-br0' bridge and pwe, when not None, sets sae_pwe.

    Returns the parameter dictionary.
    """
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase,
                                 wpa_key_mgmt=key_mgmt, ieee80211w=mfp)
    params.update({'ieee80211n': '1',
                   'ieee80211ax': '1',
                   'ieee80211be': '1',
                   'channel': '1',
                   'hw_mode': 'g',
                   'group_mgmt_cipher': "AES-128-CMAC",
                   'beacon_prot': beacon_prot})

    # Optional settings
    if bridge:
        params['bridge'] = 'ap-br0'
    if pwe is not None:
        params['sae_pwe'] = pwe

    return params
293
def _eht_mld_probe_req(wpas, hapd, tsf0, link_id=-1):
    """Send an ML probe request to hapd and check that its BSS was refreshed.

    tsf0 is the TSF of the BSS entry before the probe; the entry found after
    the scan has to carry a larger TSF. link_id is passed to ML_PROBE_REQ
    as-is (default -1).

    Raises Exception if the request is rejected, the scan does not start or
    complete, the BSS entry is missing, or its TSF did not increase.
    """
    cmd = "ML_PROBE_REQ bssid=%s mld_id=0 link_id=%d" % (hapd.own_addr(),
                                                         link_id)
    if "OK" not in wpas.request(cmd):
        raise Exception("Failed to request ML probe request")

    scan_steps = (("CTRL-EVENT-SCAN-STARTED", "Scan did not start"),
                  ("CTRL-EVENT-SCAN-RESULTS", "Scan did not complete"))
    for event, failure in scan_steps:
        if wpas.wait_event([event]) is None:
            raise Exception(failure)

    logger.info("ML Probe request scan done")

    entry = wpas.get_bss(hapd.own_addr())
    if not entry:
        raise Exception("AP did not reply to ML probe request")

    tsf1 = int(entry['tsf'])
    logger.info("tsf0=%s, tsf1=%s" % (tsf0, tsf1))

    # A TSF that did not move forward means the entry was not refreshed.
    if tsf1 <= tsf0:
        raise Exception("AP was not found in ML probe request scan")
317
def test_eht_mld_discovery(dev, apdev):
    """EHT MLD AP discovery"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)

        # Two links of the same AP MLD on channels 1 and 2
        ssid = "mld_ap"
        link0_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "1"}
        link1_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "2"}

        hapd0 = eht_mld_enable_ap(hapd_iface, 0, link0_params)
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, link1_params)

        # Only scan link 0
        res = wpas.request("SCAN freq=2412")
        if "FAIL" in res:
            raise Exception("Failed to start scan")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-STARTED"])
        if ev is None:
            raise Exception("Scan did not start")

        ev = wpas.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
        if ev is None:
            raise Exception("Scan did not complete")

        logger.info("Scan done")

        # Patterns searched for in the BSS command output to confirm that
        # the RNR and Multi-Link element information is present
        rnr_pattern = re.compile(".*ap_info.*, mld ID=0, link ID=",
                                 re.MULTILINE)
        ml_pattern = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)

        bss = wpas.request("BSS " + hapd0.own_addr())
        logger.info("BSS 0: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for first link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for first link")

        # Save the tsf0 for checking ML Probe request scan later
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        if wpas.get_bss(hapd1.own_addr()) is not None:
            raise Exception("BSS for link 1 found without ML probe request")

        # Now send an ML probe request (for all links)
        _eht_mld_probe_req(wpas, hapd0, tsf0)
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])

        # NOTE: hostapd incorrectly reports a TSF offset of zero
        # This only works because the source is always the ML probe response
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        bss = wpas.request("BSS " + hapd1.own_addr())
        logger.info("BSS 1: " + str(bss))

        if rnr_pattern.search(bss) is None:
            raise Exception("RNR element not found for second link")

        if ml_pattern.search(bss) is None:
            raise Exception("ML element not found for second link")

        # Probing for link ID 1 has to refresh the BSS entry of the second
        # link (its TSF has to move forward)
        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=1)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) <= tsf1:
            raise Exception("Probe for link ID did not update BSS")
        tsf0 = int(wpas.get_bss(hapd0.own_addr())['tsf'])
        tsf1 = int(wpas.get_bss(hapd1.own_addr())['tsf'])

        # Probing the wrong link ID should not update second link; a changed
        # TSF means the entry was updated, which is the failure case here
        _eht_mld_probe_req(wpas, hapd0, tsf0, link_id=4)
        if int(wpas.get_bss(hapd1.own_addr())['tsf']) != tsf1:
            raise Exception("Probe for other link ID updated BSS")
398
def test_eht_mld_owe_two_links(dev, apdev):
    """EHT MLD AP with MLD client OWE connection using two links"""
    # Default variant: all optional arguments keep their default values.
    _eht_mld_owe_two_links(dev=dev, apdev=apdev)
402
def test_eht_mld_owe_two_links_scan_second(dev, apdev):
    """EHT MLD AP with MLD client OWE connection using two links; scan only second"""
    options = {"scan_only_second_link": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
406
@long_duration_test
def test_eht_mld_owe_two_links_no_assoc_timeout(dev, apdev):
    """Verify that AP MLD does not time out two link association"""
    # wait_for_timeout makes the shared helper wait for an unexpected
    # disconnection after the connection checks.
    options = {"wait_for_timeout": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
411
def test_eht_mld_owe_two_links_reconf_remove_extra_link(dev, apdev):
    """AP MLD with MLD client OWE connection with one not-advertised link removed in reconf"""
    # Hex dump of the element, kept as separate pieces for readability.
    # NOTE(review): the split into header / control / common info / per-link
    # part is inferred from how the string is pieced together - confirm.
    mle_parts = ("ff0b6b", "0200", "01", "0005420003ffff")
    _eht_mld_owe_two_links(dev, apdev, reconf_mle="".join(mle_parts))
416
def test_eht_mld_owe_two_links_reconf_remove_link(dev, apdev):
    """AP MLD with MLD client OWE connection with one link removed in reconf"""
    # Hex dump of the element, kept as separate pieces for readability.
    # NOTE(review): the split into header / control / common info / per-link
    # part is inferred from how the string is pieced together - confirm.
    mle_parts = ("ff0b6b", "0200", "01", "0005400003ffff")
    options = {"reconf_mle": "".join(mle_parts),
               "only_second": True,
               "scan_only_second_link": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
422
def test_eht_mld_owe_two_links_reconf_mle_ext(dev, apdev):
    """AP MLD with MLD client OWE connection and reconf MLE extensibility"""
    # Hex dump of the element, kept as separate pieces for readability.
    # NOTE(review): the "aa" octets look like padding used to exercise
    # parsing of unknown extra data - confirm.
    mle_parts = ("ff106b", "0200", "05aaaaaaaa", "0006420004ffffaa")
    _eht_mld_owe_two_links(dev, apdev, reconf_mle="".join(mle_parts))
427
def test_eht_mld_owe_two_links_reconf_mle_ext_only_second(dev, apdev):
    """AP MLD with MLD client OWE connection and reconf MLE extensibility"""
    # Hex dump of the element, kept as separate pieces for readability.
    # NOTE(review): the "aa" octets look like padding used to exercise
    # parsing of unknown extra data - confirm.
    mle_parts = ("ff106b", "0200", "05aaaaaaaa", "0006400004ffffaa")
    options = {"reconf_mle": "".join(mle_parts),
               "only_second": True,
               "scan_only_second_link": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
433
def _eht_mld_owe_two_links(dev, apdev, second_link_disabled=False,
                           only_one_link=False, scan_only_second_link=False,
                           wait_for_timeout=False, reconf_mle=None,
                           only_second=False):
    """Run an OWE connection test against a two-link AP MLD.

    second_link_disabled: set mld_indicate_disabled on the second link and
        expect it to be dormant on the station
    only_one_link: restrict the station to the first link with bssid_filter
    scan_only_second_link: scan only 2437 MHz before connecting
    wait_for_timeout: wait 65 seconds and fail on a disconnection event
    reconf_mle: hex string added to both links as vendor_elements
    only_second: verify status against the second link with only link 1
        valid and active

    Raises Exception on any failed verification step.
    """
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_owe_capab(wpas)

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        if reconf_mle:
            params['vendor_elements'] = reconf_mle
        hapd0 = eht_mld_enable_ap(hapd0_iface, 0, params)

        # The second link reuses the parameters on channel 6 and is added to
        # the same interface as the first link.
        params['channel'] = '6'
        if second_link_disabled:
            params['mld_indicate_disabled'] = '1'
        hapd1 = eht_mld_enable_ap(hapd0_iface, 1, params)

        # Check legacy client connection
        dev[0].connect(ssid, scan_freq="2437", key_mgmt="OWE", ieee80211w="2")

        if only_one_link:
            wpas.set("bssid_filter", hapd0.get_status_field("link_addr"))

        if scan_only_second_link:
            scan_freq = "2437"
        else:
            scan_freq = "2412 2437"
        wpas.connect(ssid, scan_freq=scan_freq, key_mgmt="OWE",
                     ieee80211w="2")

        # Expected link bitmaps on the station
        expect_active = 3
        expect_valid = 3
        if second_link_disabled:
            if _eht_dormant_links(wpas) != 2:
                raise Exception("Unexpected dormant links")
            expect_active = 1
        if only_one_link:
            expect_active = 1
            expect_valid = 1

        if only_second:
            ap, freq, expect_valid, expect_active = hapd1, 2437, 2, 2
        else:
            ap, freq = hapd0, 2412
        eht_verify_status(wpas, ap, freq, 20, is_ht=True, mld=True,
                          valid_links=expect_valid,
                          active_links=expect_active)
        eht_verify_wifi_version(wpas)

        traffic_test(wpas, hapd0)
        if not second_link_disabled:
            traffic_test(wpas, hapd1)

        if only_one_link:
            wpas.set("bssid_filter", "")

        if wait_for_timeout:
            event = wpas.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=65)
            if event is not None:
                raise Exception("Unexpected disconnection")
500
def test_eht_mld_owe_two_links_one_disabled(dev, apdev):
    """AP MLD with MLD client OWE connection when one of the AP MLD links is disabled"""
    options = {"second_link_disabled": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
504
def test_eht_mld_owe_two_links_only_one_negotiated(dev, apdev):
    """AP MLD with MLD client OWE connection when only one of the links is negotiated"""
    options = {"only_one_link": True}
    _eht_mld_owe_two_links(dev, apdev, **options)
508
def test_eht_mld_sae_single_link(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using single link"""
    # Default variant: anti-clogging token handling is not forced.
    run_eht_mld_sae_single_link(dev=dev, apdev=apdev)
512
def test_eht_mld_sae_single_link_anti_clogging_token(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using single link and SAE anti-clogging token"""
    options = {"anti_clogging_token": True}
    run_eht_mld_sae_single_link(dev, apdev, **options)
516
def run_eht_mld_sae_single_link(dev, apdev, anti_clogging_token=False):
    """Connect an MLD station with SAE to a single-link AP MLD and verify it.

    anti_clogging_token sets sae_anti_clogging_threshold to 0 on the AP.
    The connection is verified with the link bitmaps (only link 0 valid and
    active), the Wi-Fi generation and a traffic test.
    """
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
            HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):
        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        ssid, passphrase = "mld_ap_sae_single_link", 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE",
                                           mfp="2", pwe='2')
        if anti_clogging_token:
            ap_params['sae_anti_clogging_threshold'] = '0'

        link0 = eht_mld_enable_ap(hapd_iface, 0, ap_params)

        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE", ieee80211w="2")

        eht_verify_status(wpas, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, link0)
541
def run_eht_mld_sae_two_links(dev, apdev, beacon_prot="1",
                              disable_enable=False, bridge=False):
    """Connect an MLD station with SAE to a two-link AP MLD and verify it.

    beacon_prot: beacon_prot value used in the AP configuration
    disable_enable: additionally disable and re-enable the whole AP MLD and
        verify that the station can reconnect
    bridge: place the AP links in the 'ap-br0' bridge and run the traffic
        tests through it
    """
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        ssid, passphrase = "mld_ap_sae_two_link", 'qwertyuiop'
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1',
                                        beacon_prot=beacon_prot,
                                        bridge=bridge)
        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)

        # Second link: same parameters on channel 6
        params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)
        links = (hapd0, hapd1)

        if bridge:
            for bridge_cmd in (['brctl', 'setfd', 'ap-br0', '0'],
                               ['ip', 'link', 'set', 'dev', 'ap-br0', 'up']):
                hapd0.cmd_execute(bridge_cmd)

        # Interface used on the AP side of the traffic tests
        ap_ifname = 'ap-br0' if bridge else None

        wpas.set("sae_pwe", "1")

        # The first authentication attempt tries to use group 20 and the
        # authentication is expected to fail. The next authentication should
        # use group 19 and succeed.
        wpas.set("sae_groups", "20 19")

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE", ieee80211w="2", beacon_prot="1")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)

        if wpas.get_status_field('sae_group') != '19':
            raise Exception("Expected SAE group not used")

        for link in links:
            traffic_test(wpas, link, ifname2=ap_ifname)

        if not disable_enable:
            return

        if "OK" not in hapd0.request("DISABLE_MLD"):
            raise Exception("DISABLE_MLD failed")
        for idx, link in enumerate(links):
            if link.wait_event(["AP-DISABLED"], timeout=1) is None:
                raise Exception("AP-DISABLED not received (%d)" % idx)

        # mac80211 does not seem to detect beacon loss or deauthentication
        # in non-AP MLD case?! For now, ignore that and just force
        # disconnection locally on the STA.
        wpas.request("DISCONNECT")
        wpas.wait_disconnected()

        if "OK" not in hapd0.request("ENABLE_MLD"):
            raise Exception("ENABLE_MLD failed")
        for idx, link in enumerate(links):
            if link.wait_event(["AP-ENABLED"], timeout=1) is None:
                raise Exception("AP-ENABLED not received (%d)" % idx)

        # TODO: Figure out why this fails without PMKSA_FLUSH. Things should
        # fall back to full SAE from failed PMKSA caching attempt
        # automatically.
        wpas.request("PMKSA_FLUSH")

        # flush the BSS table before reconnect as otherwise the old
        # AP MLD BSSs would be in the BSS list
        wpas.request("BSS_FLUSH 0")
        wpas.request("RECONNECT")
        wpas.wait_connected()
        for link in links:
            link.wait_sta()
        for link in links:
            traffic_test(wpas, link, ifname2=ap_ifname)
627
def test_eht_mld_sae_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links"""
    # Default variant: all optional arguments keep their default values.
    run_eht_mld_sae_two_links(dev=dev, apdev=apdev)
631
def test_eht_mld_sae_two_links_no_beacon_prot(dev, apdev):
    """EHT MLD AP with MLD client SAE H2E connection using two links and no beacon protection"""
    options = {"beacon_prot": "0"}
    run_eht_mld_sae_two_links(dev, apdev, **options)
635
def test_eht_mld_sae_two_links_disable_enable(dev, apdev):
    """AP MLD with two links and disabling/enabling full AP MLD"""
    options = {"disable_enable": True}
    run_eht_mld_sae_two_links(dev, apdev, **options)
639
def test_eht_mld_sae_two_links_bridge(dev, apdev):
    """AP MLD with two links in a bridge"""
    options = {"bridge": True}
    run_eht_mld_sae_two_links(dev, apdev, **options)
643
def test_eht_mld_sae_ext_one_link(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using single link"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        ssid, passphrase = "mld_ap_sae_ext_single_link", 'qwertyuiop'
        ap_params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                           key_mgmt="SAE-EXT-KEY")
        link0 = eht_mld_enable_ap(hapd_iface, 0, ap_params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        # Only link 0 exists, so it is the only valid and active link.
        eht_verify_status(wpas, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, link0)
666
def test_eht_mld_sae_ext_two_links(dev, apdev):
    """EHT MLD AP with MLD client SAE-EXT H2E connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        ssid, passphrase = "mld_ap_sae_two_link", 'qwertyuiop'
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE-EXT-KEY")
        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)

        # Second link: same parameters on channel 6
        params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)

        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        # Both links (bitmap 0x3) are expected to be valid and active.
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        for link in (hapd0, hapd1):
            traffic_test(wpas, link)
695
def test_eht_mld_sae_legacy_client(dev, apdev):
    """EHT MLD AP with legacy client SAE H2E connection"""
    sta = dev[0]
    check_sae_capab(sta)
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface):
        ssid, passphrase = "mld_ap_sae_two_link", 'qwertyuiop'
        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1')
        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)

        # Second link: same parameters on channel 6
        params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)

        try:
            sta.set("sae_groups", "")
            sta.set("sae_pwe", "1")
            sta.connect(ssid, sae_password=passphrase, scan_freq="2412",
                        key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
            logger.info("wpa_supplicant STATUS:\n" + sta.request("STATUS"))

            # Wait for the STA entry on whichever link the client selected.
            bssid = sta.get_status_field("bssid")
            for link in (hapd0, hapd1):
                if link.own_addr() == bssid:
                    link.wait_sta()
                    break
            else:
                raise Exception("Unknown BSSID: " + bssid)

            eht_verify_status(sta, hapd0, 2412, 20, is_ht=True)
            traffic_test(sta, hapd0)
        finally:
            sta.set("sae_groups", "")
            sta.set("sae_pwe", "0")
730
def test_eht_mld_sae_transition(dev, apdev):
    """EHT MLD AP in SAE/PSK transition mode with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(wpas_iface)
        check_sae_capab(wpas)

        ssid, passphrase = "mld_ap_sae_two_link", 'qwertyuiop'
        akms = "SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256"
        params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt=akms,
                                        mfp="1")
        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)

        # Second link: same parameters on channel 6
        params['channel'] = '6'
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)

        # MLD client using SAE-EXT-KEY on both links
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        for link in (hapd0, hapd1):
            traffic_test(wpas, link)

        # Additional clients: one with SAE and one with WPA-PSK without PMF
        sae_sta, psk_sta = dev[0], dev[1]
        sae_sta.set("sae_groups", "")
        sae_sta.connect(ssid, sae_password=passphrase, scan_freq="2412",
                        key_mgmt="SAE", ieee80211w="2", beacon_prot="1")
        psk_sta.connect(ssid, psk=passphrase, scan_freq="2412",
                        key_mgmt="WPA-PSK", ieee80211w="0")
766
def test_eht_mld_ptk_rekey(dev, apdev):
    """EHT MLD AP and PTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap_sae_two_link"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(
            network, secret,
            key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
            mfp="1")
        # Ask the AP to rekey the PTK (wpa_ptk_rekey=5).
        ap_conf['wpa_ptk_rekey'] = '5'

        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)
        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.connect(network, sae_password=secret, scan_freq="2412 2437",
                    key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        # Consume the connect event from whichever link reports it; a
        # missing event is not treated as a failure here.
        if link0.wait_event(["AP-STA-CONNECT"], timeout=1) is None:
            link1.wait_event(["AP-STA-CONNECT"], timeout=1)
        for link in (link0, link1):
            traffic_test(sta, link)

        rekey_ev = sta.wait_event(["WPA: Key negotiation completed",
                                   "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if rekey_ev is None:
            raise Exception("PTK rekey timed out")
        if "CTRL-EVENT-DISCONNECTED" in rekey_ev:
            raise Exception("Disconnect instead of rekey")

        # Short pause before checking that data still flows on both links.
        time.sleep(0.1)
        for link in (link0, link1):
            traffic_test(sta, link)
807
def test_eht_mld_gtk_rekey(dev, apdev):
    """AP MLD and GTK rekeying with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap_sae_two_link"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(
            network, secret,
            key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
            mfp="1")
        # Ask the AP to rekey the group key (wpa_group_rekey=5).
        ap_conf['wpa_group_rekey'] = '5'

        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)
        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.connect(network, sae_password=secret, scan_freq="2412 2437",
                    key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        # Consume the connect event from whichever link reports it; a
        # missing event is not treated as a failure here.
        if link0.wait_event(["AP-STA-CONNECT"], timeout=1) is None:
            link1.wait_event(["AP-STA-CONNECT"], timeout=1)
        for link in (link0, link1):
            traffic_test(sta, link)

        # Expect two consecutive group rekeys, verifying traffic on both
        # links after each of them.
        for _ in range(2):
            rekey_ev = sta.wait_event(["MLO RSN: Group rekeying completed",
                                       "CTRL-EVENT-DISCONNECTED"],
                                      timeout=10)
            if rekey_ev is None:
                raise Exception("GTK rekey timed out")
            if "CTRL-EVENT-DISCONNECTED" in rekey_ev:
                raise Exception("Disconnect instead of rekey")

            time.sleep(0.1)
            for link in (link0, link1):
                traffic_test(sta, link)
849
def test_eht_mld_gtk_rekey_failure(dev, apdev):
    """AP MLD and GTK rekeying failure with MLD client connection using two links"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap_sae_two_link"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(
            network, secret,
            key_mgmt="SAE-EXT-KEY SAE WPA-PSK WPA-PSK-SHA256",
            mfp="1")
        ap_conf['wpa_group_rekey'] = '5'

        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)
        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.connect(network, sae_password=secret, scan_freq="2412 2437",
                    key_mgmt="SAE-EXT-KEY", ieee80211w="2")
        # Consume the connect event from whichever link reports it.
        if link0.wait_event(["AP-STA-CONNECT"], timeout=1) is None:
            link1.wait_event(["AP-STA-CONNECT"], timeout=1)

        # Force group handshake to time out
        for link in (link0, link1):
            link.request("SET ext_eapol_frame_io 1")

        disc_ev = sta.wait_event(["CTRL-EVENT-DISCONNECTED"], timeout=20)
        if disc_ev is None:
            raise Exception("Disconnection not reported")
        # The disconnection event has to carry reason=16 as its own token.
        if "reason=16" not in disc_ev.split():
            raise Exception("Unexpected disconnection reason: " + disc_ev)

        for link in (link0, link1):
            link.dump_monitor()

        # Re-enable automatic EAPOL frame processing
        for link in (link0, link1):
            link.request("SET ext_eapol_frame_io 0")

        # The station is expected to get connected again on its own.
        sta.wait_connected()
896
def test_eht_ml_probe_req(dev, apdev):
    """AP MLD with two links and non-AP MLD sending ML Probe Request"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)

        network = "mld_ap_sae_two_link"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(network, secret,
                                         key_mgmt="SAE-EXT-KEY")

        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)
        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        target = link0.own_addr()
        sta.scan_for_bss(target, freq=2412)

        # First request without a link ID, then one asking for link_id=2.
        # Each request only has to end in a scan result or scan failure
        # event within the timeout.
        for link_arg in ("", " link_id=2"):
            time.sleep(1)
            cmd = "ML_PROBE_REQ bssid=" + target + " mld_id=0" + link_arg
            if "OK" not in sta.request(cmd):
                raise Exception("Failed to run: " + cmd)
            scan_ev = sta.wait_event(["CTRL-EVENT-SCAN-RESULTS",
                                      "CTRL-EVENT-SCAN-FAILED"], timeout=10)
            if scan_ev is None:
                raise Exception("ML_PROBE_REQ did not result in scan results")
936
def test_eht_mld_connect_probes(dev, apdev, params):
    """MLD client sends ML probe to connect to not discovered links"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(network, secret, mfp="2",
                                         key_mgmt="SAE", pwe='2')
        ap_conf['channel'] = '1'
        ap_conf['bssid'] = '00:11:22:33:44:01'
        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        ap_conf['bssid'] = '00:11:22:33:44:02'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        # Only 2412 MHz is scanned, so the link on channel 6 is not found
        # by the scan itself.
        sta.set("sae_pwe", "1")
        sta.connect(network, sae_password=secret, ieee80211w="2",
                    key_mgmt="SAE", scan_freq="2412")

        capture = os.path.join(params['logdir'], 'hwsim0.pcapng')

        # Three alternative display filters for the ML Probe Request: two
        # spellings of the Multi-Link Control field name with different
        # quoting of the profile ID list, and a raw element data match
        # (presumably to cope with different tshark versions).
        probe_head = ('wlan.fc.type_subtype == 0x0004 && '
                      'wlan.ext_tag.length == 8 && '
                      'wlan.ext_tag.number == 107 && ')
        ml_tail = (' == 0x0011 && '
                   'wlan.eht.multi_link.common_info.length == 2 && '
                   'wlan.eht.multi_link.common_info.mld_id == 0 && '
                   'wlan.eht.multi_link.sta_profile.subelt_id == 0 && '
                   'wlan.eht.multi_link.sta_profile.subelt_len == 2 && '
                   'wlan.eht.multi_link.type_1.sta_profile_count == 1 && '
                   'wlan.eht.multi_link.sta_profile_id_list == ')
        filters = [probe_head + 'wlan.eht.multi_link.control' + ml_tail + "'1'",
                   probe_head + 'wlan.eht.multi_link_control' + ml_tail + '1',
                   'wlan.fc.type_subtype == 0x0004 && '
                   'wlan.ext_tag.number == 107 && '
                   'wlan.ext_tag.data == 11:00:02:00:00:02:11:00']
        out = run_tshark(capture, filters, display=['frame.number'])
        if not out.splitlines():
            raise Exception('ML probe request not found')

        # Probe Response frame has the ML element, which will be fragmented
        out = run_tshark(capture,
                         "wlan.fc.type_subtype == 0x0005 && wlan.ext_tag.number == 107 && wlan.ext_tag.length == 254",
                         display=['frame.number'])
        if not out.splitlines():
            # This requires new tshark (e.g., 4.0.6); for now, only log
            # the issue instead of failing, to avoid forcing such upgrade.
            logger.info('ML probe response not found')

        eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        for link in (link0, link1):
            traffic_test(sta, link)
984
def test_eht_tx_link_rejected_connect_other(dev, apdev, params):
    """EHT MLD AP with MLD client being rejected on TX link, but then connecting on second link"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(network, secret, mfp="2",
                                         key_mgmt="SAE", pwe='2')
        ap_conf['channel'] = '1'
        ap_conf['bssid'] = '00:11:22:33:44:01'
        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        ap_conf['bssid'] = '00:11:22:33:44:02'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.set("sae_pwe", "1")
        # Inject one failure into hostapd_get_aid on the first link while
        # the station connects.
        with fail_test(link0, 1, "hostapd_get_aid"):
            sta.connect(network, sae_password=secret, ieee80211w="2",
                        key_mgmt="SAE", scan_freq="2412")

        # The association is expected to end up on the second link only
        # (2437 MHz, link bitmap 2).
        eht_verify_status(sta, link1, 2437, 20, is_ht=True, mld=True,
                          valid_links=2, active_links=2)
        for link in (link0, link1):
            traffic_test(sta, link)
1015
def test_eht_all_links_rejected(dev, apdev, params):
    """EHT MLD AP with MLD client ignores all rejected links"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(network, secret, mfp="2",
                                         key_mgmt="SAE", pwe='2')
        ap_conf['channel'] = '1'
        ap_conf['bssid'] = '00:11:22:33:44:01'
        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        ap_conf['bssid'] = '00:11:22:33:44:02'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)
        sta.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        sta.set("sae_pwe", "1")

        # Inject one failure each into hostapd_get_aid and
        # hostapd_process_assoc_ml_info on the first link.
        with fail_test(link0, 1, "hostapd_get_aid",
                       1, "hostapd_process_assoc_ml_info"):
            sta.connect(network, sae_password=secret, ieee80211w="2",
                        key_mgmt="SAE", scan_freq="2412", wait_connect=False)
            reject_ev = sta.wait_event(['CTRL-EVENT-ASSOC-REJECT'])
            if not reject_ev:
                raise Exception('Rejection not found')

            # Two "Added BSSID" events are expected, one per link address,
            # in either order.
            first = sta.wait_event(['Added BSSID'])
            second = sta.wait_event(['Added BSSID'])
            all_ignored = bool(first and second)
            if all_ignored:
                in_order = (link0.own_addr() in first and
                            link1.own_addr() in second)
                all_ignored = in_order or (link1.own_addr() in first and
                                           link0.own_addr() in second)
            if not all_ignored:
                raise Exception('Not all BSSs were added to the ignore list')

            # After this message, a new scan clears the ignore and the STA
            # connects.
            sta.wait_connected(timeout=15)
1057
def test_eht_connect_invalid_link(dev, apdev, params):
    """EHT MLD AP where one link is incorrectly configured and rejected by mac80211"""
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_sae_capab(sta)

        network = "mld_ap"
        secret = 'qwertyuiop'
        ap_conf = eht_mld_ap_wpa2_params(network, secret, mfp="2",
                                         key_mgmt="SAE", pwe='2')
        ap_conf['channel'] = '1'
        ap_conf['bssid'] = '00:11:22:33:44:01'
        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        ap_conf['bssid'] = '00:11:22:33:44:02'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        # We scan for both APs, then try to connect to link 0, but only the
        # second attempt will work if mac80211 rejects the second link.
        sta.set("mld_connect_bssid_pref", "00:11:22:33:44:01")
        sta.set("sae_pwe", "1")
        with fail_test(sta, 1, "assoc;wpa_driver_nl80211_associate",
                       2, "link;wpa_driver_nl80211_associate"):
            sta.connect(network, sae_password=secret, ieee80211w="2",
                        key_mgmt="SAE", scan_freq="2412")

        # Only link 0 (bitmap 1) may be part of the association.
        eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)

        # Alternative display filters for the Association Request: two
        # spellings of the Multi-Link Control field name and a raw element
        # data match, all keyed on the station's own address.
        sta_addr = sta.own_addr()
        ml_template = ('wlan.fc.type_subtype == 0x0000 && '
                       'wlan.ext_tag.length == 11 && '
                       'wlan.ext_tag.number == 107 && '
                       '%s == 0x0100 && '
                       'wlan.eht.multi_link.common_info.length == 9 && '
                       'wlan.eht.multi_link.common_info.mld_mac_address == %s && '
                       'wlan.eht.multi_link.common_info.mld_capabilities == 0x0000')
        filters = [ml_template % ('wlan.eht.multi_link.control', sta_addr),
                   ml_template % ('wlan.eht.multi_link_control', sta_addr),
                   'wlan.fc.type_subtype == 0x0000 && wlan.ext_tag.data == 00:01:09:%s:00:00' % sta_addr]
        out = run_tshark(os.path.join(params['logdir'], 'hwsim0.pcapng'),
                         filters, display=['frame.number'])
        if not out.splitlines():
            raise Exception('Association request send by mac80211 had unexpected ML element content (probably it contained a second link)')
1100
def test_eht_mld_link_removal(dev, apdev):
    """EHT MLD with two links. Links removed during association"""

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_owe_capab(sta)

        network = "mld_ap_owe_two_link"
        ap_conf = eht_mld_ap_wpa2_params(network, key_mgmt="OWE", mfp="2")
        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.connect(network, scan_freq="2412 2437", key_mgmt="OWE",
                    ieee80211w="2")
        eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(sta)
        traffic_test(sta, link0)

        logger.info("Disable the 2nd link in 4 beacon intervals")
        link1.link_remove(4)
        time.sleep(0.6)

        logger.info("Test traffic after 2nd link disabled")
        traffic_test(sta, link0)

        # Group rekeying has to keep working with a single remaining link.
        if "OK" not in link0.request("REKEY_GTK"):
            raise Exception("REKEY_GTK failed")

        rekey_ev = sta.wait_event(["MLO RSN: Group rekeying completed"],
                                  timeout=2)
        if rekey_ev is None:
            raise Exception("GTK rekey timed out")

        traffic_test(sta, link0)

        logger.info("Disable the 1st link in 20 beacon intervals")
        link0.link_remove(20)
        time.sleep(1)

        logger.info("Verify that traffic is valid before the link is removed")
        traffic_test(sta, link0)
        time.sleep(2)

        # With both links removed, traffic is expected to fail.
        logger.info("Test traffic after 1st link disabled")
        traffic_test(sta, link0, success=False)
1151
def test_eht_mld_bss_trans_mgmt_link_removal_imminent(dev, apdev):
    """EHT MLD with two links. BSS transition management with link removal imminent"""

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(sta_iface)
        check_owe_capab(sta)

        network = "mld_ap_owe_two_link"
        ap_conf = eht_mld_ap_wpa2_params(network, key_mgmt="OWE", mfp="2")
        ap_conf["bss_transition"] = "1"
        ap_conf["mbo"] = "1"

        link0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)
        ap_conf['channel'] = '6'
        link1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        sta.connect(network, scan_freq="2412 2437", key_mgmt="OWE",
                    ieee80211w="2")
        eht_verify_status(sta, link0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(sta)
        for link in (link0, link1):
            link.wait_sta()
        traffic_test(sta, link0)

        sta_addr = sta.own_addr()
        tm_req = "BSS_TM_REQ " + sta_addr + " disassoc_timer=3 disassoc_imminent=1 link_removal_imminent=1 bss_term=0,1"
        if "OK" not in link0.request(tm_req):
            raise Exception("BSS_TM_REQ command failed")

        # Only one link is terminated, so the STA is expected to remain
        # associated and not start a scan.
        resp_ev = link0.wait_event(['BSS-TM-RESP'], timeout=5)
        # For now, a missing BSS TM response is tolerated since that
        # functionality with MLD needs a recent kernel change; only the
        # contents of a received response are checked.
        if resp_ev and "status_code=0" not in resp_ev:
            raise Exception("Unexpected BSS TM response contents: " + resp_ev)

        sta_ev = sta.wait_event(["CTRL-EVENT-SCAN-STARTED",
                                 "CTRL-EVENT-DISCONNECTED"], timeout=10)
        if sta_ev is not None:
            raise Exception("Unexpected action on STA: " + sta_ev)
1201
def send_check(hapd, frame, no_tx_status=False):
    """Inject a management frame into hapd and optionally wait for TX status.

    frame is the hex string appended to the MGMT_RX_PROCESS command. Unless
    no_tx_status is set, an exception is raised if no MGMT-TX-STATUS event
    is seen within one second.
    """
    prefix = "MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame="
    hapd.request(prefix + frame)
    if not no_tx_status:
        tx_status = hapd.wait_event(["MGMT-TX-STATUS"], timeout=1)
        if tx_status is None:
            raise Exception("No TX status")
1210
def test_eht_ap_mld_proto(dev, apdev):
    """AP MLD protocol testing"""
    # Hand-built management frames (hex strings) are fed to the first link
    # of a two-link AP MLD through send_check(). Each scenario first sends
    # a frame stored in `auth` from addr0 and then a frame built from
    # assocreq_start + Multi-Link element + assocreq_end using mld_addr in
    # the header.
    # NOTE(review): hapd1_radio/hapd1_iface are not used; both links are
    # enabled on hapd0_iface.
    with HWSimRadio(use_mlo=True) as (hapd0_radio, hapd0_iface), \
        HWSimRadio(use_mlo=True) as (hapd1_radio, hapd1_iface):

        ssid = "mld_ap_owe_two_link"
        params = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")

        hapd0 = eht_mld_enable_ap(hapd0_iface, 0, params)

        params['channel'] = '6'

        hapd1 = eht_mld_enable_ap(hapd0_iface, 1, params)

        # Addresses with the colons removed so that they can be embedded
        # into the hex frames below.
        # NOTE(review): ap_mld_addr and bssid1 are not referenced later in
        # this function.
        ap_mld_addr = hapd0.get_status_field("mld_addr[0]").replace(':', '')
        bssid0 = hapd0.own_addr().replace(':', '')
        bssid1 = hapd1.own_addr().replace(':', '')

        time.sleep(1)
        # Enable external management frame handling on both links before
        # injecting frames with MGMT_RX_PROCESS.
        hapd0.set("ext_mgmt_frame_handling", "1")
        hapd1.set("ext_mgmt_frame_handling", "1")

        # Truncated EML missing MLD Capabilities And operations field
        hapd0.note("Truncated EML missing MLD Capabilities And operations field")
        addr0 = "021122334400"
        addr1 = "021122334401"
        mld_addr = "02112233440f"
        # Header addressed to bssid0 with addr0 as the transmitter.
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # Header for the association frames; the transmitter is mld_addr.
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        # From here on, ssid holds the hex encoded SSID element (ID 0,
        # length 0x13, "mld_ap_owe_two_link") instead of the plain string.
        ssid = "00136d6c645f61705f6f77655f74776f5f6c696e6b"
        supp_rates = "010802040b160c121824"
        ext_supp_rates = "32043048606c"
        rsne = "301a0100000fac040100000fac040100000fac12cc000000000fac06"
        ht_capab = "2d1afe131bffff000000000000000000000100000000000000000000"
        ext_capab = "7f0a04004a02014000400001"
        he_capab = "ff16230178c81a400000bfce0000000000000000fafffaff"
        eht_capab = "ff126c07007c0000feffff7f0100888888880000"
        supp_op_classes = "3b155151525354737475767778797a7b7c7d7e7f808182"
        dh_param = "ff23201300ea85e693343a079500cf4d461011a0ff90ec4de1af40165adbea94a3f36eb071"
        wmm = "dd070050f202000100"
        # Shared parts placed before and after the per-scenario Multi-Link
        # element; these are reused by all the scenarios below.
        assocreq_start = "3004" + "0500" + ssid + supp_rates + ext_supp_rates + rsne + ht_capab + ext_capab + he_capab
        assocreq_end = eht_capab + supp_op_classes + dh_param + wmm

        # --> Not enough bytes for common info
        mle = "ff0a6b000109" + mld_addr
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Truncated Non-Inheritance element
        hapd0.note("Truncated Non-Inheritance element")
        addr0 = "021122334410"
        addr1 = "021122334411"
        mld_addr = "02112233441f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # --> MLD: Invalid inheritance
        mle = "ff7d6b000109" + mld_addr + "0000"
        mle += "0067" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element
        mle += "ff023800"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        # No MGMT-TX-STATUS event is waited for in this case.
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end,
                   no_tx_status=True)

        # Empty Non-Inheritance element
        hapd0.note("Empty Non-Inheritance element")
        addr0 = "021122334420"
        addr1 = "021122334421"
        mld_addr = "02112233442f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff7e6b000109" + mld_addr + "0000"
        mle += "0068" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element
        mle += "ff03380000"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # Non-Inheritance element
        hapd0.note("Non-Inheritance element")
        addr0 = "021122334430"
        addr1 = "021122334431"
        mld_addr = "02112233443f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        mle = "ff9e6b000109" + mld_addr + "0000"
        mle += "0088" + "3100" + "07" + addr1
        mle += "3004" + "010802040b160c121824" + "32043048606c" + "2d1afe131bffff000000000000000000000100000000000000000000" + "ff16230178c81a400000bfce0000000000000000fafffaff" + "ff126c07007c0000feffff7f0100888888880000"
        # Non-Inheritance element
        mle += "ff2338" + "1010032a362137387172756b548bedeff0" + "106b01020304050607080c0d21643b3a36"
        # Unknown optional subelement
        mle += "aa00"
        # Vendor-Specific subelement
        mle += "dd0411223344"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)

        # No Non-Inheritance element
        hapd0.note("No Non-Inheritance element")
        addr0 = "021122334440"
        addr1 = "021122334441"
        mld_addr = "02112233444f"
        hdr = "b0003a01" + bssid0 + addr0 + bssid0 + "1000"
        mle = "ff0a6b000007" + mld_addr
        auth = hdr + "0000" + "0100" + "0000" + mle
        send_check(hapd0, auth)

        # The per-link profile ends right after the copied elements; no
        # Non-Inheritance or other subelements are appended here.
        mle = "ff716b000109" + mld_addr + "0000"
        mle += "0063" + "3100" + "07" + addr1
        mle += "3004010802040b160c12182432043048606c2d1afe131bffff000000000000000000000100000000000000000000ff16230178c81a400000bfce0000000000000000fafffaffff126c07007c0000feffff7f0100888888880000"
        hdr = "00000000" + bssid0 + mld_addr + bssid0 + "1000"
        send_check(hapd0, hdr + assocreq_start + mle + assocreq_end)
1345
def _5ghz_chanwidth_to_bw(op):
    """Map an operating channel width code to a bandwidth string.

    Codes 0, 1, 2 and 3 map to "40", "80", "160" and "80+80"; any other
    value maps to "20".
    """
    widths = dict(enumerate(("40", "80", "160", "80+80")))
    return widths.get(op, "20")
1353
def _test_eht_5ghz(dev, apdev, channel, chanwidth, ccfs1, ccfs2=0,
                   eht_oper_puncturing_override=None,
                   he_ccfs1=None, he_oper_chanwidth=None):
    """Run an open-network EHT connection test on a 5 GHz channel.

    channel is the primary channel number; the scan frequency is derived
    as 5000 + channel * 5. chanwidth and ccfs1 are used for
    eht_oper_chwidth and eht_oper_centr_freq_seg0_idx. ccfs2 is used for
    the VHT/HE seg1 index. he_ccfs1 and he_oper_chanwidth set the VHT/HE
    seg0 index and channel width and default to the EHT values when None.
    If eht_oper_puncturing_override is set, it is configured on the AP and
    cleared again (with a Beacon update) after the connectivity check.

    The station is disconnected and the regulatory domain is reset in all
    cases once the AP has been started; the regulatory domain is reset even
    if starting the AP or the disconnection handling fails.
    """
    if he_ccfs1 is None:
        he_ccfs1 = ccfs1
    if he_oper_chanwidth is None:
        he_oper_chanwidth = chanwidth

    # Bound before the try block so that the cleanup can tell whether the
    # AP was started at all.
    hapd = None
    try:
        params = {"ssid": "eht",
                  "country_code": "US",
                  "hw_mode": "a",
                  "channel": str(channel),
                  "ieee80211n": "1",
                  "ieee80211ac": "1",
                  "ieee80211ax": "1",
                  "ieee80211be": "1",
                  "vht_oper_chwidth": str(he_oper_chanwidth),
                  "vht_oper_centr_freq_seg0_idx": str(he_ccfs1),
                  "vht_oper_centr_freq_seg1_idx": str(ccfs2),
                  "he_oper_chwidth": str(he_oper_chanwidth),
                  "he_oper_centr_freq_seg0_idx": str(he_ccfs1),
                  "he_oper_centr_freq_seg1_idx": str(ccfs2),
                  "eht_oper_centr_freq_seg0_idx": str(ccfs1),
                  "eht_oper_chwidth": str(chanwidth)}

        if he_oper_chanwidth == 0:
            # 20/40 MHz: the secondary channel side follows from where the
            # center is relative to the primary channel; no HT40 when they
            # are equal.
            if channel < he_ccfs1:
                params["ht_capab"] = "[HT40+]"
            elif channel > he_ccfs1:
                params["ht_capab"] = "[HT40-]"
        else:
            params["ht_capab"] = "[HT40+]"
            if he_oper_chanwidth == 2:
                params["vht_capab"] = "[VHT160]"
            elif he_oper_chanwidth == 3:
                params["vht_capab"] = "[VHT160-80PLUS80]"

        if eht_oper_puncturing_override:
            params['eht_oper_puncturing_override'] = eht_oper_puncturing_override

        freq = 5000 + channel * 5
        bw = "20"
        if chanwidth != 0 or channel != ccfs1:
            bw = _5ghz_chanwidth_to_bw(chanwidth)

        hapd = hostapd.add_ap(apdev[0], params)
        dev[0].connect("eht", key_mgmt="NONE", scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(dev[0], hapd, freq, bw, is_ht=True, is_vht=True)
        eht_verify_wifi_version(dev[0])
        hwsim_utils.test_connectivity(dev[0], hapd)

        if eht_oper_puncturing_override:
            hapd.set("eht_oper_puncturing_override", "0x0")
            hapd.request("UPDATE_BEACON")
            time.sleep(1)
    finally:
        try:
            # Without an AP there was no connection attempt to undo, and
            # touching hapd would only hide the original error.
            if hapd is not None:
                dev[0].request("DISCONNECT")
                dev[0].wait_disconnected()
                hapd.wait_sta_disconnect()
        finally:
            # Always restore the regulatory domain so that a failure here
            # does not leak country_code=US into other test cases.
            set_world_reg(apdev[0], None, dev[0])
1417
def test_eht_5ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 5 GHz"""
    # The four numeric arguments are presumably (channel, chanwidth, ccfs1,
    # ccfs2), matching the names used in the _test_eht_5ghz body -- confirm
    # against its signature. Here the center index equals channel 36.
    _test_eht_5ghz(dev, apdev, 36, 0, 36, 0)
1421
def test_eht_5ghz_40mhz_low(dev, apdev):
    """EHT with 40 MHz channel width on 5 GHz - secondary channel above"""
    # Presumably (channel, chanwidth, ccfs1, ccfs2): channel 36 sits below
    # the center index 38 -- confirm the argument order in _test_eht_5ghz.
    _test_eht_5ghz(dev, apdev, 36, 0, 38, 0)
1425
def test_eht_5ghz_40mhz_high(dev, apdev):
    """EHT with 40 MHz channel width on 5 GHz - secondary channel below"""
    # Presumably (channel, chanwidth, ccfs1, ccfs2): channel 40 sits above
    # the center index 38 -- confirm the argument order in _test_eht_5ghz.
    _test_eht_5ghz(dev, apdev, 40, 0, 38, 0)
1429
def test_eht_5ghz_80mhz_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=36"""
    # Presumably (channel, chanwidth, ccfs1, ccfs2): channel 36 with center
    # index 42 -- confirm the argument order in _test_eht_5ghz.
    _test_eht_5ghz(dev, apdev, 36, 1, 42, 0)
1433
def test_eht_5ghz_80mhz_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149"""
    # Presumably (channel, chanwidth, ccfs1, ccfs2): channel 149 with center
    # index 155 -- confirm the argument order in _test_eht_5ghz.
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0)
1437
def test_eht_5ghz_80mhz_puncturing_override_1(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=36 - puncturing override (2nd)"""

    # The 2nd 20 MHz is punctured
    # The override value is copied into the hostapd configuration by
    # _test_eht_5ghz. With he_oper_chanwidth=0 and he_ccfs1 equal to the
    # channel (assuming 36 is passed as the channel -- confirm), the helper
    # sets neither [HT40+] nor [HT40-].
    _test_eht_5ghz(dev, apdev, 36, 1, 42, 0,
                   eht_oper_puncturing_override="0x0002",
                   he_ccfs1=36, he_oper_chanwidth=0)
1445
def test_eht_5ghz_80mhz_puncturing_override_2(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (3rd)"""

    # The 3rd 20 MHz is punctured
    # The override value is copied into the hostapd configuration by
    # _test_eht_5ghz. With he_oper_chanwidth=0 and the channel below he_ccfs1
    # (assuming 149 is passed as the channel -- confirm), the helper selects
    # [HT40+].
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0,
                   eht_oper_puncturing_override="0x0004",
                   he_ccfs1=151, he_oper_chanwidth=0)
1453
def test_eht_5ghz_80mhz_puncturing_override_3(dev, apdev):
    """EHT with 80 MHz channel width on 5 GHz - primary=149 - puncturing override (4th)"""

    # The 4th 20 MHz is punctured
    # The override value is copied into the hostapd configuration by
    # _test_eht_5ghz. With he_oper_chanwidth=0 and the channel below he_ccfs1
    # (assuming 149 is passed as the channel -- confirm), the helper selects
    # [HT40+].
    _test_eht_5ghz(dev, apdev, 149, 1, 155, 0,
                   eht_oper_puncturing_override="0x0008",
                   he_ccfs1=151, he_oper_chanwidth=0)
1461
def test_eht_5ghz_80p80mhz(dev, apdev):
    """EHT with 80+80 MHz channel width on 5 GHz"""
    # Presumably (channel, chanwidth, ccfs1, ccfs2): two segment center
    # indexes 42 and 155 -- confirm the argument order in _test_eht_5ghz.
    _test_eht_5ghz(dev, apdev, 36, 3, 42, 155)
1465
1466def _6ghz_op_class_to_bw(op):
1467    return {
1468        131: "20",
1469        132: "40",
1470        133: "80",
1471        134: "160",
1472        137: "320",
1473    }.get(op, "20")
1474
def _test_eht_6ghz(dev, apdev, channel, op_class, ccfs1):
    """Run one EHT SAE association on 6 GHz and validate the outcome.

    dev[0] is the station and apdev[0] hosts the AP. channel, op_class, and
    ccfs1 are written into the hostapd configuration; ccfs1 is used for both
    the HE and the EHT center frequency segment 0 index. The regulatory
    domain is set to CA for the test and back to 00 afterwards.

    Raises HwsimSkip if 6 GHz (or 320 MHz for op_class 137) is reported as
    unsupported and Exception if one of the verification steps fails.
    """
    sta_dev = dev[0]
    check_sae_capab(sta_dev)

    # CA enables 320 MHz channels without NO-IR restriction
    sta_dev.cmd_execute(['iw', 'reg', 'set', 'CA'])
    wait_regdom_changes(sta_dev)

    try:
        ssid = "eht_6ghz_sae"
        passphrase = "12345678"
        ap_conf = hostapd.he_wpa2_params(ssid=ssid, passphrase=passphrase)
        for key, value in (("ieee80211be", "1"),
                           ("channel", str(channel)),
                           ("op_class", str(op_class)),
                           ("he_oper_centr_freq_seg0_idx", str(ccfs1)),
                           ("eht_oper_centr_freq_seg0_idx", str(ccfs1)),
                           ("country_code", "CA")):
            ap_conf[key] = value

        # Skip only after the regulatory domain change so that the finally
        # block below still restores the previous state.
        if not he_6ghz_supported():
            raise HwsimSkip("6 GHz frequency is not supported")
        if op_class == 137 and not eht_320mhz_supported():
            raise HwsimSkip("320 MHz channels are not supported")

        hapd = hostapd.add_ap(apdev[0], ap_conf)
        logger.info("hostapd STATUS: " + str(hapd.get_status()))
        if hapd.get_status_field("ieee80211ax") != "1":
            raise Exception("STATUS did not indicate ieee80211ax=1")

        if hapd.get_status_field("ieee80211be") != "1":
            raise Exception("STATUS did not indicate ieee80211be=1")

        freq = 5950 + channel * 5
        bw = _6ghz_op_class_to_bw(op_class)

        sta_dev.set("sae_pwe", "1")
        sta_dev.set("sae_groups", "")
        sta_dev.connect(ssid, key_mgmt="SAE", psk=passphrase, ieee80211w="2",
                        scan_freq=str(freq))
        hapd.wait_sta()

        eht_verify_status(sta_dev, hapd, freq, bw)
        eht_verify_wifi_version(sta_dev)

        # The AP has to list the operating class in use among the classes
        # reported for the station.
        sta_info = hapd.get_sta(sta_dev.own_addr())
        if 'supp_op_classes' not in sta_info:
            raise Exception("supp_op_classes not indicated")
        reported_classes = binascii.unhexlify(sta_info['supp_op_classes'])
        if op_class not in reported_classes:
            raise Exception("STA did not indicate support for opclass %d" % op_class)

        hwsim_utils.test_connectivity(sta_dev, hapd)
        sta_dev.request("DISCONNECT")
        sta_dev.wait_disconnected()
        hapd.wait_sta_disconnect()
        hapd.disable()
    finally:
        sta_dev.set("sae_pwe", "0")
        sta_dev.cmd_execute(['iw', 'reg', 'set', '00'])
        wait_regdom_changes(sta_dev)
1534
def test_eht_6ghz_20mhz(dev, apdev):
    """EHT with 20 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, channel=5, op_class=131, ccfs1=5)
1538
def test_eht_6ghz_40mhz(dev, apdev):
    """EHT with 40 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, channel=5, op_class=132, ccfs1=3)
1542
def test_eht_6ghz_80mhz(dev, apdev):
    """EHT with 80 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, channel=5, op_class=133, ccfs1=7)
1546
def test_eht_6ghz_160mhz(dev, apdev):
    """EHT with 160 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, channel=5, op_class=134, ccfs1=15)
1550
def test_eht_6ghz_320mhz(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz"""
    _test_eht_6ghz(dev, apdev, channel=5, op_class=137, ccfs1=31)
1554
def test_eht_6ghz_320mhz_2(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 63"""
    _test_eht_6ghz(dev, apdev, channel=37, op_class=137, ccfs1=63)
1558
def test_eht_6ghz_320mhz_3(dev, apdev):
    """EHT with 320 MHz channel width on 6 GHz center 31 primary 37"""
    _test_eht_6ghz(dev, apdev, channel=37, op_class=137, ccfs1=31)
1562
def check_anqp(dev, bssid):
    """Run an ANQP query for info ID 258 against bssid and verify it succeeds.

    dev is the station control interface used to issue ANQP_GET and to wait
    for the GAS/ANQP events. Raises Exception if the command is rejected, an
    expected event does not arrive, the RX-ANQP event does not mention
    "Venue Name", or the query does not end with result=SUCCESS.
    """
    reply = dev.request("ANQP_GET " + bssid + " 258")
    if "OK" not in reply:
        raise Exception("ANQP_GET command failed")

    if dev.wait_event(["GAS-QUERY-START"], timeout=5) is None:
        raise Exception("GAS query start timed out")

    if dev.wait_event(["GAS-QUERY-DONE"], timeout=10) is None:
        raise Exception("GAS query timed out")

    venue_ev = dev.wait_event(["RX-ANQP"], timeout=1)
    if venue_ev is None or "Venue Name" not in venue_ev:
        raise Exception("Did not receive Venue Name")

    done_ev = dev.wait_event(["ANQP-QUERY-DONE"], timeout=10)
    if done_ev is None:
        raise Exception("ANQP-QUERY-DONE event not seen")
    if "result=SUCCESS" not in done_ev:
        raise Exception("Unexpected result: " + done_ev)
1584
def test_eht_mld_gas(dev, apdev):
    """GAS/ANQP during MLO association"""
    # Separate AP on channel 11 that is queried in addition to the two links
    # of the AP MLD.
    hs20_bssid = apdev[0]['bssid']
    hs20_conf = hs20_ap_params()
    hs20_conf['hessid'] = hs20_bssid
    hs20_conf['channel'] = "11"
    hs20_ap = hostapd.add_ap(apdev[0], hs20_conf)

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (spare_radio, spare_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_owe_capab(wpas)
        wpas.scan_for_bss(hs20_bssid, freq="2462")

        ssid = "owe_two_link"
        mld_conf = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        for key, value in (('interworking', "1"),
                           ('venue_group', "7"),
                           ('venue_type', "1"),
                           ('venue_name', "eng:Example venue")):
            mld_conf[key] = value

        # Both links are enabled on the same interface.
        hapd0 = eht_mld_enable_ap(ap_iface, 0, mld_conf)
        bssid0 = hapd0.own_addr()

        mld_conf['channel'] = '6'
        hapd1 = eht_mld_enable_ap(ap_iface, 1, mld_conf)
        bssid1 = hapd1.own_addr()

        for link_bssid, link_freq in ((bssid0, "2412"), (bssid1, "2437")):
            wpas.scan_for_bss(link_bssid, freq=link_freq)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        for link in (hapd0, hapd1):
            link.wait_sta()

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)

        # Query the separate AP first and then each link of the AP MLD.
        for target in (hs20_bssid, bssid0, bssid1):
            check_anqp(wpas, target)
1629
def test_eht_mld_dpp_responder_while_assoc(dev, apdev):
    """DPP responder while ML associated"""
    initiator = dev[0]
    check_dpp_capab(initiator)

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (spare_radio, spare_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_dpp_capab(wpas)

        ssid = "owe_two_link"
        mld_conf = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        # Both links are enabled on the same interface.
        hapd0 = eht_mld_enable_ap(ap_iface, 0, mld_conf)

        mld_conf['channel'] = '6'
        hapd1 = eht_mld_enable_ap(ap_iface, 1, mld_conf)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        for link in (hapd0, hapd1):
            link.wait_sta()

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)

        # The associated station listens on 2462 MHz as the DPP responder
        # while dev[0] initiates authentication using its bootstrap URI.
        bootstrap_id = wpas.dpp_bootstrap_gen(chan="81/11", mac=True)
        uri = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % bootstrap_id)
        wpas.dpp_listen(2462)
        initiator.dpp_auth_init(uri=uri)
        wait_auth_success(initiator, wpas)
1662
def _eht_mld_disconnect(dev, apdev, disassoc=True):
    """Connect over two links and verify reconnection after AP disconnects.

    hostapd is asked three times to disconnect the station; each time the
    station is expected to disconnect and then come back on both links.
    disassoc selects the DISASSOCIATE command; otherwise DEAUTHENTICATE is
    used.
    """
    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_owe_capab(wpas)

        ssid = "mld_ap_owe_two_link"
        mld_conf = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        hapd0 = eht_mld_enable_ap(ap_iface, 0, mld_conf)

        mld_conf['channel'] = '6'
        hapd1 = eht_mld_enable_ap(ap_iface, 1, mld_conf)

        wpas.connect(ssid, scan_freq="2412 2437", key_mgmt="OWE",
                     ieee80211w="2")
        hapd0.wait_sta()
        hapd1.wait_sta()
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        if disassoc:
            cmd = "DISASSOCIATE "
        else:
            cmd = "DEAUTHENTICATE "
        cmd += wpas.own_addr()

        for _ in range(3):
            time.sleep(1)

            if "OK" not in hapd0.request(cmd):
                raise Exception("Failed to request: " + cmd)
            for link in (hapd0, hapd1):
                link.wait_sta_disconnect()

            # The station is expected to drop the connection and then
            # return on both links.
            wpas.wait_disconnected(timeout=1)
            wpas.wait_connected(timeout=5)
            for link in (hapd0, hapd1):
                link.wait_sta()

            eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                              valid_links=3, active_links=3)
            eht_verify_wifi_version(wpas)
            traffic_test(wpas, hapd0)
1707
def test_eht_mld_disassociate(dev, apdev):
    """EHT MLD with two links. Disassociate and reconnect"""
    # disassoc=True makes the helper send DISASSOCIATE from hostapd.
    _eht_mld_disconnect(dev, apdev, disassoc=True)
1711
def test_eht_mld_deauthenticate(dev, apdev):
    """EHT MLD with two links. Deauthenticate and reconnect"""
    # disassoc=False makes the helper send DEAUTHENTICATE from hostapd.
    _eht_mld_disconnect(dev, apdev, disassoc=False)
1715
def test_eht_mld_non_pref_chan(dev, apdev):
    """EHT MLD with one link. MBO non preferred channels"""

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_owe_capab(wpas)

        # Start the first AP
        ssid = "mld_ap_one_link_mbo"
        ap_conf = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        ap_conf["bss_transition"] = "1"
        ap_conf["mbo"] = "1"

        hapd0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        # Configure the list before connecting so that the AP learns it from
        # the Association Request frame.
        reply = wpas.request("SET non_pref_chan 81:7:200:1 81:9:100:2")
        if "OK" not in reply:
            raise Exception("Failed to set non-preferred channel list")

        wpas.connect(ssid, scan_freq="2412", key_mgmt="OWE",
                     ieee80211w="2", owe_only="1")
        hapd0.wait_sta()
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        # Validate information received from the Association Request frame
        addr = wpas.own_addr()
        sta_info = hapd0.get_sta(addr)
        logger.debug("STA: " + str(sta_info))

        if 'non_pref_chan[0]' not in sta_info:
            raise Exception("Missing non_pref_chan[0] value (assoc)")
        if sta_info['non_pref_chan[0]'] != '81:200:1:7':
            raise Exception("Unexpected non_pref_chan[0] value (assoc)")
        if 'non_pref_chan[1]' not in sta_info:
            raise Exception("Missing non_pref_chan[1] value (assoc)")
        if sta_info['non_pref_chan[1]'] != '81:100:2:9':
            raise Exception("Unexpected non_pref_chan[1] value (assoc)")
        if 'non_pref_chan[2]' in sta_info:
            raise Exception("Unexpected non_pref_chan[2] value (assoc)")

        # Verify operating class
        if 'supp_op_classes' not in sta_info:
            raise Exception("No supp_op_classes")
        op_classes = bytearray(binascii.unhexlify(sta_info['supp_op_classes']))
        current_class = op_classes[0]
        if current_class != 81:
            raise Exception("Unexpected current operating class %d" % current_class)
        if 115 not in op_classes:
            raise Exception("Operating class 115 missing")

        # Validate information from WNM action
        reply = wpas.request("SET non_pref_chan 81:9:100:2")
        if "OK" not in reply:
            raise Exception("Failed to update non-preferred channel list")

        # Short delay before reading the updated STA entry from hostapd
        time.sleep(0.1)
        sta_info = hapd0.get_sta(addr)
        logger.debug("STA: " + str(sta_info))

        if 'non_pref_chan[0]' not in sta_info:
            raise Exception("Missing non_pref_chan[0] value (update 1)")
        if sta_info['non_pref_chan[0]'] != '81:100:2:9':
            raise Exception("Unexpected non_pref_chan[0] value (update 1)")
        if 'non_pref_chan[1]' in sta_info:
            raise Exception("Unexpected non_pref_chan[1] value (update 1)")

        # Validate information from WNM action with multiple entries
        reply = wpas.request("SET non_pref_chan 81:9:100:2 81:10:100:2 81:8:100:2 81:7:100:1 81:5:100:1")
        if "OK" not in reply:
            raise Exception("Failed to update non-preferred channel list")
        time.sleep(0.1)
        sta_info = hapd0.get_sta(addr)
        logger.debug("STA: " + str(sta_info))

        if 'non_pref_chan[0]' not in sta_info:
            raise Exception("Missing non_pref_chan[0] value (update 2)")
        if sta_info['non_pref_chan[0]'] != '81:100:1:7,5':
            raise Exception("Unexpected non_pref_chan[0] value (update 2)")
        if 'non_pref_chan[1]' not in sta_info:
            raise Exception("Missing non_pref_chan[1] value (update 2)")
        if sta_info['non_pref_chan[1]'] != '81:100:2:9,10,8':
            raise Exception("Unexpected non_pref_chan[1] value (update 2)")
        if 'non_pref_chan[2]' in sta_info:
            raise Exception("Unexpected non_pref_chan[2] value (update 2)")
1802
def test_eht_mld_rrm_beacon_req(dev, apdev):
    """EHT MLD with one link. RRM beacon request"""

    with HWSimRadio(use_mlo=True) as (ap0_radio, ap0_iface), \
        HWSimRadio(use_mlo=True) as (ap1_radio, ap1_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_owe_capab(wpas)

        # Start the first AP and connect
        ssid = "mld_ap_one_link_rrm1"
        ap_conf = eht_mld_ap_wpa2_params(ssid, key_mgmt="OWE", mfp="2")
        for key in ("bss_transition", "mbo", "rrm_beacon_report"):
            ap_conf[key] = "1"

        hapd0 = eht_mld_enable_ap(ap0_iface, 0, ap_conf)

        wpas.connect(ssid, scan_freq="2412", key_mgmt="OWE", ieee80211w="2",
                     owe_only="1")
        hapd0.wait_sta()
        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=1, active_links=1)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        # Start the second AP
        other_ssid = "other"
        other_conf = eht_mld_ap_wpa2_params(other_ssid, key_mgmt="OWE",
                                            mfp="2")
        other_conf["channel"] = '6'
        hapd1 = eht_mld_enable_ap(ap1_iface, 0, other_conf)

        # Issue a beacon request for the second AP
        addr = wpas.own_addr()
        req = build_beacon_request(mode=1, chan=6, duration=50)

        # Send the request with SSID, Detail, Last Beacon Report Indication, and
        # Extended Request subelements. The Extended Request elements includes
        # the Multi-Link element ID.
        subelems = "0000" + "020101" + "a40101" + "0b02ff6b"
        run_req_beacon(hapd0, addr, req + subelems)

        ev = hapd0.wait_event(["BEACON-RESP-RX"], timeout=3)
        if ev is None:
            raise Exception("Beacon report response not received")

        # The report is the hex encoded fifth space separated field.
        report_hex = ev.split(' ')[4]
        report = BeaconReport(binascii.unhexlify(report_hex))
        logger.info("Received beacon report: " + str(report))
        if not (report.bssid_str == hapd1.own_addr() and
                report.opclass == 81 and report.channel == 6):
            raise Exception("Incorrect bssid/op class/channel for hapd1")

        if not report.last_indication:
            raise Exception("Last Beacon Report Indication subelement missing")
1860
def test_eht_mld_legacy_stas(dev, apdev):
    """EHT AP MLD and multiple non-MLD STAs"""
    sta_indexes = range(3)
    for idx in sta_indexes:
        check_sae_capab(dev[idx])

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface):
        password = 'qwertyuiop'
        ssid = "ap_mld_sae"
        ap_conf = eht_mld_ap_wpa2_params(ssid, password,
                                         key_mgmt="SAE SAE-EXT-KEY",
                                         mfp="2", pwe='2')
        ap_conf['rsn_pairwise'] = "CCMP GCMP-256"
        ap_conf['sae_groups'] = "19 20"
        hapd0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        # All three stations connect with EHT disabled.
        for idx in sta_indexes:
            dev[idx].set("sae_groups", "")
            dev[idx].connect(ssid, sae_password=password, scan_freq="2412",
                             key_mgmt="SAE", ieee80211w="2", disable_eht="1")
        for _ in sta_indexes:
            hapd0.wait_sta()

        aids = []
        for idx in sta_indexes:
            sta_info = hapd0.get_sta(dev[idx].own_addr())
            aids.append(int(sta_info['aid']))
            traffic_test(dev[idx], hapd0)
        logger.info("Assigned AIDs: " + str(aids))
        if len(set(aids)) != 3:
            raise Exception("AP did not assign unique AID to each STA")
1890
def test_eht_mld_and_mlds(dev, apdev):
    """EHT AP MLD and multiple non-AP MLDs"""
    check_sae_capab(dev[0])

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
            HWSimRadio(use_mlo=True) as (sta_radio, sta_iface), \
            HWSimRadio(use_mlo=True) as (sta_radio2, sta_iface2):
        # One wpa_supplicant interface per station radio
        stations = []
        for iface in (sta_iface, sta_iface2):
            sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
            sta.interface_add(iface)
            check_sae_capab(sta)
            stations.append(sta)

        password = 'qwertyuiop'
        ssid = "ap_mld_sae"
        ap_conf = eht_mld_ap_wpa2_params(ssid, password,
                                         key_mgmt="SAE SAE-EXT-KEY",
                                         mfp="2", pwe='2')
        ap_conf['rsn_pairwise'] = "CCMP GCMP-256"
        ap_conf['sae_groups'] = "19 20"
        hapd0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        for sta in stations:
            sta.set("sae_pwe", "1")
            sta.connect(ssid, sae_password=password, scan_freq="2412",
                        key_mgmt="SAE-EXT-KEY", ieee80211w="2")

        for _ in stations:
            hapd0.wait_sta()

        aids = []
        for sta in stations:
            sta_info = hapd0.get_sta(sta.own_addr())
            aids.append(int(sta_info['aid']))
            traffic_test(sta, hapd0)
        logger.info("Assigned AIDs: " + str(aids))
        if len(set(aids)) != 2:
            raise Exception("AP MLD did not assign unique AID to each non-AP MLD")
1932
def mlo_perform_csa(hapd, command, freq, dev):
    """Issue a channel switch command and wait for all related events.

    command is sent to hapd. Each of the three hostapd events and the
    station event has to arrive within 10 seconds and contain
    "freq=<freq>"; otherwise an Exception is raised. The function sleeps
    for 0.5 seconds after the last event.
    """
    expected = "freq=" + str(freq)
    hapd.request(command)

    # (event source, event name, error if missing, error if wrong channel)
    steps = (
        (hapd, "CTRL-EVENT-STARTED-CHANNEL-SWITCH",
         "Channel switch start event not seen",
         "Unexpected channel in CS started"),
        (hapd, "CTRL-EVENT-CHANNEL-SWITCH",
         "Channel switch completion event not seen",
         "Unexpected channel in CS completed"),
        (hapd, "AP-CSA-FINISHED",
         "CSA finished event timed out",
         "Unexpected channel in CSA finished event"),
        (dev, "CTRL-EVENT-LINK-CHANNEL-SWITCH",
         "Non-AP MLD did not report CS",
         "Unexpected channel in CS event from non-AP MLD"),
    )
    for source, name, missing_msg, mismatch_msg in steps:
        ev = source.wait_event([name], timeout=10)
        if ev is None:
            raise Exception(missing_msg)
        if expected not in ev:
            raise Exception(mismatch_msg)

    time.sleep(0.5)
1962
def test_eht_mlo_csa(dev, apdev):
    """EHT MLD AP connected to non-AP MLD. Seamless channel switch"""
    csa_supported(dev[0])

    with HWSimRadio(use_mlo=True) as (ap_radio, ap_iface), \
        HWSimRadio(use_mlo=True) as (sta_radio, sta_iface):

        wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        wpas.interface_add(sta_iface)
        check_sae_capab(wpas)

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'

        ap_conf = eht_mld_ap_wpa2_params(ssid, passphrase,
                                         key_mgmt="SAE", mfp="2", pwe='1')
        hapd0 = eht_mld_enable_ap(ap_iface, 0, ap_conf)

        ap_conf['channel'] = '6'
        hapd1 = eht_mld_enable_ap(ap_iface, 1, ap_conf)

        wpas.set("sae_pwe", "1")
        wpas.connect(ssid, sae_password=passphrase, scan_freq="2412 2437",
                     key_mgmt="SAE", ieee80211w="2")

        eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True,
                          valid_links=3, active_links=3)
        eht_verify_wifi_version(wpas)
        traffic_test(wpas, hapd0)

        # Each step: log message, link to switch, command, target frequency,
        # and the log message before the traffic test that follows it.
        csa_steps = (
            ("Perform CSA on 1st link", hapd0,
             "CHAN_SWITCH 5 2462 ht he eht blocktx", 2462,
             "Test traffic after 1st link CSA completes"),
            ("Perform CSA on 2nd link", hapd1,
             "CHAN_SWITCH 5 2412 ht he eht blocktx", 2412,
             "Test traffic after 2nd link CSA completes"),
            ("Perform CSA on 2nd link and bring it back to original channel",
             hapd1, "CHAN_SWITCH 5 2437 ht he eht blocktx", 2437,
             "Test traffic again after 2nd link CSA completes"),
            ("Perform CSA on 1st link and bring it back to original channel",
             hapd0, "CHAN_SWITCH 5 2412 ht he eht blocktx", 2412,
             "Test traffic again after 1st link CSA completes"),
        )
        for csa_msg, link, command, freq, traffic_msg in csa_steps:
            logger.info(csa_msg)
            mlo_perform_csa(link, command, freq, wpas)

            logger.info(traffic_msg)
            traffic_test(wpas, link)
2021
def create_base_conf_file(iface, channel, prefix='hostapd-', hw_mode='g',
                          op_class=None):
    """Create a temporary hostapd configuration file with the PHY settings.

    The file is created under /tmp with a name starting with
    prefix + iface + "-chan-" + channel + "-". ieee80211ac=1 is written only
    for hw_mode 'a' when op_class is not one of 131..137.

    Returns (f, fname): the file object, still open for writing, and the
    file name.
    """
    # Create configuration file and add phy characteristics
    name_prefix = prefix + iface + "-chan-" + str(channel) + "-"
    fd, fname = tempfile.mkstemp(dir='/tmp', prefix=name_prefix)
    f = os.fdopen(fd, 'w')

    uses_listed_op_class = op_class in (131, 132, 133, 134, 135, 136, 137)

    lines = ["driver=nl80211\n",
             "hw_mode=" + str(hw_mode) + "\n",
             "ieee80211n=1\n"]
    if hw_mode == 'a' and not uses_listed_op_class:
        lines.append("ieee80211ac=1\n")
    lines.extend(["ieee80211ax=1\n",
                  "ieee80211be=1\n",
                  "channel=" + str(channel) + "\n"])
    for line in lines:
        f.write(line)

    return f, fname
2041
def append_bss_conf_to_file(f, ifname, params, first=False):
    """Append one BSS section to an open hostapd configuration file.

    The section starts with "interface=<ifname>" when first is set and with
    "bss=<ifname>" otherwise, followed by one "key=value" line per entry in
    params and a final "mld_ap=1" line.
    """
    # Add BSS specific characteristics
    section = "interface" if first else "bss"
    header = "=%s\n" % ifname
    f.write("\n" + section + header)

    for key, value in params.items():
        f.write("{}={}\n".format(key, value))

    f.write("mld_ap=1\n")
2055
def dump_config(fname):
    """Write the contents of the configuration file fname to the debug log."""
    with open(fname, 'r') as conf_file:
        contents = conf_file.read()
    logger.debug("hostapd config: " + str(fname) + "\n" + contents)
2060
def get_config(iface, count, ssid, passphrase, channel, bssid_regex,
               rnr=False, debug=False, start_idx=0):
    """Write a hostapd configuration file with count SAE BSSs.

    BSS indexes start at start_idx. Index 0 uses iface as the interface
    name; other indexes use iface + "-" + index. The index is appended to
    both ssid and passphrase, and bssid_regex is a %-format string that
    receives index + 1. rnr adds rnr=1 to every BSS and debug logs the
    resulting file.

    Returns (fname, hapds) where hapds is a list of [ifname, index] entries.
    """
    conf_file, fname = create_base_conf_file(iface, channel=channel)
    bss_list = []

    idx = start_idx
    remaining = count
    while remaining:
        ifname = iface if idx == 0 else iface + "-" + str(idx)
        suffix = str(idx)

        params = hostapd.wpa2_params(ssid=ssid + suffix,
                                     passphrase=passphrase + suffix,
                                     wpa_key_mgmt="SAE", ieee80211w="2")
        for key, value in (('sae_pwe', "2"),
                           ('group_mgmt_cipher', "AES-128-CMAC"),
                           ('beacon_prot', "1"),
                           ("ctrl_interface", "/var/run/hostapd/"),
                           ("bssid", bssid_regex % (idx + 1))):
            params[key] = value

        if rnr:
            params["rnr"] = "1"

        # The first BSS written opens the "interface" section.
        append_bss_conf_to_file(conf_file, ifname, params,
                                first=(idx == start_idx))

        bss_list.append([ifname, idx])
        remaining -= 1
        idx += 1

    conf_file.close()

    if debug:
        dump_config(fname)

    return fname, bss_list
2098
def start_ap(prefix, configs):
    """Start a daemonized hostapd process for the given configuration files.

    prefix is used to build the PID file name (prefix + ".hostapd.pid") and
    the log file name (prefix + ".hostapd-log"). configs is a whitespace
    separated string of configuration file names.

    Raises subprocess.CalledProcessError if hostapd exits with a non-zero
    status and Exception if the PID file does not show up.
    """
    pid = prefix + ".hostapd.pid"
    cmd = ['../../hostapd/hostapd', '-ddKtB', '-P', pid, '-f',
           prefix + ".hostapd-log"] + configs.split()

    logger.info("Starting APs")
    # check_call() raises CalledProcessError on a non-zero exit status, so
    # no separate return code check is needed.
    subprocess.check_call(cmd)

    # Wait for hostapd to complete initialization and daemonize.
    time.sleep(2)
    for _ in range(20):
        if os.path.exists(pid):
            break
        time.sleep(0.2)

    if not os.path.exists(pid):
        raise Exception("hostapd did not create PID file.")
2122
def get_mld_devs(hapd_iface, count, prefix, rnr=False):
    """Start hostapd with two configuration files and return four links.

    One configuration file is generated for channel 1 and one for channel 6,
    each with count BSSs. The first two BSSs of each file are used, so count
    has to be at least 2.

    Returns the control interfaces as
    [MLD1 link 0, MLD1 link 1, MLD2 link 0, MLD2 link 1]. Raises Exception
    if any of them does not answer a ping.
    """
    # (channel, BSSID pattern) for link 0 and link 1
    link_setups = ((1, "02:00:00:00:07:%02x"),
                   (6, "02:00:00:00:08:%02x"))
    conf_files = []
    bss_lists = []
    for chan, bssid_fmt in link_setups:
        conf, bss = get_config(hapd_iface, count=count, ssid="mld-",
                               passphrase="qwertyuiop-", channel=chan,
                               bssid_regex=bssid_fmt,
                               rnr=rnr, debug=True)
        conf_files.append(conf)
        bss_lists.append(bss)

    start_ap(prefix, conf_files[0] + " " + conf_files[1])

    links = []
    for mld_idx in range(2):
        for link_id, bss in enumerate(bss_lists):
            ifname, bssidx = bss[mld_idx]
            links.append(hostapd.Hostapd(ifname=ifname, bssidx=bssidx,
                                         link=link_id))

    for link in links:
        if not link.ping():
            raise Exception("Could not ping hostapd")

    for conf in conf_files:
        os.remove(conf)

    return links
2161
def stop_mld_devs(hapds, pid):
    """Terminate the hostapd process and wait for its PID file to go away.

    TERMINATE is sent through hapds[0]. pid is the prefix of the PID file
    name; ".hostapd.pid" is appended to it.

    Raises Exception if the command is rejected, CTRL-EVENT-TERMINATING is
    not seen within 15 seconds, or the PID file still exists after about
    3.5 seconds of waiting.
    """
    pid = pid + ".hostapd.pid"

    if "OK" not in hapds[0].request("TERMINATE"):
        raise Exception("Failed to terminate hostapd process")

    ev = hapds[0].wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
    if ev is None:
        raise Exception("CTRL-EVENT-TERMINATING not seen")

    time.sleep(0.5)

    # Poll for removal of the PID file
    for _ in range(30):
        time.sleep(0.1)
        if not os.path.exists(pid):
            break
    if os.path.exists(pid):
        raise Exception("PID file exists after process termination")
2180
def eht_parse_rnr(bss, rnr=False, exp_bssid=None):
    """Check the RNR and Multi-Link information in a BSS text dump.

    bss has to contain an ap_info entry with "mld ID=0" and a multi-link
    entry with an MLD address. With rnr set, it also has to contain exactly
    one ap_info entry with "mld ID=255" whose link ID is 15 and whose BSSID
    equals exp_bssid.

    Raises Exception on the first check that fails.
    """
    partner_rnr = re.compile(".*ap_info.*, mld ID=0, link ID=",
                             re.MULTILINE)
    ml_element = re.compile(".*multi-link:.*, MLD addr=.*", re.MULTILINE)

    if partner_rnr.search(bss) is None:
        raise Exception("RNR element not found for first link of first MLD")

    if ml_element.search(bss) is None:
        raise Exception("ML element not found for first link of first MLD")

    if not rnr:
        return

    coloc_rnr = re.compile(".*ap_info.*, mld ID=255, link ID=..",
                           re.MULTILINE)
    found = coloc_rnr.search(bss)
    if found is None:
        raise Exception("RNR element not found for co-located BSS")

    entry = found.group()
    if entry.count('bssid') > 1:
        raise Exception("More than one BSS found for co-located RNR")

    def value_after(label):
        # Text between the separator following the last occurrence of
        # label and the next comma (or the end of the matched text).
        start = entry.rindex(label) + len(label) + 1
        return entry[start:].split(',')[0]

    # Get the BSSID and the link ID carried in the RNR
    bssid = value_after('bssid')
    link_id = value_after('link ID')

    if link_id != "15":
        raise Exception("Unexpected link ID for co-located BSS which is not own partner")

    if bssid != exp_bssid:
        raise Exception("Unexpected BSSID for co-located BSS")
2218
def eht_mld_cohosted_discovery(dev, apdev, params, rnr=False):
    """Scan for two AP MLDs and validate their RNR/ML information.

    Starts the AP MLD links through get_mld_devs(), scans 2412 MHz from a
    station on its own radio, and runs eht_parse_rnr() on the BSS entries of
    hapds[0] and hapds[2], each checked against the other's address.

    params: test parameters; params['prefix'] is passed to get_mld_devs()
        and stop_mld_devs()
    rnr: forwarded to get_mld_devs() and eht_parse_rnr()
    """
    with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        sta.interface_add(wpas_iface)

        hapds = get_mld_devs(hapd_iface=hapd_iface, count=2,
                             prefix=params['prefix'], rnr=rnr)

        # Only scan link 0
        if "FAIL" in sta.request("SCAN freq=2412"):
            raise Exception("Failed to start scan")

        # The scan has to be reported as started and then as completed
        scan_events = (("CTRL-EVENT-SCAN-STARTED", "Scan did not start"),
                       ("CTRL-EVENT-SCAN-RESULTS", "Scan did not complete"))
        for event, failure in scan_events:
            if sta.wait_event([event]) is None:
                raise Exception(failure)

        logger.info("Scan done")

        # Each entry: log prefix, AP whose BSS entry is fetched, and the AP
        # whose address is passed as the expected BSSID
        checks = (("BSS 0_0: ", hapds[0], hapds[2]),
                  ("BSS 1_0: ", hapds[2], hapds[0]))
        for label, own, other in checks:
            entry = sta.request("BSS " + own.own_addr())
            logger.info(label + str(entry))
            eht_parse_rnr(entry, rnr, other.own_addr())

        stop_mld_devs(hapds, params['prefix'])
2253
def test_eht_mld_cohosted_discovery(dev, apdev, params):
    """EHT 2 AP MLDs discovery"""
    # Run the shared discovery flow with rnr left at its default (False)
    eht_mld_cohosted_discovery(dev, apdev, params, rnr=False)
2257
def test_eht_mld_cohosted_discovery_with_rnr(dev, apdev, params):
    """EHT 2 AP MLDs discovery (with co-location RNR)"""
    # Run the shared discovery flow with rnr enabled
    eht_mld_cohosted_discovery(dev, apdev, params=params, rnr=True)
2261
def test_eht_mld_cohosted_connectivity(dev, apdev, params):
    """EHT 2 AP MLDs with 2 MLD clients connection"""
    check_sae_capab(dev[0])

    with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio1, wpas_iface1):

        # One station per client radio, each checked for SAE support
        stations = []
        for iface in (wpas_iface, wpas_iface1):
            sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
            sta.interface_add(iface)
            check_sae_capab(sta)
            stations.append(sta)

        hapds = get_mld_devs(hapd_iface=hapd_iface, count=2,
                             prefix=params['prefix'], rnr=False)

        passphrase = "qwertyuiop-"
        ssid = "mld-"

        # Per client: station, SSID/passphrase suffix, frequency, the AP
        # link used for the status check, and the other link of that AP MLD.
        # The first client connects to the first AP MLD and the second
        # client to the second AP MLD; traffic is verified on both links.
        plan = ((stations[0], "0", 2412, hapds[0], hapds[1]),
                (stations[1], "1", 2437, hapds[3], hapds[2]))
        for sta, suffix, freq, assoc_link, other_link in plan:
            sta.set("sae_pwe", "1")
            sta.connect(ssid + suffix, sae_password=passphrase + suffix,
                        scan_freq=str(freq), key_mgmt="SAE", ieee80211w="2")

            eht_verify_status(sta, assoc_link, freq, 20, is_ht=True, mld=True,
                              valid_links=3, active_links=3)
            eht_verify_wifi_version(sta)

            traffic_test(sta, assoc_link)
            traffic_test(sta, other_link)

        stop_mld_devs(hapds, params['prefix'])
2310
def test_eht_mlo_color_change(dev, apdev):
    """AP MLD and Color Change Announcement"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        ssid = "mld_ap"
        passphrase = 'qwertyuiop'

        params = eht_mld_ap_wpa2_params(ssid, passphrase,
                                        key_mgmt="SAE", mfp="2", pwe='1')
        params['he_bss_color'] = '42'

        hapd0 = eht_mld_enable_ap(hapd_iface, 0, params)

        # Second link: same parameters apart from channel and initial color
        params['channel'] = '6'
        params['he_bss_color'] = '24'

        hapd1 = eht_mld_enable_ap(hapd_iface, 1, params)

        # Two consecutive color changes on each link: (link, new color,
        # log message)
        steps = ((hapd0, "10", "Perform CCA on 1st link"),
                 (hapd0, "60", "Perform CCA on 1st link again"),
                 (hapd1, "25", "Perform CCA on 2nd link"),
                 (hapd1, "5", "Perform CCA on 2nd link again"))
        for hapd, new_color, description in steps:
            logger.info(description)
            if "OK" not in hapd.request("COLOR_CHANGE " + new_color):
                raise Exception("COLOR_CHANGE failed")

            # Presumably gives the color change time to take effect before
            # the status is read back
            time.sleep(1.5)

            color = hapd.get_status_field("he_bss_color")
            if color != new_color:
                # %s formatting does not assume that color is a string, so
                # the intended message is reported even for a non-string
                # value
                raise Exception("Expected current he_bss_color to be %s; was %s" %
                                (new_color, color))

        hapd0.dump_monitor()
        hapd1.dump_monitor()
2369
def test_eht_mld_control_socket_connectivity(dev, apdev):
    """AP MLD control socket connectivity"""
    with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface):

        ssid = "mld_ap"
        link0_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "1"}
        link1_params = {"ssid": ssid,
                        "hw_mode": "g",
                        "channel": "2"}

        hapd0 = eht_mld_enable_ap(hapd_iface, 0, link0_params)
        hapd1 = eht_mld_enable_ap(hapd_iface, 1, link1_params)

        mld_dev = mld.get_mld_obj(hapd_iface)

        # Check status of each link through the MLD level socket
        for link_id in (0, 1):
            # str() keeps the logging and the checks below working even if
            # the request does not return a string
            res = str(mld_dev.request("LINKID %d STATUS" % link_id))
            logger.info("LINK %d STATUS:\n%s" % (link_id, res))
            if "state" not in res:
                raise Exception("Failed to get link %d status via MLD socket" %
                                link_id)

            # The reply has to contain a line that is exactly link_id=<id>
            expected = "link_id=%d" % link_id
            if expected not in res.splitlines():
                raise Exception("%s not reported for link %d" %
                                (expected, link_id))
2402
def test_eht_mlo_single_drv(dev, apdev, params):
    """EHT MLO AP simple test but having single drv object"""
    with HWSimRadio(use_mlo=True, n_channels=2) as (hapd_radio, hapd_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface), \
        HWSimRadio(use_mlo=True) as (wpas_radio1, wpas_iface1):

        # One station per client radio, each checked for SAE support
        stations = []
        for iface in (wpas_iface, wpas_iface1):
            sta = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
            sta.interface_add(iface)
            check_sae_capab(sta)
            stations.append(sta)

        passphrase = "qwertyuiop-"
        ssid = "mld-"

        # Settings shared by both generated configuration files
        shared = dict(ssid=ssid, passphrase=passphrase, rnr=False, debug=True)
        fname1, hapds1 = get_config(hapd_iface, count=1, channel=1,
                                    bssid_regex="02:00:00:00:07:%02x",
                                    start_idx=1, **shared)
        fname2, hapds2 = get_config(hapd_iface, count=2, channel=6,
                                    bssid_regex="02:00:00:00:08:%02x",
                                    **shared)

        # Configs will be in this form:
        #
        #   +--------------------+     +------------------+
        #   |     config 2       |     |      config 1    |
        #   |                    |     |                  |
        #   |                    |     |                  |
        #   | +----------------+ |     |                  |
        #   | |     BSS 1      | |     |                  |
        #   | | ssid: mld-0    | |     |                  |
        #   | +----------------+ |     |                  |
        #   |                    |     |                  |
        # +---------------------------------------------------------------+
        # | | +----------------+ |     | +--------------+ |               |
        # | | |     BSS 2      | |     | |     BSS 1    | |               |
        # | | | ssid: mld-1    | |     | | ssid: mld-1  | | 2 Link MLO AP |
        # | | +----------------+ |     | +--------------+ |               |
        # | +--------------------+     +------------------+               |
        # +---------------------------------------------------------------+

        # Since config 1 will be passed first, it will expect an interface
        # to be created already. Hence, create the interface for it
        iw_cmd = ['iw', 'phy' + str(hapd_radio), 'interface', 'add',
                  'wlan7-1', 'type', '__ap']
        proc = subprocess.Popen(iw_cmd, stderr=subprocess.STDOUT,
                                stdout=subprocess.PIPE, shell=False)
        # stderr is merged into stdout, so only the first item is of interest
        iw_out = proc.communicate()[0]
        logger.debug("iw output: " + iw_out.decode())

        # Start hostapd
        start_ap(params['prefix'], fname1 + " " + fname2)

        def open_link(entry, link):
            # entry is an (ifname, bssidx) pair returned by get_config()
            return hostapd.Hostapd(ifname=entry[0], bssidx=entry[1],
                                   link=link)

        hapd_mld1_link0 = open_link(hapds1[0], 0)
        hapd_mld1_link1 = open_link(hapds2[1], 1)
        hapd_mld2_link0 = open_link(hapds2[0], 0)

        hapds = [hapd_mld1_link0, hapd_mld1_link1, hapd_mld2_link0]

        for hapd in (hapd_mld1_link0, hapd_mld2_link0, hapd_mld1_link1):
            if not hapd.ping():
                raise Exception("Could not ping hostapd")

        for fname in (fname1, fname2):
            os.remove(fname)

        # Per client: station, SSID/passphrase suffix, frequency, the AP
        # link used for the status check, expected valid/active links
        # bitmap, and the AP link used for the traffic test.
        # The first client connects to AP MLD 1 (two links), the second
        # client to AP MLD 2 (single link).
        plan = ((stations[0], "1", 2412, hapds[0], 3, hapds[1]),
                (stations[1], "0", 2437, hapds[2], 1, hapds[2]))
        for sta, suffix, freq, status_ap, links, traffic_ap in plan:
            sta.set("sae_pwe", "1")
            sta.connect(ssid + suffix, sae_password=passphrase + suffix,
                        scan_freq=str(freq), key_mgmt="SAE", ieee80211w="2")

            eht_verify_status(sta, status_ap, freq, 20, is_ht=True, mld=True,
                              valid_links=links, active_links=links)
            eht_verify_wifi_version(sta)

            traffic_test(sta, traffic_ap)

        stop_mld_devs(hapds, params['prefix'])
2508