1# hwsim testing utilities
2# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
8import time
9import logging
10logger = logging.getLogger()
11
12from wpasupplicant import WpaSupplicant
13
def sync_carrier(dev, ifname=None):
    """Check that at least one relevant network interface reports carrier.

    The carrier state is read from /sys/class/net/<ifname>/carrier, using
    dev.ifname when no ifname is given. If that file does not exist, the
    check is skipped and the function returns silently. When dev is a
    WpaSupplicant instance with a group_ifname that differs from the
    interface checked, the group interface is consulted as well (a
    missing file there is ignored).

    Raises AssertionError when neither interface reports carrier.
    """
    def carrier_state(name):
        # True/False for the carrier flag; None when the sysfs entry is absent
        try:
            with open(f'/sys/class/net/{name}/carrier', 'r') as f:
                return f.read().strip() == '1'
        except FileNotFoundError:
            return None

    main_ifname = ifname or dev.ifname
    carrier_main = carrier_state(main_ifname)
    if carrier_main is None:
        # No sysfs entry for this interface; nothing to verify
        return

    carrier_p2p = None
    if isinstance(dev, WpaSupplicant):
        if dev.group_ifname and dev.group_ifname != main_ifname:
            carrier_p2p = carrier_state(dev.group_ifname)

    assert carrier_main or carrier_p2p, "sending data without carrier won't work"
31
def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
    """Enable data test functionality on dev1 and then on dev2.

    Each device is sent "DATA_TEST_CONFIG 1", with " ifname=<name>"
    appended when the matching interface name is set. The command goes
    through group_request() when the matching devNgroup flag is true and
    through request() otherwise. After a successful command,
    sync_carrier() is run for that device before moving to the next one.

    Raises Exception if a reply does not contain "OK".
    """
    endpoints = ((dev1, dev1group, ifname1), (dev2, dev2group, ifname2))
    for dev, use_group, ifname in endpoints:
        cmd = "DATA_TEST_CONFIG 1"
        if ifname:
            cmd += " ifname=" + ifname
        send = dev.group_request if use_group else dev.request
        if "OK" not in send(cmd):
            raise Exception("Failed to enable data test functionality")

        sync_carrier(dev, ifname)
56
def run_multicast_connectivity_test(dev1, dev2, tos=None,
                                    dev1group=False, dev2group=False,
                                    ifname1=None, ifname2=None,
                                    config=True, timeout=5,
                                    send_len=None, multicast_to_unicast=False,
                                    broadcast_retry_c=1):
    """Send a broadcast test frame from dev1 and verify dev2 receives it.

    Parameters:
    dev1, dev2 -- transmitting and receiving device objects
    tos -- TOS value placed in the DATA_TEST_TX command; None is sent as 0
    dev1group, dev2group -- use the group interface control methods
        (group_request() / wait_group_event()) for the matching device
    ifname1, ifname2 -- interface names handed to config_data_test()
    config -- when true, run config_data_test() first
    timeout -- seconds to wait for the DATA-TEST-RX event
    send_len -- optional frame length; when set, " len=<n>" is appended
        to the command and must be present in the received event, and
        when unset the event must not contain " len="
    multicast_to_unicast -- when true, the frame is expected to arrive
        addressed to dev2's own address instead of the broadcast address
    broadcast_retry_c -- number of attempts; failures before the last
        attempt are ignored

    Raises Exception (from the last attempt) if the frame is not
    delivered or the received event does not match the expectation.
    """
    if tos is None:
        # The default would otherwise be formatted into the command as the
        # literal text "None" instead of a numeric TOS value.
        tos = 0

    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    if config:
        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)

    # Expected event prefixes; these do not change between attempts
    bcast_rx = "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1)
    ucast_rx = "DATA-TEST-RX {} {}".format(addr2, addr1)

    for i in range(broadcast_retry_c):
        try:
            if dev1group:
                dev1.group_request(cmd)
            else:
                dev1.request(cmd)
            if dev2group:
                ev = dev2.wait_group_event(["DATA-TEST-RX"],
                                           timeout=timeout)
            else:
                ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
            if ev is None:
                raise Exception("dev1->dev2 broadcast data delivery failed")
            if multicast_to_unicast:
                if bcast_rx in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
                if ucast_rx not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
            else:
                if bcast_rx not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result")
            if send_len is not None:
                if " len=" + str(send_len) not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data length")
            else:
                if " len=" in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data length")
            break
        except Exception:
            # Only the final attempt is allowed to fail the test
            if i == broadcast_retry_c - 1:
                raise
