1# FST configuration tests
2# Copyright (c) 2015, Qualcomm Atheros, Inc.
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import logging
8logger = logging.getLogger()
9import subprocess
10import time
11import os
12import signal
13import hostapd
14import wpasupplicant
15import utils
16
17import fst_test_common
18
19class FstLauncherConfig:
20    """FstLauncherConfig class represents configuration to be used for
21    FST config tests related hostapd/wpa_supplicant instances"""
22    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
23        self.iface = iface
24        self.fst_group = fst_group
25        self.fst_pri = fst_pri
26        self.fst_llt = fst_llt # None llt means no llt parameter will be set
27
28    def ifname(self):
29        return self.iface
30
31    def is_ap(self):
32        """Returns True if the configuration is for AP, otherwise - False"""
33        raise Exception("Virtual is_ap() called!")
34
35    def to_file(self, pathname):
36        """Creates configuration file to be used by FST config tests related
37        hostapd/wpa_supplicant instances"""
38        raise Exception("Virtual to_file() called!")
39
40class FstLauncherConfigAP(FstLauncherConfig):
41    """FstLauncherConfigAP class represents configuration to be used for
42    FST config tests related hostapd instance"""
43    def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
44                 fst_llt=None):
45        self.ssid = ssid
46        self.mode = mode
47        self.chan = chan
48        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
49
50    def is_ap(self):
51        return True
52
53    def get_channel(self):
54        return self.chan
55
56    def to_file(self, pathname):
57        """Creates configuration file to be used by FST config tests related
58        hostapd instance"""
59        with open(pathname, "w") as f:
60            f.write("country_code=US\n"
61                    "interface=%s\n"
62                    "ctrl_interface=/var/run/hostapd\n"
63                    "ssid=%s\n"
64                    "channel=%s\n"
65                    "hw_mode=%s\n"
66                    "ieee80211n=1\n" % (self.iface, self.ssid, self.chan,
67                                        self.mode))
68            if len(self.fst_group) != 0:
69                f.write("fst_group_id=%s\n"
70                        "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
71                if self.fst_llt is not None:
72                    f.write("fst_llt=%s\n" % self.fst_llt)
73        with open(pathname, "r") as f:
74            logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
75                                                                f.read()))
76
77class FstLauncherConfigSTA(FstLauncherConfig):
78    """FstLauncherConfig class represents configuration to be used for
79    FST config tests related wpa_supplicant instance"""
80    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
81        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
82
83    def is_ap(self):
84        return False
85
86    def to_file(self, pathname):
87        """Creates configuration file to be used by FST config tests related
88        wpa_supplicant instance"""
89        with open(pathname, "w") as f:
90            f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
91                "p2p_no_group_iface=1\n")
92            if len(self.fst_group) != 0:
93                f.write("fst_group_id=%s\n"
94                    "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
95                if self.fst_llt is not None:
96                    f.write("fst_llt=%s\n" % self.fst_llt)
97        with open(pathname, "r") as f:
98            logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, f.read()))
99
100class FstLauncher:
101    """FstLauncher class is responsible for launching and cleaning up of FST
102    config tests related hostapd/wpa_supplicant instances"""
103    def __init__(self, logpath):
104        self.logger = logging.getLogger()
105        self.fst_logpath = logpath
106        self.cfgs_to_run = []
107        self.hapd_fst_global = '/var/run/hostapd-fst-global'
108        self.wsup_fst_global = '/tmp/fststa'
109        self.nof_aps = 0
110        self.nof_stas = 0
111        self.reg_ctrl = fst_test_common.HapdRegCtrl()
112        self.test_is_supported()
113
114    def __enter__(self):
115        return self
116
117    def __exit__(self, type, value, traceback):
118        self.cleanup()
119
120    @staticmethod
121    def test_is_supported():
122        h = hostapd.HostapdGlobal()
123        resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
124        if not resp.startswith("OK"):
125            raise utils.HwsimSkip("FST not supported")
126        w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
127        resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
128        if not resp.startswith("OK"):
129            raise utils.HwsimSkip("FST not supported")
130
131    def get_cfg_pathname(self, cfg):
132        """Returns pathname of ifname based configuration file"""
133        return self.fst_logpath +'/'+ cfg.ifname() + '.conf'
134
135    def add_cfg(self, cfg):
136        """Adds configuration to be used for launching hostapd/wpa_supplicant
137        instances"""
138        if cfg not in self.cfgs_to_run:
139            self.cfgs_to_run.append(cfg)
140        if cfg.is_ap() == True:
141            self.nof_aps += 1
142        else:
143            self.nof_stas += 1
144
145    def remove_cfg(self, cfg):
146        """Removes configuration previously added with add_cfg"""
147        if cfg in self.cfgs_to_run:
148            self.cfgs_to_run.remove(cfg)
149        if cfg.is_ap() == True:
150            self.nof_aps -= 1
151        else:
152            self.nof_stas -= 1
153        config_file = self.get_cfg_pathname(cfg)
154        if os.path.exists(config_file):
155            os.remove(config_file)
156
157    def run_hostapd(self):
158        """Lauches hostapd with interfaces configured according to
159        FstLauncherConfigAP configurations added"""
160        if self.nof_aps == 0:
161            raise Exception("No FST APs to start")
162        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
163        mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
164        prg = os.path.join(self.fst_logpath,
165                           'alt-hostapd/hostapd/hostapd')
166        if not os.path.exists(prg):
167            prg = '../../hostapd/hostapd'
168        cmd = [prg, '-B', '-dddt',
169               '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
170        for i in range(0, len(self.cfgs_to_run)):
171            cfg = self.cfgs_to_run[i]
172            if cfg.is_ap() == True:
173                cfgfile = self.get_cfg_pathname(cfg)
174                cfg.to_file(cfgfile)
175                cmd.append(cfgfile)
176                self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
177        self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
178        res = subprocess.call(cmd)
179        self.logger.debug("fst hostapd start result: %d" % res)
180        if res == 0:
181            self.reg_ctrl.start()
182        return res
183
184    def run_wpa_supplicant(self):
185        """Lauches wpa_supplicant with interfaces configured according to
186        FstLauncherConfigSTA configurations added"""
187        if self.nof_stas == 0:
188            raise Exception("No FST STAs to start")
189        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
190        mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
191        prg = os.path.join(self.fst_logpath,
192                           'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
193        if not os.path.exists(prg):
194            prg = '../../wpa_supplicant/wpa_supplicant'
195        cmd = [prg, '-B', '-dddt',
196               '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global]
197        sta_no = 0
198        for i in range(0, len(self.cfgs_to_run)):
199            cfg = self.cfgs_to_run[i]
200            if cfg.is_ap() == False:
201                cfgfile = self.get_cfg_pathname(cfg)
202                cfg.to_file(cfgfile)
203                cmd.append('-c' + cfgfile)
204                cmd.append('-i' + cfg.ifname())
205                cmd.append('-Dnl80211')
206                if sta_no != self.nof_stas -1:
207                    cmd.append('-N')    # Next station configuration
208                sta_no += 1
209        self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
210        res = subprocess.call(cmd)
211        self.logger.debug("fst supplicant start result: %d" % res)
212        return res
213
214    def cleanup(self):
215        """Terminates hostapd/wpa_supplicant processes previously launched with
216        run_hostapd/run_wpa_supplicant"""
217        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
218        self.kill_pid(pidfile, self.nof_aps > 0)
219        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
220        self.kill_pid(pidfile, self.nof_stas > 0)
221        self.reg_ctrl.stop()
222        while len(self.cfgs_to_run) != 0:
223            cfg = self.cfgs_to_run[0]
224            self.remove_cfg(cfg)
225        fst_test_common.fst_clear_regdom()
226
227    def kill_pid(self, pidfile, try_again=False):
228        """Kills process by PID file"""
229        if not os.path.exists(pidfile):
230            if not try_again:
231                return
232            # It might take some time for the process to write the PID file,
233            # so wait a bit longer before giving up.
234            self.logger.info("kill_pid: pidfile %s does not exist - try again after a second" % pidfile)
235            time.sleep(1)
236            if not os.path.exists(pidfile):
237                self.logger.info("kill_pid: pidfile %s does not exist - could not kill the process" % pidfile)
238                return
239        pid = -1
240        try:
241            for i in range(3):
242                pf = open(pidfile, 'r')
243                pidtxt = pf.read().strip()
244                self.logger.debug("kill_pid: %s: '%s'" % (pidfile, pidtxt))
245                pf.close()
246                try:
247                    pid = int(pidtxt)
248                    break
249                except Exception as e:
250                    self.logger.debug("kill_pid: No valid PID found: %s" % str(e))
251                    time.sleep(1)
252            self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
253            os.kill(pid, signal.SIGTERM)
254            for i in range(10):
255                try:
256                    # Poll the pid (Is the process still existing?)
257                    os.kill(pid, 0)
258                except OSError:
259                    # No, already done
260                    break
261                # Wait and check again
262                time.sleep(1)
263        except Exception as e:
264            self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
265
266
267def parse_ies(iehex, el=-1):
268    """Parses the information elements hex string 'iehex' in format
269    "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
270    If 'el' is defined returns the list of hex values of the specific IE (or
271    empty list if the element is not in the string."""
272    iel = [iehex[i:i + 2] for i in range(0, len(iehex), 2)]
273    for i in range(0, len(iel)):
274         iel[i] = int(iel[i], 16)
275    # Validity check
276    i = 0
277    res = []
278    while i < len(iel):
279        logger.debug("IE found: %x" % iel[i])
280        if el != -1 and el == iel[i]:
281            res = iel[i + 2:i + 2 + iel[i + 1]]
282        i += 2 + iel[i + 1]
283    if i != len(iel):
284        logger.error("Bad IE string: " + iehex)
285        res = []
286    return res
287
288def scan_and_get_bss(dev, frq):
289    """Issues a scan on given device on given frequency, returns the bss info
290    dictionary ('ssid','ie','flags', etc.) or None. Note, the function
291    implies there is only one AP on the given channel. If not a case,
292    the function must be changed to call dev.get_bss() till the AP with the
293    [b]ssid that we need is found"""
294    dev.scan(freq=frq)
295    return dev.get_bss('0')
296
297
298# AP configuration tests
299
300def run_test_ap_configuration(apdev, test_params,
301                              fst_group=fst_test_common.fst_test_def_group,
302                              fst_pri=fst_test_common.fst_test_def_prio_high,
303                              fst_llt=fst_test_common.fst_test_def_llt):
304    """Runs FST hostapd where the 1st AP configuration is fixed, the 2nd fst
305    configuration is provided by the parameters. Returns the result of the run:
306    0 - no errors discovered, an error otherwise. The function is used for
307    simplek "bad configuration" tests."""
308    logdir = test_params['logdir']
309    with FstLauncher(logdir) as fst_launcher:
310        ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
311                                  fst_test_common.fst_test_def_chan_a,
312                                  fst_test_common.fst_test_def_group,
313                                  fst_test_common.fst_test_def_prio_low,
314                                  fst_test_common.fst_test_def_llt)
315        ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
316                                  fst_test_common.fst_test_def_chan_g, fst_group,
317                                  fst_pri, fst_llt)
318        fst_launcher.add_cfg(ap1)
319        fst_launcher.add_cfg(ap2)
320        res = fst_launcher.run_hostapd()
321        return res
322
323def run_test_sta_configuration(test_params,
324                               fst_group=fst_test_common.fst_test_def_group,
325                               fst_pri=fst_test_common.fst_test_def_prio_high,
326                               fst_llt=fst_test_common.fst_test_def_llt):
327    """Runs FST wpa_supplicant where the 1st STA configuration is fixed, the
328    2nd fst configuration is provided by the parameters. Returns the result of
329    the run: 0 - no errors discovered, an error otherwise. The function is used
330    for simple "bad configuration" tests."""
331    logdir = test_params['logdir']
332    with FstLauncher(logdir) as fst_launcher:
333        sta1 = FstLauncherConfigSTA('wlan5',
334                                    fst_test_common.fst_test_def_group,
335                                    fst_test_common.fst_test_def_prio_low,
336                                    fst_test_common.fst_test_def_llt)
337        sta2 = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
338        fst_launcher.add_cfg(sta1)
339        fst_launcher.add_cfg(sta2)
340        res = fst_launcher.run_wpa_supplicant()
341        return res
342
343def test_fst_ap_config_llt_neg(dev, apdev, test_params):
344    """FST AP configuration negative LLT"""
345    res = run_test_ap_configuration(apdev, test_params, fst_llt='-1')
346    if res == 0:
347        raise Exception("hostapd started with a negative llt")
348
349def test_fst_ap_config_llt_zero(dev, apdev, test_params):
350    """FST AP configuration zero LLT"""
351    res = run_test_ap_configuration(apdev, test_params, fst_llt='0')
352    if res == 0:
353        raise Exception("hostapd started with a zero llt")
354
355def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
356    """FST AP configuration LLT is too big"""
357    res = run_test_ap_configuration(apdev, test_params,
358                                    fst_llt='4294967296') #0x100000000
359    if res == 0:
360        raise Exception("hostapd started with llt that is too big")
361
362def test_fst_ap_config_llt_nan(dev, apdev, test_params):
363    """FST AP configuration LLT is not a number"""
364    res = run_test_ap_configuration(apdev, test_params, fst_llt='nan')
365    if res == 0:
366        raise Exception("hostapd started with llt not a number")
367
368def test_fst_ap_config_pri_neg(dev, apdev, test_params):
369    """FST AP configuration Priority negative"""
370    res = run_test_ap_configuration(apdev, test_params, fst_pri='-1')
371    if res == 0:
372        raise Exception("hostapd started with a negative fst priority")
373
374def test_fst_ap_config_pri_zero(dev, apdev, test_params):
375    """FST AP configuration Priority zero"""
376    res = run_test_ap_configuration(apdev, test_params, fst_pri='0')
377    if res == 0:
378        raise Exception("hostapd started with a zero fst priority")
379
380def test_fst_ap_config_pri_large(dev, apdev, test_params):
381    """FST AP configuration Priority too large"""
382    res = run_test_ap_configuration(apdev, test_params, fst_pri='256')
383    if res == 0:
384        raise Exception("hostapd started with too large fst priority")
385
386def test_fst_ap_config_pri_nan(dev, apdev, test_params):
387    """FST AP configuration Priority not a number"""
388    res = run_test_ap_configuration(apdev, test_params, fst_pri='nan')
389    if res == 0:
390        raise Exception("hostapd started with fst priority not a number")
391
392def test_fst_ap_config_group_len(dev, apdev, test_params):
393    """FST AP configuration Group max length"""
394    res = run_test_ap_configuration(apdev, test_params,
395                                    fst_group='fstg5678abcd34567')
396    if res == 0:
397        raise Exception("hostapd started with fst_group length too big")
398
399def test_fst_ap_config_good(dev, apdev, test_params):
400    """FST AP configuration good parameters"""
401    res = run_test_ap_configuration(apdev, test_params)
402    if res != 0:
403        raise Exception("hostapd didn't start with valid config parameters")
404
405def test_fst_ap_config_default(dev, apdev, test_params):
406    """FST AP configuration default parameters"""
407    res = run_test_ap_configuration(apdev, test_params, fst_llt=None)
408    if res != 0:
409        raise Exception("hostapd didn't start with valid config parameters")
410
411
412# STA configuration tests
413
414def test_fst_sta_config_llt_neg(dev, apdev, test_params):
415    """FST STA configuration negative LLT"""
416    res = run_test_sta_configuration(test_params, fst_llt='-1')
417    if res == 0:
418        raise Exception("wpa_supplicant started with a negative llt")
419
420def test_fst_sta_config_llt_zero(dev, apdev, test_params):
421    """FST STA configuration zero LLT"""
422    res = run_test_sta_configuration(test_params, fst_llt='0')
423    if res == 0:
424        raise Exception("wpa_supplicant started with a zero llt")
425
426def test_fst_sta_config_llt_large(dev, apdev, test_params):
427    """FST STA configuration LLT is too large"""
428    res = run_test_sta_configuration(test_params,
429                                     fst_llt='4294967296') #0x100000000
430    if res == 0:
431        raise Exception("wpa_supplicant started with llt that is too large")
432
433def test_fst_sta_config_llt_nan(dev, apdev, test_params):
434    """FST STA configuration LLT is not a number"""
435    res = run_test_sta_configuration(test_params, fst_llt='nan')
436    if res == 0:
437        raise Exception("wpa_supplicant started with llt not a number")
438
439def test_fst_sta_config_pri_neg(dev, apdev, test_params):
440    """FST STA configuration Priority negative"""
441    res = run_test_sta_configuration(test_params, fst_pri='-1')
442    if res == 0:
443        raise Exception("wpa_supplicant started with a negative fst priority")
444
445def test_fst_sta_config_pri_zero(dev, apdev, test_params):
446    """FST STA configuration Priority zero"""
447    res = run_test_sta_configuration(test_params, fst_pri='0')
448    if res == 0:
449        raise Exception("wpa_supplicant started with a zero fst priority")
450
451def test_fst_sta_config_pri_big(dev, apdev, test_params):
452    """FST STA configuration Priority too large"""
453    res = run_test_sta_configuration(test_params, fst_pri='256')
454    if res == 0:
455        raise Exception("wpa_supplicant started with too large fst priority")
456
457def test_fst_sta_config_pri_nan(dev, apdev, test_params):
458    """FST STA configuration Priority not a number"""
459    res = run_test_sta_configuration(test_params, fst_pri='nan')
460    if res == 0:
461        raise Exception("wpa_supplicant started with fst priority not a number")
462
463def test_fst_sta_config_group_len(dev, apdev, test_params):
464    """FST STA configuration Group max length"""
465    res = run_test_sta_configuration(test_params,
466                                     fst_group='fstg5678abcd34567')
467    if res == 0:
468        raise Exception("wpa_supplicant started with fst_group length too big")
469
470def test_fst_sta_config_good(dev, apdev, test_params):
471    """FST STA configuration good parameters"""
472    res = run_test_sta_configuration(test_params)
473    if res != 0:
474        raise Exception("wpa_supplicant didn't start with valid config parameters")
475
476def test_fst_sta_config_default(dev, apdev, test_params):
477    """FST STA configuration default parameters"""
478    res = run_test_sta_configuration(test_params, fst_llt=None)
479    if res != 0:
480        raise Exception("wpa_supplicant didn't start with valid config parameters")
481
482def test_fst_scan_mb(dev, apdev, test_params):
483    """FST scan valid MB IE presence with normal start"""
484    logdir = test_params['logdir']
485
486    # Test valid MB IE in scan results
487    with FstLauncher(logdir) as fst_launcher:
488        ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
489                                  fst_test_common.fst_test_def_chan_a,
490                                  fst_test_common.fst_test_def_group,
491                                  fst_test_common.fst_test_def_prio_high)
492        ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
493                                  fst_test_common.fst_test_def_chan_g,
494                                  fst_test_common.fst_test_def_group,
495                                  fst_test_common.fst_test_def_prio_low)
496        fst_launcher.add_cfg(ap1)
497        fst_launcher.add_cfg(ap2)
498        res = fst_launcher.run_hostapd()
499        if res != 0:
500            raise Exception("hostapd didn't start properly")
501
502        mbie1 = []
503        flags1 = ''
504        mbie2 = []
505        flags2 = ''
506        # Scan 1st AP
507        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
508        if vals1 != None:
509            if 'ie' in vals1:
510                mbie1 = parse_ies(vals1['ie'], 0x9e)
511            if 'flags' in vals1:
512                flags1 = vals1['flags']
513        # Scan 2nd AP
514        vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
515        if vals2 != None:
516            if 'ie' in vals2:
517                mbie2 = parse_ies(vals2['ie'], 0x9e)
518            if 'flags' in vals2:
519                flags2 = vals2['flags']
520
521    if len(mbie1) == 0:
522        raise Exception("No MB IE created by 1st AP")
523    if len(mbie2) == 0:
524        raise Exception("No MB IE created by 2nd AP")
525
526def test_fst_scan_nomb(dev, apdev, test_params):
527    """FST scan no MB IE presence with 1 AP start"""
528    logdir = test_params['logdir']
529
530    # Test valid MB IE in scan results
531    with FstLauncher(logdir) as fst_launcher:
532        ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
533                                  fst_test_common.fst_test_def_chan_a,
534                                  fst_test_common.fst_test_def_group,
535                                  fst_test_common.fst_test_def_prio_high)
536        fst_launcher.add_cfg(ap1)
537        res = fst_launcher.run_hostapd()
538        if res != 0:
539            raise Exception("Hostapd didn't start properly")
540
541        time.sleep(2)
542        mbie1 = []
543        flags1 = ''
544        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
545        if vals1 != None:
546            if 'ie' in vals1:
547                mbie1 = parse_ies(vals1['ie'], 0x9e)
548            if 'flags' in vals1:
549                flags1 = vals1['flags']
550
551    if len(mbie1) != 0:
552        raise Exception("MB IE exists with 1 AP")
553