# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import os
import time
import logging
logger = logging.getLogger()

from wpasupplicant import WpaSupplicant


def _read_carrier(ifname):
    """Read /sys/class/net/<ifname>/carrier.

    Returns True if the file contains '1', False for any other content and
    None if the sysfs entry does not exist. Other I/O errors propagate.
    """
    try:
        with open(f'/sys/class/net/{ifname}/carrier', 'r') as f:
            return f.read().strip() == '1'
    except FileNotFoundError:
        return None


def sync_carrier(dev, ifname=None):
    """Verify that dev has carrier on its main or P2P group interface.

    ifname defaults to dev.ifname. If the carrier entry of that interface
    does not exist, the check is skipped silently. For WpaSupplicant
    instances with a distinct group_ifname, carrier on the group interface
    is accepted as well.

    Raises AssertionError if neither interface reports carrier.
    """
    ifname = ifname or dev.ifname
    carrier_main = _read_carrier(ifname)
    if carrier_main is None:
        return

    carrier_p2p = None
    if (isinstance(dev, WpaSupplicant) and dev.group_ifname and
            dev.group_ifname != ifname):
        carrier_p2p = _read_carrier(dev.group_ifname)

    # Raised explicitly instead of using an assert statement so that the
    # check is not removed when Python runs with -O.
    if not (carrier_main or carrier_p2p):
        raise AssertionError("sending data without carrier won't work")


def _request(dev, group, cmd):
    """Send cmd through dev.group_request() if group is set, else dev.request()."""
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)


def _wait_data_test_rx(dev, group, timeout):
    """Wait for a DATA-TEST-RX event on the group or main interface of dev."""
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)


def _enable_data_test(dev, group, ifname):
    """Send DATA_TEST_CONFIG 1 to one device and check its carrier state."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    res = _request(dev, group, cmd)
    if "OK" not in res:
        raise Exception("Failed to enable data test functionality")

    sync_carrier(dev, ifname)


def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
    """Enable the data test functionality on dev1 and then on dev2.

    devNgroup selects group_request() instead of request() for that device;
    ifnameN, when set, is appended to the command as " ifname=<ifnameN>".

    Raises Exception if a device does not answer with "OK" and
    AssertionError (from sync_carrier) if a device has no carrier.
    """
    _enable_data_test(dev1, dev1group, ifname1)
    _enable_data_test(dev2, dev2group, ifname2)


def _check_data_len(ev, send_len, what):
    """Validate the " len=" field of a DATA-TEST-RX event.

    With send_len set, the event has to contain " len=<send_len>"; without
    it, the event must not contain any " len=" field. what is the
    "<direction> <kind>" text used in the exception message.
    """
    if send_len is not None:
        if " len=" + str(send_len) not in ev:
            raise Exception("Unexpected %s data length" % what)
    elif " len=" in ev:
        raise Exception("Unexpected %s data length" % what)


def run_multicast_connectivity_test(dev1, dev2, tos=None,
                                    dev1group=False, dev2group=False,
                                    ifname1=None, ifname2=None,
                                    config=True, timeout=5,
                                    send_len=None, multicast_to_unicast=False,
                                    broadcast_retry_c=1):
    """Send a broadcast data test frame from dev1 and verify dev2 reports it.

    The transmission is attempted up to broadcast_retry_c times; only the
    failure of the last attempt is propagated. With multicast_to_unicast
    set, the received frame is expected to be addressed to dev2 instead of
    ff:ff:ff:ff:ff:ff.

    NOTE(review): when config is true the data test functionality is enabled
    here but this function does not send DATA_TEST_CONFIG 0 afterwards.

    Raises Exception if no event is received within timeout seconds or if
    the event does not have the expected addresses or length.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    if config:
        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

    # Without this, the default tos=None would be formatted into the command
    # as the literal text "None".
    if tos is None:
        tos = 0

    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)

    bcast_rx = "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1)
    for i in range(broadcast_retry_c):
        try:
            _request(dev1, dev1group, cmd)
            ev = _wait_data_test_rx(dev2, dev2group, timeout)
            if ev is None:
                raise Exception("dev1->dev2 broadcast data delivery failed")
            if multicast_to_unicast:
                if bcast_rx in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
                if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
            else:
                if bcast_rx not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result")
            _check_data_len(ev, send_len, "dev1->dev2 broadcast")
            break
        except Exception:
            # Swallow the failure and retry unless this was the last attempt.
            if i == broadcast_retry_c - 1:
                raise


def _run_unicast_check(src, dst, srcgroup, dstgroup, src_addr, dst_addr,
                       tos, timeout, send_len, direction):
    """Send one unicast data test frame from src and verify dst reports it.

    direction is the "devX->devY" text used in the exception messages.
    """
    cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)
    _request(src, srcgroup, cmd)
    ev = _wait_data_test_rx(dst, dstgroup, timeout)
    if ev is None:
        raise Exception("%s unicast data delivery failed" % direction)
    if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in ev:
        raise Exception("Unexpected %s unicast data result" % direction)
    _check_data_len(ev, send_len, "%s unicast" % direction)