103
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5,
                          multicast_to_unicast=False, broadcast=True,
                          send_len=None):
    """Run one round of bidirectional data connectivity checks.

    The sequence is: unicast dev1->dev2, broadcast from dev1 (if
    broadcast is set), unicast dev2->dev1, broadcast from dev2 (if
    broadcast is set). The multicast_to_unicast expectation is applied
    only to the broadcast sent by dev2.

    When config is true, data test functionality is enabled through
    config_data_test() before the checks and disabled on both devices
    with "DATA_TEST_CONFIG 0" afterwards, even if a check failed.

    Broadcast checks get a single attempt when both devices have
    hostname set to None and ten attempts otherwise.

    Raises Exception when a frame is not delivered within timeout
    seconds or the received event does not match the expectation.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    dev1.dump_monitor()
    dev2.dump_monitor()

    no_hostnames = dev1.hostname is None and dev2.hostname is None
    broadcast_retry_c = 1 if no_hostnames else 10

    def ctrl(dev, use_group, cmd):
        # Pick the group or the regular control interface for the command
        if use_group:
            return dev.group_request(cmd)
        return dev.request(cmd)

    def check_unicast(src, src_group, src_addr, dst, dst_group, dst_addr,
                      label):
        # Transmit one unicast frame src->dst and validate the RX event
        cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
        if send_len is not None:
            cmd += " len=" + str(send_len)
        ctrl(src, src_group, cmd)

        wait = dst.wait_group_event if dst_group else dst.wait_event
        ev = wait(["DATA-TEST-RX"], timeout=timeout)
        if ev is None:
            raise Exception(label + " unicast data delivery failed")
        if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in ev:
            raise Exception("Unexpected " + label + " unicast data result")

        if send_len is None:
            wrong_len = " len=" in ev
        else:
            wrong_len = " len=" + str(send_len) not in ev
        if wrong_len:
            raise Exception("Unexpected " + label + " unicast data length")

    try:
        if config:
            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

        check_unicast(dev1, dev1group, addr1, dev2, dev2group, addr2,
                      "dev1->dev2")
        if broadcast:
            run_multicast_connectivity_test(
                dev1, dev2, tos, dev1group, dev2group, ifname1, ifname2,
                config=False, timeout=timeout, send_len=send_len,
                multicast_to_unicast=False,
                broadcast_retry_c=broadcast_retry_c)

        check_unicast(dev2, dev2group, addr2, dev1, dev1group, addr1,
                      "dev2->dev1")
        if broadcast:
            run_multicast_connectivity_test(
                dev2, dev1, tos, dev2group, dev1group, ifname2, ifname1,
                config=False, timeout=timeout, send_len=send_len,
                multicast_to_unicast=multicast_to_unicast,
                broadcast_retry_c=broadcast_retry_c)
    finally:
        if config:
            for dev, use_group in ((dev1, dev1group), (dev2, dev2group)):
                ctrl(dev, use_group, "DATA_TEST_CONFIG 0")
190
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5,
                      multicast_to_unicast=False, success_expected=True,
                      broadcast=True, send_len=None):
    """Verify data connectivity between two devices, with optional retries.

    Parameters:
    dev1, dev2 -- device objects handed to run_connectivity_test()
    dscp -- when set to a non-zero value, tos is derived as dscp << 2
    tos -- TOS value to use; an unset or zero value results in 0
    max_tries -- number of attempts; one second is slept between
        failed attempts
    success_expected -- when false, the check is inverted and working
        connectivity is treated as the failure
    Remaining parameters are passed through to run_connectivity_test().

    Raises Exception wrapping the last failure (and chained to it as the
    cause) when connectivity was expected but every attempt failed, or
    Exception("Unexpected connectivity detected") when connectivity was
    not expected but an attempt succeeded.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    success = False
    last_err = None
    for i in range(max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout,
                                  multicast_to_unicast=multicast_to_unicast,
                                  broadcast=broadcast, send_len=send_len)
        except Exception as e:
            last_err = e
            if i + 1 < max_tries:
                time.sleep(1)
        else:
            success = True
            break
    if success_expected and not success:
        # Chain the real failure so its traceback is not lost
        raise Exception(last_err) from last_err
    if not success_expected and success:
        raise Exception("Unexpected connectivity detected")
