1# QoS Mapping tests
2# Copyright (c) 2013-2016, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7from remotehost import remote_compatible
8import time
9import logging
10logger = logging.getLogger()
11
12import hwsim_utils
13import hostapd
14from utils import HwsimSkip, alloc_fail, fail_test
15from wlantest import Wlantest
16
def check_qos_map(ap, hapd, dev, sta, dscp, tid, ap_tid=None):
    """Verify that traffic sent with a DSCP value uses the expected TIDs.

    The wlantest counters for the station are cleared, a connectivity test
    is run with the given DSCP value, and the per-TID counters reported by
    wlantest are then checked for both directions.

    ap: AP parameters; only ap['bssid'] is used
    hapd: AP instance passed to hwsim_utils.test_connectivity()
    dev: station device passed to hwsim_utils.test_connectivity()
    sta: station address used when querying the wlantest counters
    dscp: DSCP value used for the test traffic
    tid: TID expected for STA->AP data frames
    ap_tid: TID expected for AP->STA data frames; tid is used when None

    Raises Exception if no data frame with the expected TID was counted in
    either direction.
    """
    # Compare against None explicitly: 0 is a valid TID and must not be
    # replaced with tid when the caller passes it as ap_tid.
    if ap_tid is None:
        ap_tid = tid
    bssid = ap['bssid']
    wt = Wlantest()
    wt.clear_sta_counters(bssid, sta)
    hwsim_utils.test_connectivity(dev, hapd, dscp=dscp, config=False)
    # Short pause before reading the counters; a longer one is used when
    # dev.hostname is set (presumably a remote host - confirm).
    if dev.hostname is None:
        time.sleep(0.02)
    else:
        time.sleep(0.2)

    if wt.get_tx_tid(bssid, sta, tid) == 0:
        # Log the full set of TX counters to help with debugging
        tx_counters, _ = wt.get_tid_counters(bssid, sta)
        logger.info("Expected TX DSCP %s with TID %s but counters: %s",
                    dscp, tid, tx_counters)
        raise Exception("No STA->AP data frame using the expected TID")

    if wt.get_rx_tid(bssid, sta, ap_tid) == 0:
        # Log the full set of RX counters to help with debugging
        _, rx_counters = wt.get_tid_counters(bssid, sta)
        logger.info("Expected RX DSCP %s with TID %s but counters: %s",
                    dscp, ap_tid, rx_counters)
        raise Exception("No AP->STA data frame using the expected TID")
36
@remote_compatible
def test_ap_qosmap(dev, apdev):
    """QoS mapping"""
    # Skip unless the driver capability flag checked here (0x40000000) is
    # set; the skip reason identifies it as QoS Map support.
    drv_flags = dev[0].get_driver_status_field("capa.flags")
    if (int(drv_flags, 0) & 0x40000000) == 0:
        raise HwsimSkip("Driver does not support QoS Map")
    ssid = "test-qosmap"
    params = {"ssid": ssid,
              "qos_map_set": '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55'}
    hapd = hostapd.add_ap(apdev[0], params)
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
    hapd.wait_sta()
    time.sleep(0.1)
    addr = dev[0].p2p_interface_addr()
    dev[0].request("DATA_TEST_CONFIG 1")
    hapd.request("DATA_TEST_CONFIG 1")
    # Disable the data test interface on both ends even if one of the
    # checks below raises so that the state does not leak to other tests.
    try:
        Wlantest.setup(hapd)

        # (DSCP, expected TID) pairs for the QoS Map Set configured at AP
        # startup
        initial_checks = [(53, 2), (22, 6),
                          (8, 0), (15, 0),
                          (0, 1), (7, 1),
                          (16, 3), (31, 3),
                          (32, 4), (39, 4),
                          (40, 6), (47, 6),
                          (48, 7), (55, 7)]
        for dscp, tid in initial_checks:
            check_qos_map(apdev[0], hapd, dev[0], addr, dscp, tid)

        # Update the QoS Map Set at runtime (the 53,2 pair is dropped) and
        # send the new configuration to the associated station.
        hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55")
        hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address"))

        # (DSCP, expected TID) pairs after the update; DSCP 56 and 63 are
        # expected to use the DSCP >> 3 value.
        updated_checks = [(53, 7), (22, 6),
                          (48, 7), (55, 7),
                          (56, 56 >> 3), (63, 63 >> 3)]
        for dscp, tid in updated_checks:
            check_qos_map(apdev[0], hapd, dev[0], addr, dscp, tid)
    finally:
        dev[0].request("DATA_TEST_CONFIG 0")
        hapd.request("DATA_TEST_CONFIG 0")
