1# Multiple BSSID and enhanced multi-BSS advertisements (EMA)
2# Copyright (c) 2019, The Linux Foundation
3# Copyright (c) 2022, Qualcomm Innovation Center, Inc
4#
5# This software may be distributed under the terms of the BSD license.
6# See README for more details.
7
8import os, subprocess, time, logging, tempfile
9logger = logging.getLogger()
10
11import hwsim_utils, hostapd
12from wpasupplicant import WpaSupplicant
13from utils import *
14
def mbssid_create_cfg_file(apdev, params, mbssid=1):
    """Check MBSSID/EMA driver support and begin a hostapd configuration file.

    A temporary open AP is started on apdev[0] so that the driver status can
    be read. HwsimSkip is raised if fewer than 8 MBSSID interfaces are
    reported or, when mbssid is 2, if the reported EMA periodicity is below
    3. The temporary AP is then disabled and its interface removed, and a
    configuration file holding the radio-level parameters is created in /tmp.

    The params argument is accepted but not used by this helper.

    Returns a tuple (f, fname, ifname): the still-open file object, the path
    of the configuration file, and the interface name. The callers in this
    file append the per-BSS sections and then close the file.
    """
    probe_ap = hostapd.add_ap(apdev[0], {"ssid": "open"})
    drv = probe_ap.get_driver_status()

    mbssid_ok = "capa.mbssid_max_interfaces" in drv and \
        int(drv["capa.mbssid_max_interfaces"]) >= 8
    if not mbssid_ok:
        raise HwsimSkip("MBSSID not supported")

    if mbssid == 2:
        ema_ok = "capa.ema_max_periodicity" in drv and \
            int(drv["capa.ema_max_periodicity"]) >= 3
        if not ema_ok:
            raise HwsimSkip("EMA not supported")

    # The probe AP was only needed for the capability check
    probe_ap.disable()
    hostapd.HostapdGlobal().remove(apdev[0]['ifname'])

    # Create configuration file and add phy characteristics
    handle, fname = tempfile.mkstemp(dir='/tmp',
                                     prefix='hostapd-' + get_phy(apdev[0]))
    f = os.fdopen(handle, 'w')
    radio_lines = ("driver=nl80211\n",
                   "hw_mode=g\n",
                   "channel=1\n",
                   "ieee80211n=1\n",
                   "vht_oper_chwidth=0\n",
                   "ieee80211ac=1\n",
                   "he_oper_chwidth=0\n",
                   "ieee80211ax=1\n")
    for line in radio_lines:
        f.write(line)
    f.write("mbssid=%d\n\n" % mbssid)

    ifname = apdev[0]['ifname'] if isinstance(apdev[0], dict) else apdev

    return (f, fname, ifname)
49
def mbssid_write_bss_params(f, ifname, idx, params=None, single_ssid=False):
    """Append one BSS section to an open hostapd configuration file.

    f is a writable file-like object. Index 0 is written as the
    "interface=" section for ifname; any other index is written as a
    "bss=" section named "<ifname>-<idx>". The SSID is "single" when
    single_ssid is set and "bss-<idx>" otherwise, and the last octet of the
    BSSID is idx.

    params is an optional dictionary; only the security-related keys listed
    below are copied into the section, in that fixed order. Other keys are
    ignored.
    """
    emit = f.write

    if idx != 0:
        emit("\nbss=%s\n" % (ifname + '-' + str(idx)))
    else:
        emit("interface=%s\n" % ifname)

    emit("ctrl_interface=/var/run/hostapd\n")
    emit("ssid=single\n" if single_ssid else ("ssid=bss-%s\n" % idx))
    emit("bridge=br-lan\n")
    emit("bssid=00:03:7f:12:2a:%02x\n" % idx)
    emit("preamble=1\n")
    emit("auth_algs=1\n")

    if params is not None:
        # Add BSS specific characteristics
        for key in ("wpa", "wpa_pairwise", "rsn_pairwise", "wpa_passphrase",
                    "wpa_key_mgmt", "ieee80211w", "sae_pwe", "beacon_prot",
                    "sae_password"):
            if key in params:
                emit("%s=%s\n" % (key, params[key]))