220
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1, timeout=5):
    """Run test_connectivity() with ifname used as dev2's interface name.

    dscp, tos, max_tries and timeout are passed through unchanged; all
    other test_connectivity() options keep their defaults.
    """
    options = dict(dscp=dscp, tos=tos, ifname2=ifname,
                   max_tries=max_tries, timeout=timeout)
    test_connectivity(dev1, dev2, **options)
225
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with group interface handling on both devices."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
228
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with group interface handling on dev1 only."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
231
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with default options apart from dscp/tos."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)
234
# Power save mode constants, numbered 0..3 (presumably the values meant
# for set_powersave(); confirm against callers)
PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL = range(4)
236
def set_powersave(dev, val):
    """Write a power save value to the hwsim debugfs "ps" file of a device.

    The phy name comes from the device's "phyname" driver status field.
    val is formatted as an integer and written by running
    "echo <val> > <file>" through dev.cmd_execute() with shell=True.

    Raises Exception if the command returns a non-zero result.
    """
    phy = dev.get_driver_status_field("phyname")
    ps_file = f'/sys/kernel/debug/ieee80211/{phy}/hwsim/ps'
    argv = ["echo", '%d' % val, ">", ps_file]
    res, _ = dev.cmd_execute(argv, shell=True)
    if res != 0:
        raise Exception("Failed to set power save for device")
244
def set_group_map(dev, val):
    """Write a value to the hwsim debugfs "group" file of a device.

    The phy name comes from the device's "phyname" driver status field.
    val is formatted as an integer and written by running
    "echo <val> > <file>" through dev.cmd_execute() with shell=True.

    Raises Exception naming the phy if the command returns a non-zero
    result.
    """
    phy = dev.get_driver_status_field("phyname")
    group_file = f'/sys/kernel/debug/ieee80211/{phy}/hwsim/group'
    argv = ["echo", '%d' % val, ">", group_file]
    res, _ = dev.cmd_execute(argv, shell=True)
    if res != 0:
        raise Exception(f"Failed to set group map for {phy}")
252
def set_rx_rssi(dev, val):
    """
    Configure signal strength when receiving transmitted frames.

    mac80211_hwsim driver sets rssi to: TX power - 50
    The TX power is therefore fixed to (val + 50) * 100 with
    "iw <ifname> set txpower fixed <value>" to get the desired RSSI
    (the factor 100 presumably converts dBm to the mBm unit iw expects).
    Valid RSSI range: -50 to -30.

    The interface name comes from the device's "ifname" driver status
    field. Raises Exception if the iw command returns a non-zero result.
    """
    tx_power = 100 * (val + 50)
    ifname = dev.get_driver_status_field("ifname")
    argv = ['iw', ifname, 'set', 'txpower', 'fixed', str(tx_power)]
    res, _ = dev.cmd_execute(argv)
    if res != 0:
        raise Exception("Failed to set RSSI to %d" % val)
266
def reset_rx_rssi(dev):
    """Set the RX RSSI of dev to -30 through set_rx_rssi()."""
    set_rx_rssi(dev, val=-30)
269