1 # Test a few kernel bugs and functionality
2 # Copyright (c) 2016, Intel Deutschland GmbH
3 #
4 # Author: Johannes Berg <johannes.berg@intel.com>
5 #
6 # This software may be distributed under the terms of the BSD license.
7 # See README for more details.
8 
9 import hostapd
10 import binascii
11 import os, time
12 import struct
13 import subprocess, re
14 from test_wnm import expect_ack
15 from tshark import run_tshark
16 from utils import clear_regdom, long_duration_test
17 from utils import HwsimSkip
18 
def _test_kernel_bss_leak(dev, apdev, deauth):
    """Abort an association attempt and check that no BSS entry lingers.

    hostapd is switched to external management frame handling so that the
    exchange can be intercepted: every management frame received before the
    Association Request is handed back to hostapd for processing, while the
    Association Request itself is never processed. With deauth set, a
    Deauthentication frame is sent back instead; otherwise the request is
    simply left unanswered. Afterwards the AP is stopped, the network is
    removed and the scan results of dev[0] must not list any BSS.

    dev -- station instances; only dev[0] is used
    apdev -- AP device descriptions; only apdev[0] is used
    deauth -- whether to answer the Association Request with a
              Deauthentication frame
    """
    ssid = "test-bss-leak"
    passphrase = 'qwertyuiop'
    hapd = hostapd.add_ap(apdev[0],
                          hostapd.wpa2_params(ssid=ssid,
                                              passphrase=passphrase))
    hapd.set("ext_mgmt_frame_handling", "1")
    # Do not wait for the connection: it is aborted on purpose below.
    dev[0].connect(ssid, psk=passphrase, scan_freq="2412", wait_connect=False)

    while True:
        frame = hapd.mgmt_rx()
        if not frame:
            raise Exception("MGMT RX wait timed out for auth frame")
        # Nonzero type bits in the frame control field: not a management
        # frame, so ignore it.
        if frame['fc'] & 0xc:
            continue
        if frame['subtype'] != 0:
            # Anything before the Association Request is passed back to
            # hostapd so that it gets processed as usual.
            hexdump = binascii.hexlify(frame['frame']).decode()
            hapd.request("MGMT_RX_PROCESS freq=2412 datarate=0 ssi_signal=-30 frame=%s" % (
                         hexdump, ))
            continue
        # Association Request (subtype 0) reached: stop here without ever
        # processing it.
        if deauth:
            # Reply with a Deauthentication frame right away, with the
            # addresses of the request swapped. The two-byte payload is
            # presumably reason code 1 in little endian - TODO confirm.
            reply = {
                'fc': 0xc0,
                'sa': frame['da'],
                'da': frame['sa'],
                'bssid': frame['bssid'],
                'payload': b'\x01\x00',
            }
            hapd.mgmt_tx(reply)
        break

    hapd.set("ext_mgmt_frame_handling", "0")
    hapd.request("STOP_AP")

    dev[0].request("REMOVE_NETWORK all")
    dev[0].wait_disconnected()

    # NOTE(review): the flush helper is given 5180 MHz while the AP used
    # 2412 MHz - presumably so the flush scan cannot rediscover the AP.
    dev[0].flush_scan_cache(freq=5180)
    scan_lines = dev[0].request("SCAN_RESULTS").splitlines()
    # More than one line means at least one BSS is still listed (the first
    # line is presumably the column header).
    if len(scan_lines) > 1:
        raise Exception("BSS entry should no longer be around")
57 
def test_kernel_bss_leak_deauth(dev, apdev):
    """cfg80211/mac80211 BSS leak on deauthentication"""
    # Variant in which the Association Request is answered with a
    # Deauthentication frame (deauth=True).
    return _test_kernel_bss_leak(dev, apdev, True)
61 
def test_kernel_bss_leak_timeout(dev, apdev):
    """cfg80211/mac80211 BSS leak on timeout"""
    # Variant in which the Association Request is left unanswered
    # (deauth=False).
    return _test_kernel_bss_leak(dev, apdev, False)
65 
# Management frame subtype used for Action frames. It is shifted left by four
# bits to build the 'fc' value of transmitted frames, and it is the same
# number as the 0x0d used in the tshark type_subtype filter in this file.
MGMT_SUBTYPE_ACTION = 0x0d
67 
def expect_no_ack(hapd):
    """Require that the next TX status event reports a frame without an ACK.

    Waits up to five seconds for a MGMT-TX-STATUS event on hapd.

    hapd -- object providing wait_event(events, timeout=...)

    Raises Exception if no event arrives in time or if the event does not
    contain "ok=0".
    """
    status = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
    if status is None:
        raise Exception("Missing TX status")
    if "ok=0" in status:
        # Not acknowledged, which is what the caller asked for.
        return
    raise Exception("Action frame unexpectedly acknowledged")
