1 # Python class for controlling wlantest
2 # Copyright (c) 2013-2019, Jouni Malinen <j@w1.fi>
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import re
8 import os
9 import posixpath
10 import time
11 import subprocess
12 import logging
13 import wpaspy
14 
15 logger = logging.getLogger()
16 
17 class Wlantest:
18     remote_host = None
19     setup_params = None
20     exe_thread = None
21     exe_res = []
22     monitor_mod = None
23     setup_done = False
24 
25     @classmethod
26     def stop_remote_wlantest(cls):
27         if cls.exe_thread is None:
28             # Local flow - no need for remote operations
29             return
30 
31         cls.remote_host.execute(["killall", "-9", "wlantest"])
32         cls.remote_host.thread_wait(cls.exe_thread, 5)
33         cls.exe_thread = None
34         cls.exe_res = []
35 
36     @classmethod
37     def reset_remote_wlantest(cls):
38         cls.stop_remote_wlantest()
39         cls.remote_host = None
40         cls.setup_params = None
41         cls.exe_thread = None
42         cls.exe_res = []
43         cls.monitor_mod = None
44         cls.setup_done = False
45 
46     @classmethod
47     def start_remote_wlantest(cls):
48         if cls.remote_host is None:
49             # Local flow - no need for remote operations
50             return
51         if cls.exe_thread is not None:
52             raise Exception("Cannot start wlantest twice")
53 
54         log_dir = cls.setup_params['log_dir']
55         ifaces = re.split('; | |, ', cls.remote_host.ifname)
56         ifname = ifaces[0]
57         exe = cls.setup_params["wlantest"]
58         tc_name = cls.setup_params["tc_name"]
59         base_log_name = tc_name + "_wlantest_" + \
60                         cls.remote_host.name + "_" + ifname
61         log_file = posixpath.join(log_dir, base_log_name + ".log")
62         pcap_file = posixpath.join(log_dir, base_log_name + ".pcapng")
63         cmd = "{} -i {} -n {} -c -dtN -L {}".format(exe, ifname,
64                                                     pcap_file, log_file)
65         cls.remote_host.add_log(log_file)
66         cls.remote_host.add_log(pcap_file)
67         cls.exe_thread = cls.remote_host.thread_run(cmd.split(), cls.exe_res)
68         # Give wlantest a chance to start working
69         time.sleep(1)
70 
71     @classmethod
72     def register_remote_wlantest(cls, host, setup_params, monitor_mod):
73         if cls.remote_host is not None:
74             raise Exception("Cannot register remote wlantest twice")
75         cls.remote_host = host
76         cls.setup_params = setup_params
77         cls.monitor_mod = monitor_mod
78         status, buf = host.execute(["which", setup_params['wlantest']])
79         if status != 0:
80             raise Exception(host.name + " - wlantest: " + buf)
81         status, buf = host.execute(["which", setup_params['wlantest_cli']])
82         if status != 0:
83             raise Exception(host.name + " - wlantest_cli: " + buf)
84 
85     @classmethod
86     def chan_from_wpa(cls, wpa, is_p2p=False):
87         if cls.monitor_mod is None:
88             return
89         m = cls.monitor_mod
90         return m.setup(cls.remote_host, [m.get_monitor_params(wpa, is_p2p)])
91 
92     @classmethod
93     def setup(cls, wpa, is_p2p=False):
94         if wpa:
95             cls.chan_from_wpa(wpa, is_p2p)
96         cls.start_remote_wlantest()
97         cls.setup_done = True
98 
99     def __init__(self):
100         if not self.setup_done:
101             raise Exception("Cannot create Wlantest instance before setup()")
102         if os.path.isfile('../../wlantest/wlantest_cli'):
103             self.wlantest_cli = '../../wlantest/wlantest_cli'
104         else:
105             self.wlantest_cli = 'wlantest_cli'
106 
107     def cli_cmd(self, params):
108         if self.remote_host is not None:
109             exe = self.setup_params["wlantest_cli"]
110             ret = self.remote_host.execute([exe] + params)
111             if ret[0] != 0:
112                 raise Exception("wlantest_cli failed")
113             return ret[1]
114         else:
115             return subprocess.check_output([self.wlantest_cli] + params).decode()
116 
117     def flush(self):
118         res = self.cli_cmd(["flush"])
119         if "FAIL" in res:
120             raise Exception("wlantest_cli flush failed")
121 
122     def relog(self):
123         res = self.cli_cmd(["relog"])
124         if "FAIL" in res:
125             raise Exception("wlantest_cli relog failed")
126 
127     def add_passphrase(self, passphrase):
128         res = self.cli_cmd(["add_passphrase", passphrase])
129         if "FAIL" in res:
130             raise Exception("wlantest_cli add_passphrase failed")
131 
132     def add_wepkey(self, key):
133         res = self.cli_cmd(["add_wepkey", key])
134         if "FAIL" in res:
135             raise Exception("wlantest_cli add_key failed")
136 
137     def info_bss(self, field, bssid):
138         res = self.cli_cmd(["info_bss", field, bssid])
139         if "FAIL" in res:
140             raise Exception("Could not get BSS info from wlantest for " + bssid)
141         return res
142 
143     def get_bss_counter(self, field, bssid):
144         try:
145             res = self.cli_cmd(["get_bss_counter", field, bssid])
146         except Exception as e:
147             return 0
148         if "FAIL" in res:
149             return 0
150         return int(res)
151 
152     def clear_bss_counters(self, bssid):
153         self.cli_cmd(["clear_bss_counters", bssid])
154 
155     def info_sta(self, field, bssid, addr):
156         res = self.cli_cmd(["info_sta", field, bssid, addr])
157         if "FAIL" in res:
158             raise Exception("Could not get STA info from wlantest for " + addr)
159         return res
160 
161     def get_sta_counter(self, field, bssid, addr):
162         res = self.cli_cmd(["get_sta_counter", field, bssid, addr])
163         if "FAIL" in res:
164             raise Exception("wlantest_cli command failed")
165         return int(res)
166 
167     def clear_sta_counters(self, bssid, addr):
168         res = self.cli_cmd(["clear_sta_counters", bssid, addr])
169         if "FAIL" in res:
170             raise Exception("wlantest_cli command failed")
171 
172     def tdls_clear(self, bssid, addr1, addr2):
173         self.cli_cmd(["clear_tdls_counters", bssid, addr1, addr2])
174 
175     def get_tdls_counter(self, field, bssid, addr1, addr2):
176         res = self.cli_cmd(["get_tdls_counter", field, bssid, addr1, addr2])
177         if "FAIL" in res:
178             raise Exception("wlantest_cli command failed")
179         return int(res)
180 
181     def require_ap_pmf_mandatory(self, bssid):
182         res = self.info_bss("rsn_capab", bssid)
183         if "MFPR" not in res:
184             raise Exception("AP did not require PMF")
185         if "MFPC" not in res:
186             raise Exception("AP did not enable PMF")
187         res = self.info_bss("key_mgmt", bssid)
188         if "PSK-SHA256" not in res:
189             raise Exception("AP did not enable SHA256-based AKM for PMF")
190 
191     def require_ap_pmf_optional(self, bssid):
192         res = self.info_bss("rsn_capab", bssid)
193         if "MFPR" in res:
194             raise Exception("AP required PMF")
195         if "MFPC" not in res:
196             raise Exception("AP did not enable PMF")
197 
198     def require_ap_no_pmf(self, bssid):
199         res = self.info_bss("rsn_capab", bssid)
200         if "MFPR" in res:
201             raise Exception("AP required PMF")
202         if "MFPC" in res:
203             raise Exception("AP enabled PMF")
204 
205     def require_sta_pmf_mandatory(self, bssid, addr):
206         res = self.info_sta("rsn_capab", bssid, addr)
207         if "MFPR" not in res:
208             raise Exception("STA did not require PMF")
209         if "MFPC" not in res:
210             raise Exception("STA did not enable PMF")
211 
212     def require_sta_pmf(self, bssid, addr):
213         res = self.info_sta("rsn_capab", bssid, addr)
214         if "MFPC" not in res:
215             raise Exception("STA did not enable PMF")
216 
217     def require_sta_no_pmf(self, bssid, addr):
218         res = self.info_sta("rsn_capab", bssid, addr)
219         if "MFPC" in res:
220             raise Exception("STA enabled PMF")
221 
222     def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
223         res = self.info_sta("key_mgmt", bssid, addr)
224         if key_mgmt not in res:
225             raise Exception("Unexpected STA key_mgmt")
226 
227     def get_tx_tid(self, bssid, addr, tid):
228         res = self.cli_cmd(["get_tx_tid", bssid, addr, str(tid)])
229         if "FAIL" in res:
230             raise Exception("wlantest_cli command failed")
231         return int(res)
232 
233     def get_rx_tid(self, bssid, addr, tid):
234         res = self.cli_cmd(["get_rx_tid", bssid, addr, str(tid)])
235         if "FAIL" in res:
236             raise Exception("wlantest_cli command failed")
237         return int(res)
238 
239     def get_tid_counters(self, bssid, addr):
240         tx = {}
241         rx = {}
242         for tid in range(0, 17):
243             tx[tid] = self.get_tx_tid(bssid, addr, tid)
244             rx[tid] = self.get_rx_tid(bssid, addr, tid)
245         return [tx, rx]
246 
247 class WlantestCapture:
248     def __init__(self, ifname, output, netns=None):
249         self.cmd = None
250         self.ifname = ifname
251         if os.path.isfile('../../wlantest/wlantest'):
252             bin = '../../wlantest/wlantest'
253         else:
254             bin = 'wlantest'
255         logger.debug("wlantest[%s] starting" % ifname)
256         args = [bin, '-e', '-i', ifname, '-w', output]
257         if netns:
258             args = ['ip', 'netns', 'exec', netns] + args
259         self.cmd = subprocess.Popen(args,
260                                     stdout=subprocess.PIPE,
261                                     stderr=subprocess.PIPE)
262         time.sleep(1)
263 
264     def __enter__(self):
265         return self
266 
267     def __exit__(self, type, value, traceback):
268         time.sleep(0.5)
269         self.close()
270         time.sleep(0.5)
271 
272     def __del__(self):
273         if self.cmd:
274             print("WlantestCapture.__del__ needed to run close()")
275             self.close()
276 
277     def close(self):
278         if not self.cmd:
279             return
280         logger.debug("wlantest[%s] stopping" % self.ifname)
281         self.cmd.terminate()
282         res = self.cmd.communicate()
283         if len(res[0]) > 0:
284             logger.debug("wlantest[%s] stdout: %s" % (self.ifname,
285                                                       res[0].decode().strip()))
286         if len(res[1]) > 0:
287             logger.debug("wlantest[%s] stderr: %s" % (self.ifname,
288                                                       res[1].decode().strip()))
289         self.cmd = None
290