1 # FST configuration tests
2 # Copyright (c) 2015, Qualcomm Atheros, Inc.
3 #
4 # This software may be distributed under the terms of the BSD license.
5 # See README for more details.
6 
7 import logging
8 logger = logging.getLogger()
9 import subprocess
10 import time
11 import os
12 import signal
13 import hostapd
14 import wpasupplicant
15 import utils
16 
17 import fst_test_common
18 
class FstLauncherConfig:
    """Base class for the configurations used to launch the FST config tests
    related hostapd/wpa_supplicant instances.

    It only stores the interface name and the FST group, priority and LLT
    values; is_ap() and to_file() have to be provided by a subclass."""
    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
        self.iface, self.fst_group = iface, fst_group
        # An fst_llt of None means that no llt parameter will be set
        self.fst_pri, self.fst_llt = fst_pri, fst_llt

    def ifname(self):
        """Returns the interface name given at construction time"""
        return self.iface

    def is_ap(self):
        """Returns True if the configuration is for AP, otherwise - False.
        Not implemented here: calling it on the base class raises"""
        raise Exception("Virtual is_ap() called!")

    def to_file(self, pathname):
        """Creates the configuration file 'pathname' for the
        hostapd/wpa_supplicant instance. Not implemented here: calling it on
        the base class raises"""
        raise Exception("Virtual to_file() called!")
39 
class FstLauncherConfigAP(FstLauncherConfig):
    """Configuration of one AP interface of the hostapd instance launched by
    the FST config tests"""
    def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
                 fst_llt=None):
        self.ssid, self.mode, self.chan = ssid, mode, chan
        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)

    def is_ap(self):
        """Always True: this is an AP configuration"""
        return True

    def get_channel(self):
        """Returns the channel given at construction time"""
        return self.chan

    def to_file(self, pathname):
        """Writes the hostapd configuration file 'pathname' and logs its
        contents.

        The FST lines are written only if the FST group is not empty, and the
        fst_llt line only if an LLT value was given in addition."""
        with open(pathname, "w") as cfgfile:
            general = (self.iface, self.ssid, self.chan, self.mode)
            cfgfile.write("country_code=US\n"
                          "interface=%s\n"
                          "ctrl_interface=/var/run/hostapd\n"
                          "ssid=%s\n"
                          "channel=%s\n"
                          "hw_mode=%s\n"
                          "ieee80211n=1\n" % general)
            if len(self.fst_group) > 0:
                fst = (self.fst_group, self.fst_pri)
                cfgfile.write("fst_group_id=%s\n"
                              "fst_priority=%s\n" % fst)
                if self.fst_llt is not None:
                    cfgfile.write("fst_llt=%s\n" % self.fst_llt)
        # Read the file back so that the log shows what really got written
        with open(pathname, "r") as cfgfile:
            written = cfgfile.read()
        logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
                                                            written))
76 
class FstLauncherConfigSTA(FstLauncherConfig):
    """Configuration of one STA interface of the wpa_supplicant instance
    launched by the FST config tests"""
    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)

    def is_ap(self):
        """Always False: this is a STA configuration"""
        return False

    def to_file(self, pathname):
        """Writes the wpa_supplicant configuration file 'pathname' and logs
        its contents.

        The FST lines are written only if the FST group is not empty, and the
        fst_llt line only if an LLT value was given in addition."""
        with open(pathname, "w") as cfgfile:
            cfgfile.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
                          "p2p_no_group_iface=1\n")
            if len(self.fst_group) > 0:
                fst = (self.fst_group, self.fst_pri)
                cfgfile.write("fst_group_id=%s\n"
                              "fst_priority=%s\n" % fst)
                if self.fst_llt is not None:
                    cfgfile.write("fst_llt=%s\n" % self.fst_llt)
        # Read the file back so that the log shows what really got written
        with open(pathname, "r") as cfgfile:
            written = cfgfile.read()
        logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname,
                                                                   written))