74 
def test_kernel_unknown_action_frame_rejection_sta(dev, apdev, params):
    """mac80211 and unknown Action frame rejection in STA mode"""
    # The AP sends three unicast Action frames to the associated station and
    # the capture in params['logdir'] is then searched for Action frames sent
    # by the station. Going by the checks at the end, a rejection carries the
    # original category with the 0x80 bit set.
    hapd = hostapd.add_ap(apdev[0], {"ssid": "unknown-action"})
    dev[0].connect("unknown-action", key_mgmt="NONE", scan_freq="2412")
    ap_addr = hapd.own_addr()
    sta_addr = dev[0].own_addr()

    hapd.set("ext_mgmt_frame_handling", "1")

    frame = {
        'fc': MGMT_SUBTYPE_ACTION << 4,
        'da': sta_addr,
        'sa': ap_addr,
        'bssid': ap_addr,
    }

    # (category, second payload octet) of each frame, in transmit order:
    #   0x70 - unknown category (response expected)
    #   0xf3 - error indication category (no response expected)
    #   0x74 - unknown category (response expected)
    #
    # Note: mac80211 does not allow group-addressed Action frames in unknown
    # categories to be transmitted in AP mode, so for now the following two
    # cases are left out (expect_no_ack() would be used instead of
    # expect_ack() for them):
    #   0x71, 1 to multicast 01:ff:ff:ff:ff:ff (no response expected)
    #   0x72, 2 to broadcast ff:ff:ff:ff:ff:ff (no response expected)
    unicast_cases = ((0x70, 0), (0xf3, 3), (0x74, 4))
    for category, second_octet in unicast_cases:
        frame['payload'] = struct.pack("<BB", category, second_octet)
        hapd.mgmt_tx(frame)
        expect_ack(hapd)

    capture = os.path.join(params['logdir'], "hwsim0.pcapng")
    out = run_tshark(capture,
                     "wlan.sa == %s && wlan.fc.type_subtype == 0x0d" % sta_addr,
                     display=["wlan_mgt.fixed.category_code"])
    categ = [int(line) for line in out.splitlines()]

    # 0xf2/0xf3 would be answers to the group-addressed and error indication
    # frames, neither of which may be answered.
    if any(code in categ for code in (0xf2, 0xf3)):
        raise Exception("Unexpected Action frame rejection: " + str(categ))
    # Both unknown-category unicast frames (0x70, 0x74) must be answered.
    if not all(code in categ for code in (0xf0, 0xf4)):
        raise Exception("Action frame rejection missing: " + str(categ))
132 
@long_duration_test
def test_kernel_reg_disconnect(dev, apdev):
    """Connect and force disconnect via regulatory"""
    # The station connects on channel 13 (2472 MHz) with country DE and is
    # then switched to country US. Presumably 2472 MHz is not usable under
    # US rules, so the kernel is expected to drop the connection.
    ssid = "test-reg-disconnect"
    passphrase = 'qwertyuiop'
    overrides = (
        ("country_code", "DE"),
        ("hw_mode", "b"),
        ("channel", "13"),
    )
    hapd = None
    try:
        params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
        for key, value in overrides:
            params[key] = value
        hapd = hostapd.add_ap(apdev[0], params)
        dev[0].set("country", "DE")
        dev[0].connect(ssid, psk=passphrase, scan_freq="2472")
        dev[0].set("country", "US")
        # Give the kernel a bit more than a minute to act on the change
        # before checking for the disconnection.
        time.sleep(61)
        dev[0].wait_disconnected(error="no regulatory disconnect")
    finally:
        # Always restore the regulatory state, including when the AP could
        # not be started (hapd is still None in that case).
        dev[0].request("DISCONNECT")
        clear_regdom(hapd, dev)
        dev[0].set("country", "00")
154 
def test_kernel_kunit(dev, apdev):
    """KUnit tests"""
    # Loads the cfg80211 and mac80211 KUnit test modules and then scans the
    # kernel log for a nonzero failure count.
    #
    # dev, apdev -- unused; present because of the common test signature
    #
    # Raises HwsimSkip if none of the modules can be loaded, and Exception if
    # a dmesg line matches the failure pattern.
    modules = ('cfg80211-tests', 'mac80211-tests')
    # Build a list rather than a generator: all() stops at the first
    # successful modprobe, so a lazy generator would never load the remaining
    # modules. stderr is discarded with DEVNULL because an unread PIPE can
    # block the child once the pipe buffer fills up.
    results = [subprocess.call(['modprobe', mod], stderr=subprocess.DEVNULL)
               for mod in modules]

    if all(r != 0 for r in results):
        raise HwsimSkip("KUnit tests not available")

    dmesg = subprocess.check_output(['dmesg'])
    # "fail:" followed by anything other than 0, i.e. a nonzero count.
    fail_rx = re.compile(rb'fail:[^0]')
    for line in dmesg.split(b'\n'):
        if fail_rx.search(line):
            # Kernel log lines are not guaranteed to be valid UTF-8; replace
            # undecodable bytes instead of hiding the failure behind a
            # UnicodeDecodeError.
            text = line.decode("utf-8", errors="replace")
            raise Exception(f'kunit test failed: {text}')
169