1 # SPP A-MSDU tests
2 # Copyright (c) 2025, Intel Corporation
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import logging
8 import hostapd
9 from utils import HwsimSkip, parse_ie
10 import hwsim_utils
11 import time
12 import os
13 from tshark import run_tshark
14 
15 logger = logging.getLogger()
16 
17 SPP_AMSDU_SUPP_CIPHERS = ['CCMP', 'GCMP', 'CCMP-256', 'GCMP-256']
18 SPP_AMSDU_STA_FLAG = '[SPP-A-MSDU]'
19 WLAN_EID_RSNX = 244
20 RSNX_SPP_AMSDU_CAPAB_MASK = 0x4000
21 RSNX_SPP_AMSDU_CAPAB_BIT = 14
22 
23 def setup_ap(apdev, ssid, spp_amsdu, cipher):
24     params = {'ssid': ssid}
25     wpa_passphrase = ''
26 
27     if cipher != '':
28         wpa_passphrase = '123456789'
29         params.update({'wpa': '2',
30                        'wpa_passphrase': wpa_passphrase,
31                        'wpa_key_mgmt': 'WPA-PSK',
32                        'rsn_pairwise': cipher,
33                        'group_cipher': cipher})
34 
35     params['spp_amsdu'] = spp_amsdu
36 
37     return hostapd.add_ap(apdev, params), wpa_passphrase
38 
39 def skip_unsupported_spp_amsdu(dev, hapd):
40     flags2 = hapd.request("DRIVER_FLAGS2").splitlines()[1:]
41     if "SPP_AMSDU" not in flags2:
42         raise HwsimSkip('AP: SPP A-MSDU is not supported')
43 
44     # Sanity (should be the same for both AP/STA)
45     flags2 = dev.request("DRIVER_FLAGS2").splitlines()[1:]
46     if "SPP_AMSDU" not in flags2:
47         raise HwsimSkip('STA: SPP A-MSDU is not supported')
48 
49 def check_sta_rsnxe(spp_amsdu_enabled, logdir):
50     """Check STA assoc request RSXNE"""
51     # tshark returns the RSNXE data in a comma separated string
52     out = run_tshark(os.path.join(logdir, 'hwsim0.pcapng'),
53                      "wlan.fc.type_subtype == 0x0",
54                      display=["wlan.rsnx"])
55     hex_rsnxe = out.rstrip().split(',')
56     if len(hex_rsnxe) < 2:
57         if spp_amsdu_enabled:
58             raise Exception('STA: RSNXE SPP A-MSDU is not set')
59         else:
60             return
61 
62     second_rsnxe_byte = int(hex_rsnxe[1], 16)
63     spp_amsdu_set = second_rsnxe_byte & (1 << (RSNX_SPP_AMSDU_CAPAB_BIT % 8))
64     logger.debug('STA RSNXE SPP A-MSDU capable bit is %sset' %
65                  '' if spp_amsdu_set else 'not ')
66 
67     if spp_amsdu_enabled and not spp_amsdu_set:
68         raise Exception('STA: SPP A-MSDU capab bit is not set in RSNXE')
69     if not spp_amsdu_enabled and spp_amsdu_set:
70         raise Exception('STA: Unexpected SPP A-MSDU capab bit set in RSNXE')
71 
72 def check_ap_rsnxe(spp_amsdu_enabled, ies):
73     if not WLAN_EID_RSNX in ies:
74         if spp_amsdu_enabled:
75             raise Exception('AP: RSNXE is not present')
76         else:
77             # nothing to check
78             return
79 
80     rsnx_hex_str = ''.join([hex(byte)[2:].zfill(2) \
81                             for byte in reversed(ies[WLAN_EID_RSNX])])
82     rsnx_hex = int(rsnx_hex_str, 16)
83     spp_amsdu_set = rsnx_hex & RSNX_SPP_AMSDU_CAPAB_MASK
84     logger.debug('AP RSNXE SPP A-MSDU capable bit is %sset' %
85                  '' if spp_amsdu_set else 'not ')
86     if spp_amsdu_enabled and not spp_amsdu_set:
87         raise Exception('AP: SPP A-MSDU capab bit is not set in RSNXE')
88     if not spp_amsdu_enabled and spp_amsdu_set:
89         raise Exception('AP: Unexpected SPP A-MSDU capab bit set in RSNXE')
90 
91 def check_ap_sta_flags(dev, hapd, spp_amsdu_enabled):
92     """Check AP SPP A-MSDU STA flags"""
93     sta = hapd.get_sta(dev.own_addr())
94     spp_amsdu_flag_set = SPP_AMSDU_STA_FLAG in sta['flags']
95 
96     logger.debug('AP SPP A-MSDU STA flag is %sset' %
97                  '' if spp_amsdu_flag_set else 'not ')
98     if spp_amsdu_enabled and not spp_amsdu_flag_set:
99         raise Exception('SPP-A-MSDU flag not present for STA')
100     if not spp_amsdu_enabled and spp_amsdu_flag_set:
101         raise Exception('Unexpected SPP-A-MSDU flag present for STA')
102 
103 def _run(dev, apdev, logdir, spp_amsdu, cipher=''):
104     """
105     1. Connect to AP (SPP A-MSDU enabled/disabled)
106     2. test connectivity
107     3. Verify AP/STA advertised SPP A-MSDU capabilities (enabled/disabled)
108     """
109     # Sanity
110     if cipher != '' and cipher not in dev.get_capability('pairwise'):
111         raise HwsimSkip('%s not supported' % cipher)
112 
113     ssid='test-spp-amsdu-%s' % cipher if cipher != '' else 'open'
114     hapd, wpa_passphrase = setup_ap(apdev, ssid, spp_amsdu=spp_amsdu,
115                                     cipher=cipher)
116     skip_unsupported_spp_amsdu(dev, hapd)
117 
118     dev.connect(ssid, key_mgmt='NONE' if cipher == '' else '',
119                 psk=wpa_passphrase, pairwise=cipher, group=cipher)
120     hapd.wait_sta()
121     time.sleep(0.1)
122     hwsim_utils.test_connectivity(dev, hapd)
123 
124     if spp_amsdu != '0' and cipher in SPP_AMSDU_SUPP_CIPHERS:
125         spp_amsdu_enabled = True
126     else:
127         spp_amsdu_enabled = False
128 
129     # Verify AP capabilities
130     bss = dev.get_bss(hapd.own_addr())
131     bss_ies = parse_ie(bss['ie'])
132     check_ap_rsnxe(spp_amsdu_enabled, bss_ies)
133     check_ap_sta_flags(dev, hapd, spp_amsdu_enabled)
134 
135     # Verify STA capabilities
136     check_sta_rsnxe(spp_amsdu_enabled, logdir)
137 
138 def test_spp_amsdu_ccmp(dev, apdev, params):
139     """SPP A-MSDU with CCMP"""
140     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1', cipher='CCMP')
141 
142 def test_spp_amsdu_ccmp256(dev, apdev, params):
143     """SPP A-MSDU with CCMP-256"""
144     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1', cipher='CCMP-256')
145 
146 def test_spp_amsdu_gcmp(dev, apdev, params):
147     """SPP A-MSDU with GCMP"""
148     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1', cipher='GCMP')
149 
150 def test_spp_amsdu_gcmp256(dev, apdev, params):
151     """SPP A-MSDU with GCMP-256"""
152     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1', cipher='GCMP-256')
153 
154 def test_spp_amsdu_tkip(dev, apdev, params):
155     """SPP A-MSDU disabled with TKIP"""
156     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1', cipher='TKIP')
157 
158 def test_spp_amsdu_open(dev, apdev, params):
159     """SPP A-MSDU disabled in open network"""
160     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='1')
161 
162 def test_spp_amsdu_disabled(dev, apdev, params):
163     """SPP A-MSDU explicitly disabled"""
164     _run(dev[0], apdev[0], params['logdir'], spp_amsdu='0', cipher='CCMP')
165