78
def build_dscp_to_tid(old=False):
    """Return the expected DSCP-to-TID mapping for a set of sample DSCPs.

    The returned dict maps the lowest and highest DSCP value of each group
    of eight DSCP values to a TID. The keys are inserted in increasing DSCP
    order.

    old: if true, return the plain DSCP >> 3 mapping; otherwise apply the
         adjustments based on the recommendations in section 4 in RFC 8325
    """
    sample_dscps = (0, 7, 8, 15, 16, 23, 24, 31,
                    32, 39, 40, 47, 48, 55, 56, 63)

    # Initial default mapping: TID is taken from the three highest bits of
    # the six-bit DSCP value
    mapping = {value: value >> 3 for value in sample_dscps}

    if not old:
        # Update the mapping based on the recommendations in section 4 in
        # RFC 8325
        mapping.update({16: 0, 24: 4, 48: 7})

    return mapping
94
@remote_compatible
def test_ap_qosmap_default(dev, apdev):
    """QoS mapping with default values"""
    ssid = "test-qosmap-default"
    params = {"ssid": ssid}
    hapd = hostapd.add_ap(apdev[0], params)
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
    hapd.wait_sta()
    time.sleep(0.1)
    addr = dev[0].p2p_interface_addr()
    dev[0].request("DATA_TEST_CONFIG 1")
    hapd.request("DATA_TEST_CONFIG 1")

    def verify(dscp_to_tid):
        # Check every DSCP -> TID pair of the given mapping
        for dscp, tid in dscp_to_tid.items():
            check_qos_map(apdev[0], hapd, dev[0], addr, dscp, tid)

    # Disable the data test interface on both ends even if one of the
    # checks below raises so that the state does not leak to other tests.
    try:
        Wlantest.setup(hapd)

        try:
            verify(build_dscp_to_tid())
            logger.info("Kernel using new mapping")
        except Exception as e:
            # Only a mismatch in the STA->AP direction triggers the
            # fallback to the old mapping; any other failure is reported
            # as-is.
            if str(e) != "No STA->AP data frame using the expected TID":
                raise
            verify(build_dscp_to_tid(old=True))
            logger.info("Kernel using old mapping")
    finally:
        dev[0].request("DATA_TEST_CONFIG 0")
        hapd.request("DATA_TEST_CONFIG 0")