99 
class FstLauncher:
    """FstLauncher class is responsible for launching and cleaning up of FST
    config tests related hostapd/wpa_supplicant instances.

    Configurations are registered with add_cfg(), the processes are started
    with run_hostapd()/run_wpa_supplicant() and stopped with cleanup(). When
    the object is used as a context manager, cleanup() is called on exit."""
    def __init__(self, logpath):
        """Prepares a launcher that keeps its configuration, PID and log
        files in the directory 'logpath'. Raises utils.HwsimSkip if FST is
        not supported (see test_is_supported())."""
        self.logger = logging.getLogger()
        self.fst_logpath = logpath
        self.cfgs_to_run = []
        self.hapd_fst_global = '/var/run/hostapd-fst-global'
        self.wsup_fst_global = '/tmp/fststa'
        # Number of AP/STA configurations currently held in cfgs_to_run
        self.nof_aps = 0
        self.nof_stas = 0
        self.reg_ctrl = fst_test_common.HapdRegCtrl()
        self.test_is_supported()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Tear down also when the with block raised. Nothing is returned, so
        # a pending exception keeps propagating.
        self.cleanup()

    @staticmethod
    def test_is_supported():
        """Raises utils.HwsimSkip unless both the hostapd and the
        wpa_supplicant global control interfaces answer OK to the FST manager
        IS_SUPPORTED test request"""
        h = hostapd.HostapdGlobal()
        resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not resp.startswith("OK"):
            raise utils.HwsimSkip("FST not supported")
        w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not resp.startswith("OK"):
            raise utils.HwsimSkip("FST not supported")

    def get_cfg_pathname(self, cfg):
        """Returns pathname of ifname based configuration file"""
        return self.fst_logpath +'/'+ cfg.ifname() + '.conf'

    def add_cfg(self, cfg):
        """Adds configuration to be used for launching hostapd/wpa_supplicant
        instances. Adding a configuration that is already registered is a
        no-op."""
        if cfg in self.cfgs_to_run:
            # Already registered: counting it again would make
            # nof_aps/nof_stas disagree with cfgs_to_run.
            return
        self.cfgs_to_run.append(cfg)
        if cfg.is_ap():
            self.nof_aps += 1
        else:
            self.nof_stas += 1

    def remove_cfg(self, cfg):
        """Removes configuration previously added with add_cfg and deletes
        its configuration file if there is one"""
        if cfg in self.cfgs_to_run:
            self.cfgs_to_run.remove(cfg)
            # Only a configuration that was really registered is uncounted,
            # so the counters can never go negative.
            if cfg.is_ap():
                self.nof_aps -= 1
            else:
                self.nof_stas -= 1
        config_file = self.get_cfg_pathname(cfg)
        if os.path.exists(config_file):
            os.remove(config_file)

    def run_hostapd(self):
        """Launches hostapd with interfaces configured according to
        FstLauncherConfigAP configurations added.

        Returns the result of the hostapd start command (0 on success).
        Raises an exception if no AP configuration was added."""
        if self.nof_aps == 0:
            raise Exception("No FST APs to start")
        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
        mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
        # Prefer an alternative binary placed under the log directory
        prg = os.path.join(self.fst_logpath,
                           'alt-hostapd/hostapd/hostapd')
        if not os.path.exists(prg):
            prg = '../../hostapd/hostapd'
        cmd = [prg, '-B', '-dddt',
               '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
        for cfg in self.cfgs_to_run:
            if not cfg.is_ap():
                continue
            cfgfile = self.get_cfg_pathname(cfg)
            cfg.to_file(cfgfile)
            cmd.append(cfgfile)
            self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
        self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
        res = subprocess.call(cmd)
        self.logger.debug("fst hostapd start result: %d" % res)
        if res == 0:
            self.reg_ctrl.start()
        return res

    def run_wpa_supplicant(self):
        """Launches wpa_supplicant with interfaces configured according to
        FstLauncherConfigSTA configurations added.

        Returns the result of the wpa_supplicant start command (0 on
        success). Raises an exception if no STA configuration was added."""
        if self.nof_stas == 0:
            raise Exception("No FST STAs to start")
        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
        mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
        # Prefer an alternative binary placed under the log directory
        prg = os.path.join(self.fst_logpath,
                           'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
        if not os.path.exists(prg):
            prg = '../../wpa_supplicant/wpa_supplicant'
        cmd = [prg, '-B', '-dddt',
               '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global]
        stas = [cfg for cfg in self.cfgs_to_run if not cfg.is_ap()]
        for sta_no, cfg in enumerate(stas):
            if sta_no > 0:
                cmd.append('-N')    # Next station configuration
            cfgfile = self.get_cfg_pathname(cfg)
            cfg.to_file(cfgfile)
            cmd.append('-c' + cfgfile)
            cmd.append('-i' + cfg.ifname())
            cmd.append('-Dnl80211')
        self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
        res = subprocess.call(cmd)
        self.logger.debug("fst supplicant start result: %d" % res)
        return res

    def cleanup(self):
        """Terminates hostapd/wpa_supplicant processes previously launched with
        run_hostapd/run_wpa_supplicant, removes all the configurations added
        and clears the regulatory domain"""
        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
        # Wait for a missing PID file only if such a process was expected
        self.kill_pid(pidfile, self.nof_aps > 0)
        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
        self.kill_pid(pidfile, self.nof_stas > 0)
        self.reg_ctrl.stop()
        while self.cfgs_to_run:
            self.remove_cfg(self.cfgs_to_run[0])
        fst_test_common.fst_clear_regdom()

    def kill_pid(self, pidfile, try_again=False):
        """Kills process by PID file.

        Sends SIGTERM to the PID read from 'pidfile' and then polls once a
        second, for up to ten seconds, until the process is gone. If the PID
        file does not exist and 'try_again' is set, waits one second for it
        to appear before giving up. Failures are logged and never raised."""
        if not os.path.exists(pidfile):
            if not try_again:
                return
            # It might take some time for the process to write the PID file,
            # so wait a bit longer before giving up.
            self.logger.info("kill_pid: pidfile %s does not exist - try again after a second" % pidfile)
            time.sleep(1)
            if not os.path.exists(pidfile):
                self.logger.info("kill_pid: pidfile %s does not exist - could not kill the process" % pidfile)
                return
        pid = -1
        try:
            # The file may exist before the PID has been written into it
            for i in range(3):
                with open(pidfile, 'r') as pf:
                    pidtxt = pf.read().strip()
                self.logger.debug("kill_pid: %s: '%s'" % (pidfile, pidtxt))
                try:
                    pid = int(pidtxt)
                    break
                except ValueError as e:
                    self.logger.debug("kill_pid: No valid PID found: %s" % str(e))
                    time.sleep(1)
            if pid <= 0:
                # kill() treats 0 and negative values as process groups and
                # -1 as "every process", so never pass them on.
                self.logger.info("kill_pid: no valid PID in %s - could not kill the process" % pidfile)
                return
            self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
            os.kill(pid, signal.SIGTERM)
            for i in range(10):
                try:
                    # Poll the pid (Is the process still existing?)
                    os.kill(pid, 0)
                except OSError:
                    # No, already done
                    break
                # Wait and check again
                time.sleep(1)
        except Exception as e:
            self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
265 
266 
def parse_ies(iehex, el=-1):
    """Parses the information elements hex string 'iehex' in format
    "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
    If 'el' is defined returns the list of integer values of the payload of
    the specific IE (or empty list if the element is not in the string). If
    the element is present more than once, the payload of the last one is
    returned.

    A string that is not an even number of hex digits, or whose elements do
    not add up exactly to its length (truncated header or payload), is
    reported with an error log entry and an empty list is returned."""
    import binascii

    try:
        # unhexlify is strict: no signs, no whitespace, no odd length
        iel = list(bytearray(binascii.unhexlify(iehex)))
    except (TypeError, ValueError):
        logger.error("Bad IE string: " + iehex)
        return []
    res = []
    i = 0
    # Each element is an id octet, a length octet and 'length' payload octets
    while i + 2 <= len(iel):
        ie_id, ie_len = iel[i], iel[i + 1]
        logger.debug("IE found: %x" % ie_id)
        if el != -1 and el == ie_id:
            res = iel[i + 2:i + 2 + ie_len]
        i += 2 + ie_len
    # Validity check: the elements have to cover the string exactly
    if i != len(iel):
        logger.error("Bad IE string: " + iehex)
        res = []
    return res
287 
def scan_and_get_bss(dev, frq):
    """Runs a scan on device 'dev' limited to the frequency 'frq' and returns
    what dev.get_bss('0') reports afterwards: the bss info dictionary
    ('ssid','ie','flags', etc.) or None.

    Only the first BSS entry is looked at, i.e., the function relies on there
    being a single AP on the given channel. If that is not the case, the
    function must be changed to call dev.get_bss() till the AP with the
    [b]ssid that we need is found"""
    dev.scan(freq=frq)
    first_bss = dev.get_bss('0')
    return first_bss
296 
297 
298 # AP configuration tests
299 
def run_test_ap_configuration(apdev, test_params,
                              fst_group=fst_test_common.fst_test_def_group,
                              fst_pri=fst_test_common.fst_test_def_prio_high,
                              fst_llt=fst_test_common.fst_test_def_llt):
    """Starts FST hostapd with two APs and returns the result of the start.

    The FST configuration of the 1st AP is fixed to the default group, low
    priority and default LLT; the FST configuration of the 2nd AP is taken
    from the parameters. Returns the result of the run: 0 - no errors
    discovered, an error otherwise. The function is used for simple "bad
    configuration" tests."""
    logdir = test_params['logdir']
    with FstLauncher(logdir) as launcher:
        fixed_ap = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
                                       fst_test_common.fst_test_def_chan_a,
                                       fst_test_common.fst_test_def_group,
                                       fst_test_common.fst_test_def_prio_low,
                                       fst_test_common.fst_test_def_llt)
        tested_ap = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
                                        fst_test_common.fst_test_def_chan_g,
                                        fst_group, fst_pri, fst_llt)
        for cfg in (fixed_ap, tested_ap):
            launcher.add_cfg(cfg)
        # The launcher cleans up on leaving the with block
        return launcher.run_hostapd()
322 
def run_test_sta_configuration(test_params,
                               fst_group=fst_test_common.fst_test_def_group,
                               fst_pri=fst_test_common.fst_test_def_prio_high,
                               fst_llt=fst_test_common.fst_test_def_llt):
    """Starts FST wpa_supplicant with two STAs and returns the result of the
    start.

    The FST configuration of the 1st STA (wlan5) is fixed to the default
    group, low priority and default LLT; the FST configuration of the 2nd STA
    (wlan6) is taken from the parameters. Returns the result of the run:
    0 - no errors discovered, an error otherwise. The function is used for
    simple "bad configuration" tests."""
    logdir = test_params['logdir']
    with FstLauncher(logdir) as launcher:
        fixed_sta = FstLauncherConfigSTA('wlan5',
                                         fst_test_common.fst_test_def_group,
                                         fst_test_common.fst_test_def_prio_low,
                                         fst_test_common.fst_test_def_llt)
        tested_sta = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
        for cfg in (fixed_sta, tested_sta):
            launcher.add_cfg(cfg)
        # The launcher cleans up on leaving the with block
        return launcher.run_wpa_supplicant()
342 
def test_fst_ap_config_llt_neg(dev, apdev, test_params):
    """FST AP configuration negative LLT"""
    # The test fails if hostapd starts although the 2nd AP has fst_llt=-1
    status = run_test_ap_configuration(apdev, test_params, fst_llt='-1')
    started = status == 0
    if started:
        raise Exception("hostapd started with a negative llt")
348 
def test_fst_ap_config_llt_zero(dev, apdev, test_params):
    """FST AP configuration zero LLT"""
    # The test fails if hostapd starts although the 2nd AP has fst_llt=0
    status = run_test_ap_configuration(apdev, test_params, fst_llt='0')
    started = status == 0
    if started:
        raise Exception("hostapd started with a zero llt")
354 
def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
    """FST AP configuration LLT is too big"""
    # 4294967296 is 0x100000000; the test fails if hostapd starts with it
    too_big_llt = '4294967296'
    status = run_test_ap_configuration(apdev, test_params,
                                       fst_llt=too_big_llt)
    started = status == 0
    if started:
        raise Exception("hostapd started with llt that is too big")
361 
def test_fst_ap_config_llt_nan(dev, apdev, test_params):
    """FST AP configuration LLT is not a number"""
    # The test fails if hostapd starts although fst_llt is the text 'nan'
    status = run_test_ap_configuration(apdev, test_params, fst_llt='nan')
    started = status == 0
    if started:
        raise Exception("hostapd started with llt not a number")
367 
def test_fst_ap_config_pri_neg(dev, apdev, test_params):
    """FST AP configuration Priority negative"""
    # The test fails if hostapd starts although the 2nd AP has priority -1
    status = run_test_ap_configuration(apdev, test_params, fst_pri='-1')
    started = status == 0
    if started:
        raise Exception("hostapd started with a negative fst priority")
373 
def test_fst_ap_config_pri_zero(dev, apdev, test_params):
    """FST AP configuration Priority zero"""
    # The test fails if hostapd starts although the 2nd AP has priority 0
    status = run_test_ap_configuration(apdev, test_params, fst_pri='0')
    started = status == 0
    if started:
        raise Exception("hostapd started with a zero fst priority")
379 
def test_fst_ap_config_pri_large(dev, apdev, test_params):
    """FST AP configuration Priority too large"""
    # The test fails if hostapd starts although the 2nd AP has priority 256
    status = run_test_ap_configuration(apdev, test_params, fst_pri='256')
    started = status == 0
    if started:
        raise Exception("hostapd started with too large fst priority")
385 
def test_fst_ap_config_pri_nan(dev, apdev, test_params):
    """FST AP configuration Priority not a number"""
    # The test fails if hostapd starts although the priority is the text 'nan'
    status = run_test_ap_configuration(apdev, test_params, fst_pri='nan')
    started = status == 0
    if started:
        raise Exception("hostapd started with fst priority not a number")
391 
def test_fst_ap_config_group_len(dev, apdev, test_params):
    """FST AP configuration Group max length"""
    # A 17 character group id; the test fails if hostapd starts with it
    long_group = 'fstg5678abcd34567'
    status = run_test_ap_configuration(apdev, test_params,
                                       fst_group=long_group)
    started = status == 0
    if started:
        raise Exception("hostapd started with fst_group length too big")
398 
def test_fst_ap_config_good(dev, apdev, test_params):
    """FST AP configuration good parameters"""
    # All the FST parameters of the 2nd AP are left at their default values
    status = run_test_ap_configuration(apdev, test_params)
    failed = status != 0
    if failed:
        raise Exception("hostapd didn't start with valid config parameters")
404 
def test_fst_ap_config_default(dev, apdev, test_params):
    """FST AP configuration default parameters"""
    # fst_llt=None leaves the fst_llt line out of the 2nd AP configuration
    status = run_test_ap_configuration(apdev, test_params, fst_llt=None)
    failed = status != 0
    if failed:
        raise Exception("hostapd didn't start with valid config parameters")
410 
411 
412 # STA configuration tests
413 
def test_fst_sta_config_llt_neg(dev, apdev, test_params):
    """FST STA configuration negative LLT"""
    # The test fails if wpa_supplicant starts although fst_llt is -1
    status = run_test_sta_configuration(test_params, fst_llt='-1')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with a negative llt")
419 
def test_fst_sta_config_llt_zero(dev, apdev, test_params):
    """FST STA configuration zero LLT"""
    # The test fails if wpa_supplicant starts although fst_llt is 0
    status = run_test_sta_configuration(test_params, fst_llt='0')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with a zero llt")
425 
def test_fst_sta_config_llt_large(dev, apdev, test_params):
    """FST STA configuration LLT is too large"""
    # 4294967296 is 0x100000000; the test fails if wpa_supplicant starts
    too_large_llt = '4294967296'
    status = run_test_sta_configuration(test_params, fst_llt=too_large_llt)
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with llt that is too large")
432 
def test_fst_sta_config_llt_nan(dev, apdev, test_params):
    """FST STA configuration LLT is not a number"""
    # The test fails if wpa_supplicant starts although fst_llt is 'nan'
    status = run_test_sta_configuration(test_params, fst_llt='nan')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with llt not a number")
438 
def test_fst_sta_config_pri_neg(dev, apdev, test_params):
    """FST STA configuration Priority negative"""
    # The test fails if wpa_supplicant starts although the priority is -1
    status = run_test_sta_configuration(test_params, fst_pri='-1')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with a negative fst priority")
444 
def test_fst_sta_config_pri_zero(dev, apdev, test_params):
    """FST STA configuration Priority zero"""
    # The test fails if wpa_supplicant starts although the priority is 0
    status = run_test_sta_configuration(test_params, fst_pri='0')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with a zero fst priority")
450 
def test_fst_sta_config_pri_big(dev, apdev, test_params):
    """FST STA configuration Priority too large"""
    # The test fails if wpa_supplicant starts although the priority is 256
    status = run_test_sta_configuration(test_params, fst_pri='256')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with too large fst priority")
456 
def test_fst_sta_config_pri_nan(dev, apdev, test_params):
    """FST STA configuration Priority not a number"""
    # The test fails if wpa_supplicant starts although the priority is 'nan'
    status = run_test_sta_configuration(test_params, fst_pri='nan')
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with fst priority not a number")
462 
def test_fst_sta_config_group_len(dev, apdev, test_params):
    """FST STA configuration Group max length"""
    # A 17 character group id; the test fails if wpa_supplicant starts with it
    long_group = 'fstg5678abcd34567'
    status = run_test_sta_configuration(test_params, fst_group=long_group)
    started = status == 0
    if started:
        raise Exception("wpa_supplicant started with fst_group length too big")
469 
def test_fst_sta_config_good(dev, apdev, test_params):
    """FST STA configuration good parameters"""
    # All the FST parameters of the 2nd STA are left at their default values
    status = run_test_sta_configuration(test_params)
    failed = status != 0
    if failed:
        raise Exception("wpa_supplicant didn't start with valid config parameters")
475 
def test_fst_sta_config_default(dev, apdev, test_params):
    """FST STA configuration default parameters"""
    # fst_llt=None leaves the fst_llt line out of the 2nd STA configuration
    status = run_test_sta_configuration(test_params, fst_llt=None)
    failed = status != 0
    if failed:
        raise Exception("wpa_supplicant didn't start with valid config parameters")
481 
def test_fst_scan_mb(dev, apdev, test_params):
    """FST scan valid MB IE presence with normal start"""
    logdir = test_params['logdir']

    # Start two APs in the same FST group and check that the scan results of
    # both of them carry the MB IE (element id 0x9e)
    with FstLauncher(logdir) as fst_launcher:
        ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
                                  fst_test_common.fst_test_def_chan_a,
                                  fst_test_common.fst_test_def_group,
                                  fst_test_common.fst_test_def_prio_high)
        ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
                                  fst_test_common.fst_test_def_chan_g,
                                  fst_test_common.fst_test_def_group,
                                  fst_test_common.fst_test_def_prio_low)
        fst_launcher.add_cfg(ap1)
        fst_launcher.add_cfg(ap2)
        res = fst_launcher.run_hostapd()
        if res != 0:
            raise Exception("hostapd didn't start properly")

        # A missing scan result or a result without IEs counts as "no MB IE"
        mbie1 = []
        mbie2 = []
        # Scan 1st AP
        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
        if vals1 is not None and 'ie' in vals1:
            mbie1 = parse_ies(vals1['ie'], 0x9e)
        # Scan 2nd AP
        vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
        if vals2 is not None and 'ie' in vals2:
            mbie2 = parse_ies(vals2['ie'], 0x9e)

    if len(mbie1) == 0:
        raise Exception("No MB IE created by 1st AP")
    if len(mbie2) == 0:
        raise Exception("No MB IE created by 2nd AP")
525 
def test_fst_scan_nomb(dev, apdev, test_params):
    """FST scan no MB IE presence with 1 AP start"""
    logdir = test_params['logdir']

    # Start a single FST AP and check that its scan result does not carry
    # the MB IE (element id 0x9e)
    with FstLauncher(logdir) as fst_launcher:
        ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
                                  fst_test_common.fst_test_def_chan_a,
                                  fst_test_common.fst_test_def_group,
                                  fst_test_common.fst_test_def_prio_high)
        fst_launcher.add_cfg(ap1)
        res = fst_launcher.run_hostapd()
        if res != 0:
            raise Exception("Hostapd didn't start properly")

        time.sleep(2)
        # A missing scan result or a result without IEs counts as "no MB IE"
        mbie1 = []
        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
        if vals1 is not None and 'ie' in vals1:
            mbie1 = parse_ies(vals1['ie'], 0x9e)

    if len(mbie1) != 0:
        raise Exception("MB IE exists with 1 AP")
553