1# Test cases for dscp policy 2# Copyright (c) 2021, Jouni Malinen <j@w1.fi> 3# Copyright (c) 2021, The Linux Foundation 4# 5# This software may be distributed under the terms of the BSD license. 6# See README for more details. 7 8import struct 9import time 10import sys 11import socket 12 13import hostapd 14from wpasupplicant import WpaSupplicant 15from utils import * 16 17def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie, 18 truncate=False): 19 type = 0 20 subtype = 13 21 category = 126 22 oui_type = 0x506f9a1a 23 if truncate: 24 req = struct.pack('>BLBB', category, oui_type, oui_subtype, 25 dialog_token) 26 else: 27 req = struct.pack('>BLBBB', category, oui_type, oui_subtype, 28 dialog_token, req_control) 29 if qos_ie: 30 req += qos_ie 31 32 msg = {} 33 msg['fc'] = 0x00d0 34 msg['sa'] = hapd.own_addr() 35 msg['da'] = da 36 msg['bssid'] = hapd.own_addr() 37 msg['type'] = type 38 msg['subtype'] = subtype 39 msg['payload'] = req 40 41 hapd.mgmt_tx(msg) 42 ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5) 43 if ev is None or "stype=13 ok=1" not in ev: 44 raise Exception("No DSCP Policy Request sent") 45 46def prepare_qos_ie(policy_id, req_type, dscp, start_port=0, end_port=0, 47 frame_classifier=None, frame_class_len=0, domain_name=None): 48 qos_elem_oui_type = 0x229a6f50 49 qos_elem_id = 221 50 51 if policy_id: 52 qos_attr = struct.pack('BBBBB', 2, 3, policy_id, req_type, dscp) 53 qos_attr_len = 5 54 else: 55 qos_attr = 0 56 qos_attr_len = 0 57 58 if start_port and end_port: 59 port_range_attr = struct.pack('>BBHH', 1, 4, start_port, end_port) 60 if qos_attr: 61 qos_attr += port_range_attr 62 else: 63 qos_attr = port_range_attr 64 qos_attr_len += 6 65 66 if frame_classifier and frame_class_len: 67 tclas_attr = struct.pack('>BB%ds' % (len(frame_classifier),), 3, 68 len(frame_classifier), frame_classifier) 69 if qos_attr: 70 qos_attr += tclas_attr 71 else: 72 qos_attr = tclas_attr 73 qos_attr_len += 2 + len(frame_classifier) 74 75 if domain_name: 76 s = bytes(domain_name, 'utf-8') 77 domain_name_attr = struct.pack('>BB%ds' % (len(s),), 4, len(s), s) 78 if qos_attr: 79 qos_attr += domain_name_attr 80 else: 81 qos_attr = domain_name_attr 82 qos_attr_len += 2 + len(s) 83 84 qos_attr_len += 4 85 qos_ie = struct.pack('<BBL', qos_elem_id, qos_attr_len, 86 qos_elem_oui_type) + qos_attr 87 88 return qos_ie 89 90def validate_dscp_req_event(dev, event): 91 ev = dev.wait_event(["CTRL-EVENT-DSCP-POLICY"], timeout=2) 92 if ev is None: 93 raise Exception("No DSCP request reported") 94 if ev != event: 95 raise Exception("Invalid DSCP event received (%s; expected: %s)" % (ev, event)) 96 97def handle_dscp_query(hapd, query): 98 msg = hapd.mgmt_rx() 99 if msg['payload'] != query: 100 raise Exception("Invalid DSCP Query received at AP") 101 102def handle_dscp_response(hapd, response): 103 msg = hapd.mgmt_rx() 104 if msg['payload'] != response: 105 raise Exception("Invalid DSCP Response received at AP") 106 107def ap_sta_connectivity(dev, apdev, params): 108 p = hostapd.wpa2_params(passphrase="12345678") 109 p["wpa_key_mgmt"] = "WPA-PSK" 110 p["ieee80211w"] = "1" 111 p.update(params) 112 hapd = hostapd.add_ap(apdev[0], p) 113 114 dev[0].request("SET enable_dscp_policy_capa 1") 115 dev[0].connect("dscp", psk="12345678", ieee80211w="1", 116 key_mgmt="WPA-PSK WPA-PSK-SHA256", scan_freq="2412") 117 hapd.wait_sta() 118 119 hapd.dump_monitor() 120 hapd.set("ext_mgmt_frame_handling", "1") 121 return hapd 122 123def test_dscp_query(dev, apdev): 124 """DSCP Policy Query""" 125 126 # Positive tests 127 #AP with DSCP Capabilities 128 params = {"ssid": "dscp", 129 "ext_capa": 6*"00" + "40", 130 "assocresp_elements": "dd06506f9a230101", 131 "vendor_elements": "dd06506f9a230101"} 132 133 hapd = ap_sta_connectivity(dev, apdev, params) 134 da = dev[0].own_addr() 135 136 # Query 1 137 cmd = "DSCP_QUERY wildcard" 138 if "OK" not in dev[0].request(cmd): 139 raise Exception("Sending DSCP Query failed") 140 query = b'\x7e\x50\x6f\x9a\x1a\x00\x01' 141 handle_dscp_query(hapd, query) 142 143 # Query 2 144 cmd = "DSCP_QUERY domain_name=example.com" 145 if "OK" not in dev[0].request(cmd): 146 raise Exception("Sending DSCP Query failed") 147 query = b'\x7e\x50\x6f\x9a\x1a\x00\x02\xdd\x11\x50\x6f\x9a\x22\x04\x0b\x65\x78\x61\x6d\x70\x6c\x65\x2e\x63\x6f\x6d' 148 handle_dscp_query(hapd, query) 149 150 # Negative tests 151 152 cmd = "DSCP_QUERY domain_name=" + 250*'a' + ".example.com" 153 if "FAIL" not in dev[0].request(cmd): 154 raise Exception("Invalid DSCP_QUERY accepted") 155 156 dev[0].disconnect_and_stop_scan() 157 # AP without DSCP Capabilities 158 params = {"ssid": "dscp", 159 "ext_capa": 6*"00" + "40"} 160 hapd = ap_sta_connectivity(dev, apdev, params) 161 162 # Query 3 163 cmd = "DSCP_QUERY wildcard" 164 if "FAIL" not in dev[0].request(cmd): 165 raise Exception("Able to send invalid DSCP Query") 166 167def test_dscp_request(dev, apdev): 168 """DSCP Policy Request""" 169 170 # Positive tests 171 172 #AP with DSCP Capabilities 173 params = {"ssid": "dscp", 174 "ext_capa": 6*"00" + "40", 175 "assocresp_elements": "dd06506f9a230101", 176 "vendor_elements": "dd06506f9a230101"} 177 178 hapd = ap_sta_connectivity(dev, apdev, params) 179 da = dev[0].own_addr() 180 181 # Request 1 182 dialog_token = 5 183 send_dscp_req(hapd, da, 1, dialog_token, 2, 0) 184 event = "<3>CTRL-EVENT-DSCP-POLICY request_start clear_all" 185 validate_dscp_req_event(dev[0], event) 186 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 187 validate_dscp_req_event(dev[0], event) 188 189 # DSCP Request with multiple QoS IEs 190 # QoS IE 1 191 dialog_token = 1 192 domain_name = "example.com" 193 ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1") 194 ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2") 195 frame_classifier_start = [4, 91, 4] 196 frame_classifier_end = [12, 34, 12, 34, 0, 17, 0] 197 frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end) 198 frame_len = len(frame_classifier) 199 qos_ie = prepare_qos_ie(1, 0, 22, 0, 0, frame_classifier, frame_len, domain_name) 200 201 # QoS IE 2 202 ipv6_src_addr = socket.inet_pton(socket.AF_INET6, "aaaa:bbbb:cccc::1") 203 ipv6_dest_addr = socket.inet_pton(socket.AF_INET6, "aaaa:bbbb:cccc::2") 204 frame_classifier_start = [4, 79, 6] 205 frame_classifier_end = [0, 12, 34, 0, 0, 17, 0, 0, 0] 206 frame_classifier = bytes(frame_classifier_start) + ipv6_src_addr + ipv6_dest_addr + bytes(frame_classifier_end) 207 frame_len = len(frame_classifier) 208 ie = prepare_qos_ie(5, 0, 48, 12345, 23456, frame_classifier, frame_len, 209 None) 210 qos_ie += ie 211 212 # QoS IE 3 213 ie = prepare_qos_ie(4, 0, 32, 12345, 23456, 0, 0, domain_name) 214 qos_ie += ie 215 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 216 217 event = "<3>CTRL-EVENT-DSCP-POLICY request_start" 218 validate_dscp_req_event(dev[0], event) 219 event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=22 ip_version=4 src_ip=192.168.0.1 src_port=3106 dst_port=3106 protocol=17 domain_name=example.com" 220 validate_dscp_req_event(dev[0], event) 221 event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=5 dscp=48 ip_version=6 src_ip=aaaa:bbbb:cccc::1 dst_ip=aaaa:bbbb:cccc::2 src_port=12 protocol=17 start_port=12345 end_port=23456" 222 validate_dscp_req_event(dev[0], event) 223 event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=4 dscp=32 ip_version=0 start_port=12345 end_port=23456 domain_name=example.com" 224 validate_dscp_req_event(dev[0], event) 225 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 226 validate_dscp_req_event(dev[0], event) 227 228 # Negative Tests 229 230 # No DSCP policy attribute 231 dialog_token = 4 232 domain_name = "example.com" 233 qos_ie = prepare_qos_ie(0, 0, 0, 12345, 23456, frame_classifier, frame_len, 234 domain_name) 235 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 236 event = "<3>CTRL-EVENT-DSCP-POLICY request_start" 237 validate_dscp_req_event(dev[0], event) 238 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 239 validate_dscp_req_event(dev[0], event) 240 241 # No DSCP stream classifier params 242 dialog_token = 6 243 qos_ie = prepare_qos_ie(1, 0, 32, 0, 0, 0, 0, None) 244 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 245 event = "<3>CTRL-EVENT-DSCP-POLICY request_start" 246 validate_dscp_req_event(dev[0], event) 247 event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1" 248 validate_dscp_req_event(dev[0], event) 249 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 250 validate_dscp_req_event(dev[0], event) 251 252 # DSCP request with both destination and domain name 253 dialog_token = 7 254 domain_name = "example.com" 255 ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1") 256 ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2") 257 frame_classifier_start = [4, 69, 4] 258 frame_classifier_end = [0, 0, 0, 0, 0, 17, 0] 259 frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end) 260 frame_len = len(frame_classifier) 261 qos_ie = prepare_qos_ie(1, 0, 36, 0, 0, frame_classifier, frame_len, 262 domain_name) 263 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 264 event = "<3>CTRL-EVENT-DSCP-POLICY request_start" 265 validate_dscp_req_event(dev[0], event) 266 event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1" 267 validate_dscp_req_event(dev[0], event) 268 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 269 validate_dscp_req_event(dev[0], event) 270 271 # DSCP request with both port range and destination port 272 frame_classifier_start = [4, 81, 4] 273 frame_classifier_end = [0, 0, 23, 45, 0, 17, 0] 274 frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end) 275 frame_len = len(frame_classifier) 276 qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456, frame_classifier, frame_len, 277 None) 278 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 279 event = "<3>CTRL-EVENT-DSCP-POLICY request_start" 280 validate_dscp_req_event(dev[0], event) 281 event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1" 282 validate_dscp_req_event(dev[0], event) 283 event = "<3>CTRL-EVENT-DSCP-POLICY request_end" 284 validate_dscp_req_event(dev[0], event) 285 286 # Too short DSCP Policy Request frame 287 dialog_token += 1 288 send_dscp_req(hapd, da, 1, dialog_token, 0, None, truncate=True) 289 290 # Request Type: Remove 291 dialog_token += 1 292 qos_ie = prepare_qos_ie(1, 1, 36) 293 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 294 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start") 295 validate_dscp_req_event(dev[0], 296 "<3>CTRL-EVENT-DSCP-POLICY remove policy_id=1") 297 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") 298 299 # Request Type: Reserved 300 dialog_token += 1 301 qos_ie = prepare_qos_ie(1, 2, 36) 302 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 303 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start") 304 validate_dscp_req_event(dev[0], 305 "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1") 306 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") 307 308def test_dscp_response(dev, apdev): 309 """DSCP Policy Response""" 310 311 # Positive tests 312 313 # AP with DSCP Capabilities 314 params = {"ssid": "dscp", 315 "ext_capa": 6*"00" + "40", 316 "assocresp_elements": "dd06506f9a230101", 317 "vendor_elements": "dd06506f9a230101"} 318 hapd = ap_sta_connectivity(dev, apdev, params) 319 da = dev[0].own_addr() 320 321 # Sending solicited DSCP response after receiving DSCP request 322 dialog_token = 1 323 domain_name = "example.com" 324 ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1") 325 ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2") 326 frame_classifier_start = [4,91,4] 327 frame_classifier_end = [12,34,12,34,0,17,0] 328 frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end) 329 frame_len = len(frame_classifier) 330 qos_ie = prepare_qos_ie(1, 0, 22, 0, 0, frame_classifier, frame_len, 331 domain_name) 332 ie = prepare_qos_ie(4, 0, 32, 12345, 23456, 0, 0, domain_name) 333 qos_ie += ie 334 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 335 336 ev = dev[0].wait_event(["CTRL-EVENT-DSCP-POLICY"], timeout=5) 337 if ev is None: 338 raise Exception("DSCP event not reported") 339 if "request_start" not in ev: 340 raise Exception("Unexpected DSCP event: " + ev) 341 cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=4 status=0" 342 if "OK" not in dev[0].request(cmd): 343 raise Exception("Sending DSCP Response failed") 344 response = b'\x7e\x50\x6f\x9a\x1a\x02\x01\x00\x02\x01\x00\x04\x00' 345 handle_dscp_response(hapd, response) 346 347 # Unsolicited DSCP Response without status duples 348 cmd = "DSCP_RESP reset more" 349 if "OK" not in dev[0].request(cmd): 350 raise Exception("Sending DSCP Response failed") 351 response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x03\x00' 352 handle_dscp_response(hapd, response) 353 354 # Unsolicited DSCP Response with one status duple 355 cmd = "DSCP_RESP policy_id=2 status=0" 356 if "OK" not in dev[0].request(cmd): 357 raise Exception("Sending DSCP Response failed") 358 response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x00\x01\x02\x00' 359 handle_dscp_response(hapd, response) 360 361 # Negative tests 362 363 # Send solicited DSCP Response without prior DSCP request 364 cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=5 status=0" 365 if "FAIL" not in dev[0].request(cmd): 366 raise Exception("Able to send invalid DSCP response") 367 368def test_dscp_unsolicited_req_at_assoc(dev, apdev): 369 """DSCP Policy and unsolicited request at association""" 370 params = {"ssid": "dscp", 371 "ext_capa": 6*"00" + "40", 372 "assocresp_elements": "dd06506f9a230103", 373 "vendor_elements": "dd06506f9a230103"} 374 hapd = ap_sta_connectivity(dev, apdev, params) 375 da = dev[0].own_addr() 376 377 dialog_token = 1 378 qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456) 379 send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie) 380 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start") 381 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=36 ip_version=0 start_port=12345 end_port=23456") 382 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end") 383 384 cmd = "DSCP_QUERY wildcard" 385 if "OK" not in dev[0].request(cmd): 386 raise Exception("Sending DSCP Query failed") 387 388def test_dscp_missing_unsolicited_req_at_assoc(dev, apdev): 389 """DSCP Policy and missing unsolicited request at association""" 390 params = {"ssid": "dscp", 391 "ext_capa": 6*"00" + "40", 392 "assocresp_elements": "dd06506f9a230103", 393 "vendor_elements": "dd06506f9a230103"} 394 hapd = ap_sta_connectivity(dev, apdev, params) 395 da = dev[0].own_addr() 396 397 cmd = "DSCP_QUERY wildcard" 398 if "FAIL" not in dev[0].request(cmd): 399 raise Exception("DSCP_QUERY accepted during wait for unsolicited requesdt") 400 time.sleep(5) 401 validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_wait end") 402 403 cmd = "DSCP_QUERY wildcard" 404 if "OK" not in dev[0].request(cmd): 405 raise Exception("Sending DSCP Query failed") 406