125
@remote_compatible
def test_ap_qosmap_default_acm(dev, apdev):
    """QoS mapping with default values and ACM=1 for VO/VI"""
    ssid = "test-qosmap-default"
    # WMM parameters with ACM enabled for the VI and VO access categories
    # and disabled for BK and BE
    params = {"ssid": ssid,
              "wmm_ac_bk_aifs": "7",
              "wmm_ac_bk_cwmin": "4",
              "wmm_ac_bk_cwmax": "10",
              "wmm_ac_bk_txop_limit": "0",
              "wmm_ac_bk_acm": "0",
              "wmm_ac_be_aifs": "3",
              "wmm_ac_be_cwmin": "4",
              "wmm_ac_be_cwmax": "10",
              "wmm_ac_be_txop_limit": "0",
              "wmm_ac_be_acm": "0",
              "wmm_ac_vi_aifs": "2",
              "wmm_ac_vi_cwmin": "3",
              "wmm_ac_vi_cwmax": "4",
              "wmm_ac_vi_txop_limit": "94",
              "wmm_ac_vi_acm": "1",
              "wmm_ac_vo_aifs": "2",
              "wmm_ac_vo_cwmin": "2",
              "wmm_ac_vo_cwmax": "2",
              "wmm_ac_vo_txop_limit": "47",
              "wmm_ac_vo_acm": "1"}
    hapd = hostapd.add_ap(apdev[0], params)
    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
    addr = dev[0].p2p_interface_addr()
    hapd.wait_sta()
    time.sleep(0.1)
    dev[0].request("DATA_TEST_CONFIG 1")
    hapd.request("DATA_TEST_CONFIG 1")

    def verify(dscp_to_tid):
        # Check every DSCP -> TID pair of the given mapping. The AP->STA
        # direction is expected to use the mapped TID while the STA->AP
        # direction is expected to be downgraded for VI/VO.
        for dscp, ap_tid in dscp_to_tid.items():
            # downgrade VI/VO to BE
            sta_tid = 3 if ap_tid in (4, 5, 6, 7) else ap_tid
            check_qos_map(apdev[0], hapd, dev[0], addr, dscp, sta_tid, ap_tid)

    # Disable the data test interface on both ends even if one of the
    # checks below raises so that the state does not leak to other tests.
    try:
        Wlantest.setup(hapd)

        try:
            verify(build_dscp_to_tid())
            logger.info("Kernel using new mapping")
        except Exception as e:
            # Only a mismatch in the STA->AP direction triggers the
            # fallback to the old mapping; any other failure is reported
            # as-is.
            if str(e) != "No STA->AP data frame using the expected TID":
                raise
            verify(build_dscp_to_tid(old=True))
            logger.info("Kernel using old mapping")
    finally:
        dev[0].request("DATA_TEST_CONFIG 0")
        hapd.request("DATA_TEST_CONFIG 0")
185
@remote_compatible
def test_ap_qosmap_invalid(dev, apdev):
    """QoS mapping ctrl_iface error handling"""
    ssid = "test-qosmap"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid})

    set_failure = "Unexpected SET_QOS_MAP_SET success"
    send_failure = "Unexpected SEND_QOS_MAP_CONF success"
    valid_set = "SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55"

    def expect_fail(cmd, error):
        # The control interface command must be rejected
        if "FAIL" not in hapd.request(cmd):
            raise Exception(error)

    # Commands that must be rejected before a QoS Map Set has been
    # configured: sending to an address with no association in this test,
    # and sets with no values, three values, a negative value, 59 values,
    # and 21 values.
    rejected_before_set = [
        ("SEND_QOS_MAP_CONF 00:11:22:33:44:55", send_failure),
        ("SET_QOS_MAP_SET ", set_failure),
        ("SET_QOS_MAP_SET 1,2,3", set_failure),
        ("SET_QOS_MAP_SET 1,-2,3", set_failure),
        ("SET_QOS_MAP_SET 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59", set_failure),
        ("SET_QOS_MAP_SET 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21", set_failure),
    ]
    for cmd, error in rejected_before_set:
        expect_fail(cmd, error)

    # A well-formed set must be accepted
    if "FAIL" in hapd.request(valid_set):
        raise Exception("Unexpected SET_QOS_MAP_SET failure")

    # Sending the configuration must still fail for the same address and
    # for a truncated address
    for cmd in ("SEND_QOS_MAP_CONF 00:11:22:33:44:55",
                "SEND_QOS_MAP_CONF 00:11:22:33:44"):
        expect_fail(cmd, send_failure)

    # The same well-formed set must be rejected when a failure is injected
    with fail_test(hapd, 1, "hostapd_ctrl_iface_set_qos_map_set"):
        expect_fail(valid_set,
                    "SET_QOS_MAP_SET accepted during forced driver failure")

    dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
    hapd.wait_sta()
    time.sleep(0.1)
    # Sending to the associated station must fail when the buffer
    # allocation is forced to fail
    with alloc_fail(hapd, 1,
                    "wpabuf_alloc;hostapd_ctrl_iface_send_qos_map_conf"):
        expect_fail("SEND_QOS_MAP_CONF " + dev[0].own_addr(),
                    "SEND_QOS_MAP_CONF accepted during OOM")
223