def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5,
                          multicast_to_unicast=False, broadcast=True,
                          send_len=None):
    """Run one round of unicast (and optionally broadcast) data tests.

    Sequence: dev1->dev2 unicast, dev1->dev2 broadcast, dev2->dev1 unicast,
    dev2->dev1 broadcast. The broadcast steps are skipped when broadcast is
    false. multicast_to_unicast is applied only to the dev2->dev1 broadcast
    step. When config is true, the data test functionality is enabled first
    and disabled again with DATA_TEST_CONFIG 0 even if a step fails.

    Raises Exception on the first failed step.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    dev1.dump_monitor()
    dev2.dump_monitor()

    # Broadcast is attempted only once when neither device has a hostname
    # set (presumably both are local - confirm), up to 10 times otherwise.
    if dev1.hostname is None and dev2.hostname is None:
        broadcast_retry_c = 1
    else:
        broadcast_retry_c = 10

    try:
        if config:
            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

        _run_unicast_check(dev1, dev2, dev1group, dev2group, addr1, addr2,
                           tos, timeout, send_len, "dev1->dev2")

        if broadcast:
            run_multicast_connectivity_test(dev1, dev2, tos,
                                            dev1group, dev2group,
                                            ifname1, ifname2,
                                            config=False, timeout=timeout,
                                            send_len=send_len,
                                            multicast_to_unicast=False,
                                            broadcast_retry_c=broadcast_retry_c)

        _run_unicast_check(dev2, dev1, dev2group, dev1group, addr2, addr1,
                           tos, timeout, send_len, "dev2->dev1")

        if broadcast:
            run_multicast_connectivity_test(dev2, dev1, tos,
                                            dev2group, dev1group,
                                            ifname2, ifname1,
                                            config=False, timeout=timeout,
                                            send_len=send_len,
                                            multicast_to_unicast=multicast_to_unicast,
                                            broadcast_retry_c=broadcast_retry_c)

    finally:
        if config:
            _request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            _request(dev2, dev2group, "DATA_TEST_CONFIG 0")


def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5,
                      multicast_to_unicast=False, success_expected=True,
                      broadcast=True, send_len=None):
    """Check data connectivity between dev1 and dev2, retrying on failure.

    A truthy dscp overrides tos with dscp << 2; a missing tos becomes 0.
    run_connectivity_test() is attempted up to max_tries times with a one
    second pause between attempts.

    Raises Exception carrying the last failure if success was expected but
    no attempt passed, or Exception("Unexpected connectivity detected") if
    success_expected is false and an attempt passed.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    success = False
    last_err = None
    for attempt in range(max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout,
                                  multicast_to_unicast=multicast_to_unicast,
                                  broadcast=broadcast, send_len=send_len)
            success = True
            break
        except Exception as e:
            last_err = e
            # No point in sleeping after the final attempt.
            if attempt + 1 < max_tries:
                time.sleep(1)

    if success_expected and not success:
        # Chain the original failure so that its traceback is not lost.
        raise Exception(last_err) from last_err
    if not success_expected and success:
        raise Exception("Unexpected connectivity detected")


def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1, timeout=5):
    """Run test_connectivity() with ifname passed as ifname2 (for dev2)."""
    test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
                      max_tries=max_tries, timeout=timeout)


def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of both devices."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)


def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of dev1 only."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)


def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the main interface of both devices."""
    test_connectivity(dev1, dev2, dscp, tos)


# Values accepted by set_powersave()
(PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = list(range(4))


def set_powersave(dev, val):
    """Write val (one of the PS_* values) to the hwsim 'ps' debugfs file.

    The file is /sys/kernel/debug/ieee80211/<phyname>/hwsim/ps where phyname
    comes from the driver status of dev.

    Raises Exception if the command returns a non-zero status.
    """
    phy = dev.get_driver_status_field("phyname")
    fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
    data = '%d' % val
    # The redirection in the argument list relies on shell=True.
    (res, _) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
    if res != 0:
        raise Exception("Failed to set power save for device")


def set_group_map(dev, val):
    """Write val to the hwsim 'group' debugfs file of the phy used by dev.

    The file is /sys/kernel/debug/ieee80211/<phyname>/hwsim/group.

    Raises Exception if the command returns a non-zero status.
    """
    phy = dev.get_driver_status_field("phyname")
    fname = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phy
    data = '%d' % val
    # The redirection in the argument list relies on shell=True.
    (res, _) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
    if res != 0:
        raise Exception("Failed to set group map for %s" % phy)


def set_rx_rssi(dev, val):
    """
    Configure signal strength when receiving transmitted frames.
    mac80211_hwsim driver sets rssi to: TX power - 50
    According to that set tx_power in order to get the desired RSSI.
    Valid RSSI range: -50 to -30.

    The range is not enforced here. Raises Exception if the iw command
    returns a non-zero status.
    """
    # Scaled by 100, presumably because "iw ... set txpower fixed" takes
    # the value in mBm - TODO confirm.
    tx_power = (val + 50) * 100
    ifname = dev.get_driver_status_field("ifname")
    (res, _) = dev.cmd_execute(['iw', ifname, 'set', 'txpower',
                                'fixed', str(tx_power)])
    if res != 0:
        raise Exception("Failed to set RSSI to %d" % val)


def reset_rx_rssi(dev):
    """Set the RX RSSI of dev to -30 using set_rx_rssi()."""
    set_rx_rssi(dev, -30)