77
def mbssid_dump_config(fname):
    """Write the contents of the configuration file fname to the debug log."""
    with open(fname, 'r') as cfg_file:
        contents = cfg_file.read()
    logger.debug("hostapd config:\n" + contents)
82
def mbssid_stop_ap(hapd, pid):
    """Terminate a hostapd process and wait for its PID file to disappear.

    hapd is the control interface object used to send the TERMINATE command
    and to wait for CTRL-EVENT-TERMINATING; pid is the path of the PID file
    to watch.

    Raises Exception if the TERMINATE command is not accepted, if the
    terminating event is not seen within 15 seconds, or if the PID file is
    still present after polling for about three seconds.
    """
    if "OK" not in hapd.request("TERMINATE"):
        raise Exception("Failed to terminate hostapd process")
    ev = hapd.wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
    if ev is None:
        raise Exception("CTRL-EVENT-TERMINATING not seen")
    # Poll for removal of the PID file: up to 30 checks, 0.1 s apart
    for _ in range(30):
        time.sleep(0.1)
        if not os.path.exists(pid):
            break
    if os.path.exists(pid):
        raise Exception("PID file exists after process termination")
95
def mbssid_start_ap(dev, apdev, params, fname, ifname, sta_params,
                    sta_params2=None, single_ssid=False, only_start_ap=False):
    """Start hostapd from a generated configuration file and exercise it.

    hostapd is launched with the configuration file fname, using a PID file
    and a log file derived from params['prefix']. Once the PID file shows up
    and the control interface answers a ping, the configuration file is
    removed.

    When only_start_ap is set, the function returns (hapd, pid) and leaves
    connecting stations and stopping the AP to the caller. Otherwise dev[0]
    is connected using sta_params (to "single" or "bss-0"), the AP side is
    checked for the HE flag on that station, an optional second connection
    is made using sta_params2 (to "single" or "bss-1"), the AP is stopped and
    nothing is returned.

    Raises subprocess.CalledProcessError if hostapd exits with a non-zero
    status, and Exception if the PID file is not created or the station is
    not flagged as HE.
    """
    pid = params['prefix'] + ".hostapd.pid"
    cmd = ['../../hostapd/hostapd', '-ddKtSB', '-P', pid, '-f',
           params['prefix'] + ".hostapd-log", fname]

    logger.info("Starting APs for " + ifname)
    # check_call() raises CalledProcessError on a non-zero exit status, so
    # there is no return value that needs to be checked here.
    subprocess.check_call(cmd)

    # Wait for hostapd to complete initialization and daemonize.
    for _ in range(10):
        if os.path.exists(pid):
            break
        time.sleep(0.2)
    if not os.path.exists(pid):
        # One possible cause is missing MBSSID support in the kernel, but
        # this is reported as a failure rather than a skip.
        raise Exception("hostapd did not create PID file")

    hapd = hostapd.Hostapd(apdev[0]['ifname'])
    hapd.ping()
    # hostapd is up, so the temporary configuration file is no longer needed
    os.remove(fname)

    # Allow enough time to pass for a Beacon frame to be captured.
    time.sleep(0.1)
    if only_start_ap:
        return hapd, pid

    ssid = "single" if single_ssid else "bss-0"
    dev[0].connect(ssid, **sta_params)
    sta = hapd.get_sta(dev[0].own_addr())
    if "[HE]" not in sta['flags']:
        raise Exception("Missing STA flag: HE")
    dev[0].request("DISCONNECT")

    if sta_params2:
        dev[0].wait_disconnected()
        ssid = "single" if single_ssid else "bss-1"
        dev[0].connect(ssid, **sta_params2)
        dev[0].request("DISCONNECT")

    mbssid_stop_ap(hapd, pid)
