1 # hwsim testing utilities
2 # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import os
8 import time
9 import logging
10 logger = logging.getLogger()
11 
12 from wpasupplicant import WpaSupplicant
13 
14 def sync_carrier(dev, ifname=None):
15     ifname = ifname or dev.ifname
16     carrier_p2p = None
17     try:
18         with open(f'/sys/class/net/{ifname}/carrier', 'r') as f:
19             carrier_main = f.read().strip() == '1'
20     except FileNotFoundError:
21         return
22     if (isinstance(dev, WpaSupplicant) and dev.group_ifname and
23         dev.group_ifname != ifname):
24         ifname = dev.group_ifname
25         try:
26             with open(f'/sys/class/net/{ifname}/carrier', 'r') as f:
27                 carrier_p2p = f.read().strip() == '1'
28         except FileNotFoundError:
29             pass
30     assert carrier_main or carrier_p2p, "sending data without carrier won't work"
31 
32 def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
33     cmd = "DATA_TEST_CONFIG 1"
34     if ifname1:
35         cmd = cmd + " ifname=" + ifname1
36     if dev1group:
37         res = dev1.group_request(cmd)
38     else:
39         res = dev1.request(cmd)
40     if "OK" not in res:
41         raise Exception("Failed to enable data test functionality")
42 
43     sync_carrier(dev1, ifname1)
44 
45     cmd = "DATA_TEST_CONFIG 1"
46     if ifname2:
47         cmd = cmd + " ifname=" + ifname2
48     if dev2group:
49         res = dev2.group_request(cmd)
50     else:
51         res = dev2.request(cmd)
52     if "OK" not in res:
53         raise Exception("Failed to enable data test functionality")
54 
55     sync_carrier(dev2, ifname2)
56 
57 def run_multicast_connectivity_test(dev1, dev2, tos=None,
58                                     dev1group=False, dev2group=False,
59                                     ifname1=None, ifname2=None,
60                                     config=True, timeout=5,
61                                     send_len=None, multicast_to_unicast=False,
62                                     broadcast_retry_c=1):
63     addr1 = dev1.get_addr(dev1group)
64     addr2 = dev2.get_addr(dev2group)
65 
66     if config:
67         config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
68 
69     cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
70     if send_len is not None:
71         cmd += " len=" + str(send_len)
72     for i in range(broadcast_retry_c):
73         try:
74             if dev1group:
75                 dev1.group_request(cmd)
76             else:
77                 dev1.request(cmd)
78             if dev2group:
79                 ev = dev2.wait_group_event(["DATA-TEST-RX"],
80                                            timeout=timeout)
81             else:
82                 ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
83             if ev is None:
84                 raise Exception("dev1->dev2 broadcast data delivery failed")
85             if multicast_to_unicast:
86                 if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev:
87                     raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
88                 if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
89                     raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
90             else:
91                 if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
92                     raise Exception("Unexpected dev1->dev2 broadcast data result")
93             if send_len is not None:
94                 if " len=" + str(send_len) not in ev:
95                     raise Exception("Unexpected dev1->dev2 broadcast data length")
96             else:
97                 if " len=" in ev:
98                     raise Exception("Unexpected dev1->dev2 broadcast data length")
99             break
100         except Exception as e:
101             if i == broadcast_retry_c - 1:
102                 raise
103 
104 def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
105                           ifname1=None, ifname2=None, config=True, timeout=5,
106                           multicast_to_unicast=False, broadcast=True,
107                           send_len=None):
108     addr1 = dev1.get_addr(dev1group)
109     addr2 = dev2.get_addr(dev2group)
110 
111     dev1.dump_monitor()
112     dev2.dump_monitor()
113 
114     if dev1.hostname is None and dev2.hostname is None:
115         broadcast_retry_c = 1
116     else:
117         broadcast_retry_c = 10
118 
119     try:
120         if config:
121             config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)
122 
123         cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
124         if send_len is not None:
125             cmd += " len=" + str(send_len)
126         if dev1group:
127             dev1.group_request(cmd)
128         else:
129             dev1.request(cmd)
130         if dev2group:
131             ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
132         else:
133             ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
134         if ev is None:
135             raise Exception("dev1->dev2 unicast data delivery failed")
136         if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
137             raise Exception("Unexpected dev1->dev2 unicast data result")
138         if send_len is not None:
139             if " len=" + str(send_len) not in ev:
140                 raise Exception("Unexpected dev1->dev2 unicast data length")
141         else:
142             if " len=" in ev:
143                 raise Exception("Unexpected dev1->dev2 unicast data length")
144 
145         if broadcast:
146             run_multicast_connectivity_test(dev1, dev2, tos,
147                                             dev1group, dev2group,
148                                             ifname1, ifname2, False, timeout,
149                                             send_len, False, broadcast_retry_c)
150 
151         cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
152         if send_len is not None:
153             cmd += " len=" + str(send_len)
154         if dev2group:
155             dev2.group_request(cmd)
156         else:
157             dev2.request(cmd)
158         if dev1group:
159             ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
160         else:
161             ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
162         if ev is None:
163             raise Exception("dev2->dev1 unicast data delivery failed")
164         if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
165             raise Exception("Unexpected dev2->dev1 unicast data result")
166         if send_len is not None:
167             if " len=" + str(send_len) not in ev:
168                 raise Exception("Unexpected dev2->dev1 unicast data length")
169         else:
170             if " len=" in ev:
171                 raise Exception("Unexpected dev2->dev1 unicast data length")
172 
173         if broadcast:
174             run_multicast_connectivity_test(dev2, dev1, tos,
175                                             dev2group, dev1group,
176                                             ifname2, ifname1, False, timeout,
177                                             send_len, multicast_to_unicast,
178                                             broadcast_retry_c)
179 
180     finally:
181         if config:
182             if dev1group:
183                 dev1.group_request("DATA_TEST_CONFIG 0")
184             else:
185                 dev1.request("DATA_TEST_CONFIG 0")
186             if dev2group:
187                 dev2.group_request("DATA_TEST_CONFIG 0")
188             else:
189                 dev2.request("DATA_TEST_CONFIG 0")
190 
191 def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
192                       dev1group=False, dev2group=False,
193                       ifname1=None, ifname2=None, config=True, timeout=5,
194                       multicast_to_unicast=False, success_expected=True,
195                       broadcast=True, send_len=None):
196     if dscp:
197         tos = dscp << 2
198     if not tos:
199         tos = 0
200 
201     success = False
202     last_err = None
203     for i in range(0, max_tries):
204         try:
205             run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
206                                   ifname1, ifname2, config=config,
207                                   timeout=timeout,
208                                   multicast_to_unicast=multicast_to_unicast,
209                                   broadcast=broadcast, send_len=send_len)
210             success = True
211             break
212         except Exception as e:
213             last_err = e
214             if i + 1 < max_tries:
215                 time.sleep(1)
216     if success_expected and not success:
217         raise Exception(last_err)
218     if not success_expected and success:
219         raise Exception("Unexpected connectivity detected")
220 
221 def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
222                             max_tries=1, timeout=5):
223     test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
224                       max_tries=max_tries, timeout=timeout)
225 
226 def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
227     test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
228 
229 def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
230     test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
231 
232 def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
233     test_connectivity(dev1, dev2, dscp, tos)
234 
235 (PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = list(range(4))
236 
237 def set_powersave(dev, val):
238     phy = dev.get_driver_status_field("phyname")
239     fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
240     data = '%d' % val
241     (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
242     if res != 0:
243         raise Exception("Failed to set power save for device")
244 
245 def set_group_map(dev, val):
246     phy = dev.get_driver_status_field("phyname")
247     fname = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phy
248     data = '%d' % val
249     (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
250     if res != 0:
251         raise Exception("Failed to set group map for %s" % phy)
252 
253 def set_rx_rssi(dev, val):
254     """
255     Configure signal strength when receiving transmitted frames.
256     mac80211_hwsim driver sets rssi to: TX power - 50
257     According to that set tx_power in order to get the desired RSSI.
258     Valid RSSI range: -50 to -30.
259     """
260     tx_power = (val + 50) * 100
261     ifname = dev.get_driver_status_field("ifname")
262     (res, data) = dev.cmd_execute(['iw', ifname, 'set', 'txpower',
263                                    'fixed', str(tx_power)])
264     if res != 0:
265         raise Exception("Failed to set RSSI to %d" % val)
266 
267 def reset_rx_rssi(dev):
268     set_rx_rssi(dev, -30)
269