139
def test_he_ap_mbssid_open(dev, apdev, params):
    """HE AP MBSSID all open"""
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)
    # Four BSSs, none of them given any security parameters
    for bss_idx in range(4):
        mbssid_write_bss_params(cfg, ifname, bss_idx)
    cfg.close()

    open_sta = dict(key_mgmt="NONE", scan_freq="2412")
    try:
        mbssid_start_ap(dev, apdev, params, fname, ifname, open_sta)
    finally:
        # The configuration assigned its own BSSID to the interface; put the
        # address from apdev[0] back regardless of the test result.
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
153
def test_he_ap_mbssid_same_security(dev, apdev, params):
    """HE AP MBSSID all SAE"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    # Both BSSs share the same SAE configuration
    ap_sae = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
                  sae_pwe="1", ieee80211w="2")
    for bss_idx in range(2):
        mbssid_write_bss_params(cfg, ifname, bss_idx, ap_sae)
    cfg.close()

    sta_sae = dict(psk="12345678", key_mgmt="SAE", ieee80211w="2",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    try:
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        mbssid_start_ap(dev, apdev, params, fname, ifname, sta_sae)
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
178
def test_he_ap_mbssid_mixed_security1(dev, apdev, params):
    """HE AP MBSSID with mixed security (STA SAE)"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    psk = dict(wpa="2", wpa_passphrase="12345678",
               wpa_pairwise="CCMP", wpa_key_mgmt="WPA-PSK")
    owe = dict(wpa="2", wpa_pairwise="CCMP", wpa_key_mgmt="OWE")
    sae = dict(wpa="2", wpa_passphrase="12345678",
               wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
               sae_pwe="1", ieee80211w="2")

    # Security profile per BSS index; None means no security parameters.
    # Index 0, which the station connects to, uses SAE.
    layout = (sae, psk, owe, None, psk, sae, owe, None)
    for bss_idx, bss_params in enumerate(layout):
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params)
    cfg.close()

    sta_sae = dict(psk="12345678", key_mgmt="SAE", ieee80211w="2",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    try:
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        mbssid_start_ap(dev, apdev, params, fname, ifname, sta_sae)
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
214
def test_he_ap_mbssid_mixed_security2(dev, apdev, params):
    """HE AP MBSSID with mixed security (STA open)"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    psk = dict(wpa="2", wpa_passphrase="12345678",
               wpa_pairwise="CCMP", wpa_key_mgmt="WPA-PSK")
    owe = dict(wpa="2", wpa_pairwise="CCMP", wpa_key_mgmt="OWE")
    sae = dict(wpa="2", wpa_passphrase="12345678",
               wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
               sae_pwe="1", ieee80211w="2")

    # Security profile per BSS index; None means no security parameters.
    # Index 0, which the station connects to, is left open.
    layout = (None, psk, owe, sae, None, psk, sae, owe)
    for bss_idx, bss_params in enumerate(layout):
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params)
    cfg.close()

    open_sta = dict(key_mgmt="NONE", scan_freq="2412")
    try:
        mbssid_start_ap(dev, apdev, params, fname, ifname, open_sta)
    finally:
        # Put the AP address back regardless of the test result
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
246
def test_he_ap_mbssid_mixed_security3(dev, apdev, params):
    """HE AP MBSSID with mixed security (WPA2-Personal + WPA3-Personal)"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    ap_psk = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="WPA-PSK")
    ap_sae = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
                  sae_pwe="1", ieee80211w="2")

    # BSS 0 uses WPA-PSK and BSS 1 uses SAE
    for bss_idx, bss_params in enumerate((ap_psk, ap_sae)):
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params)
    cfg.close()

    # First connection goes to bss-0 with PSK, the second to bss-1 with SAE
    sta_psk = dict(psk="12345678", key_mgmt="WPA-PSK",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    sta_sae = dict(psk="12345678", key_mgmt="SAE", ieee80211w="2",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    try:
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        mbssid_start_ap(dev, apdev, params, fname, ifname, sta_psk, sta_sae)
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
277
def test_he_ap_mbssid_mixed_security4(dev, apdev, params):
    """HE AP MBSSID with mixed security (WPA2-Personal + WPA3-Personal+beacon prot)"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    ap_psk = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="WPA-PSK")
    ap_sae = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
                  sae_pwe="1", ieee80211w="2",
                  beacon_prot="1")

    # BSS 0 uses WPA-PSK; BSS 1 uses SAE with beacon protection
    for bss_idx, bss_params in enumerate((ap_psk, ap_sae)):
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params)
    cfg.close()

    # Bit tested in octet 10 of the Extended Capabilities element (ID 127)
    beacon_prot_bit = 0x10

    try:
        hapd, pid = mbssid_start_ap(dev, apdev, params, fname, ifname, None,
                                    only_start_ap=True)
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        dev[0].connect("bss-1", psk="12345678", key_mgmt="SAE", ieee80211w="2",
                       beacon_prot="1", scan_freq="2412")
        dev[1].connect("bss-0", psk="12345678", key_mgmt="WPA-PSK",
                       scan_freq="2412")

        # Extended Capabilities of the BSS that dev[0] is associated with
        nontx_bss = dev[0].get_bss(dev[0].get_status_field("bssid"))
        nontx_ext_capab = parse_ie(nontx_bss['ie'])[127]

        # Extended Capabilities of the BSS that dev[1] is associated with,
        # read from the scan results of dev[0]
        tx_bss = dev[0].get_bss(dev[1].get_status_field("bssid"))
        tx_ext_capab = parse_ie(tx_bss['ie'])[127]

        for sta in (dev[0], dev[1]):
            sta.request("DISCONNECT")
        for sta in (dev[0], dev[1]):
            sta.wait_disconnected()
        mbssid_stop_ap(hapd, pid)

        # Evaluate the results only after the AP has been stopped
        if not (nontx_ext_capab[10] & beacon_prot_bit):
            raise Exception("Beacon protection not enabled in non-TX BSS")
        if tx_ext_capab[10] & beacon_prot_bit:
            raise Exception("Beacon protection enabled in TX BSS")
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
326
def test_he_ap_mbssid_single_ssid(dev, apdev, params):
    """HE AP MBSSID with mixed security and single SSID"""
    check_sae_capab(dev[0])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    ap_psk = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="WPA-PSK")
    ap_sae = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
                  sae_pwe="1", ieee80211w="2", beacon_prot="1")

    # Both BSSs advertise the same SSID but differ in security
    for bss_idx, bss_params in enumerate((ap_psk, ap_sae)):
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params,
                                single_ssid=True)
    cfg.close()

    sta_psk = dict(psk="12345678", key_mgmt="WPA-PSK",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    sta_sae = dict(psk="12345678", key_mgmt="SAE", ieee80211w="2",
                   pairwise="CCMP", group="CCMP", scan_freq="2412",
                   beacon_prot="1")
    try:
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        mbssid_start_ap(dev, apdev, params, fname, ifname, sta_psk, sta_sae,
                        single_ssid=True)
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
358
def test_he_ap_mbssid_single_ssid_tm(dev, apdev, params):
    """HE AP MBSSID with mixed security and single SSID and transition mode"""
    check_sae_capab(dev[0])
    f, fname, ifname = mbssid_create_cfg_file(apdev, params)

    psk_params = {"wpa": "2", "wpa_passphrase": "12345678",
                  "wpa_pairwise": "CCMP", "wpa_key_mgmt": "WPA-PSK"}

    # Transition mode BSS: both SAE and WPA-PSK allowed, with beacon
    # protection enabled
    tm_params = {"wpa": "2", "wpa_passphrase": "12345678",
                 "wpa_pairwise": "CCMP", "wpa_key_mgmt": "SAE WPA-PSK",
                 "sae_pwe": "2", "ieee80211w": "1", "beacon_prot": "1"}

    mbssid_write_bss_params(f, ifname, 0, psk_params, single_ssid=True)
    mbssid_write_bss_params(f, ifname, 1, tm_params, single_ssid=True)

    f.close()

    try:
        dev[0].set("sae_groups", "")
        hapd, pid = mbssid_start_ap(dev, apdev, params, fname, ifname, None,
                                    single_ssid=True, only_start_ap=True)
        dev[0].connect("single", psk="12345678", key_mgmt="SAE WPA-PSK",
                       ieee80211w="1", scan_freq="2412", beacon_prot="1")
        # Give the station five seconds to report a beacon loss
        ev = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=5)
        beacon_loss = ev is not None
        key_mgmt = dev[0].get_status_field("key_mgmt")
        # Stop the AP before evaluating the results so that it is not left
        # running when one of the checks below fails.
        mbssid_stop_ap(hapd, pid)
        if key_mgmt != "SAE":
            raise Exception("Did not use SAE")
        # The beacon loss observation used to be recorded but never acted
        # on; report it like test_he_ap_mbssid_beacon_prot does.
        if beacon_loss:
            raise Exception("Beacon loss detected")
    finally:
        # Put the AP address back regardless of the test result
        subprocess.call(['ip', 'link', 'set', 'dev', apdev[0]['ifname'],
                         'address', apdev[0]['bssid']])
393
def test_he_ap_ema(dev, apdev, params):
    """HE EMA AP"""
    check_sae_capab(dev[0])
    # mbssid=2 makes the helper also require EMA support from the driver
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params, 2)

    # All eight BSSs share the same SAE configuration
    ap_sae = dict(wpa="2", wpa_passphrase="12345678",
                  wpa_pairwise="CCMP", wpa_key_mgmt="SAE",
                  sae_pwe="1", ieee80211w="2")
    for bss_idx in range(8):
        mbssid_write_bss_params(cfg, ifname, bss_idx, ap_sae)
    cfg.close()

    sta_sae = dict(psk="12345678", key_mgmt="SAE", ieee80211w="2",
                   pairwise="CCMP", group="CCMP", scan_freq="2412")
    try:
        dev[0].set("sae_pwe", "1")
        dev[0].set("sae_groups", "")
        mbssid_start_ap(dev, apdev, params, fname, ifname, sta_sae)
    finally:
        # Undo the station setting and put the AP address back
        dev[0].set("sae_pwe", "0")
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
418
def test_he_ap_mbssid_beacon_prot(dev, apdev, params):
    """HE AP MBSSID beacon protection"""
    check_sae_capab(dev[0])
    check_sae_capab(dev[1])
    cfg, fname, ifname = mbssid_create_cfg_file(apdev, params)

    # Two SAE BSSs with beacon protection that differ only in password
    for bss_idx, password in enumerate(("12345678", "another password")):
        bss_params = {"wpa": "2", "sae_password": password,
                      "wpa_pairwise": "CCMP", "wpa_key_mgmt": "SAE",
                      "ieee80211w": "2", "beacon_prot": "1"}
        mbssid_write_bss_params(cfg, ifname, bss_idx, bss_params)

    cfg.close()
    mbssid_dump_config(fname)

    try:
        dev[0].set("sae_groups", "")
        dev[1].set("sae_groups", "")
        hapd, pid = mbssid_start_ap(dev, apdev, params, fname, ifname, None,
                                    only_start_ap=True)
        dev[0].connect("bss-0", psk="12345678", key_mgmt="SAE",
                       ieee80211w="2", beacon_prot="1", scan_freq="2412")
        dev[1].connect("bss-1", psk="another password", key_mgmt="SAE",
                       ieee80211w="2", beacon_prot="1", scan_freq="2412")

        # Wait five seconds on dev[1] first; dev[0] then only needs a short
        # check for an event reported in the meantime.
        ev = dev[1].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=5)
        lost_on_sta1 = ev is not None
        ev = dev[0].wait_event(["CTRL-EVENT-BEACON-LOSS"], timeout=0.1)
        lost_on_sta0 = ev is not None

        mbssid_stop_ap(hapd, pid)

        # Evaluate the result only after the AP has been stopped
        if lost_on_sta0 or lost_on_sta1:
            raise Exception("Beacon loss detected")
    finally:
        # Put the AP address back regardless of the test result
        ap = apdev[0]
        subprocess.call(['ip', 'link', 'set', 'dev', ap['ifname'],
                         'address', ap['bssid']])
463