1# FST configuration tests 2# Copyright (c) 2015, Qualcomm Atheros, Inc. 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import logging 8logger = logging.getLogger() 9import subprocess 10import time 11import os 12import signal 13import hostapd 14import wpasupplicant 15import utils 16 17import fst_test_common 18 19class FstLauncherConfig: 20 """FstLauncherConfig class represents configuration to be used for 21 FST config tests related hostapd/wpa_supplicant instances""" 22 def __init__(self, iface, fst_group, fst_pri, fst_llt=None): 23 self.iface = iface 24 self.fst_group = fst_group 25 self.fst_pri = fst_pri 26 self.fst_llt = fst_llt # None llt means no llt parameter will be set 27 28 def ifname(self): 29 return self.iface 30 31 def is_ap(self): 32 """Returns True if the configuration is for AP, otherwise - False""" 33 raise Exception("Virtual is_ap() called!") 34 35 def to_file(self, pathname): 36 """Creates configuration file to be used by FST config tests related 37 hostapd/wpa_supplicant instances""" 38 raise Exception("Virtual to_file() called!") 39 40class FstLauncherConfigAP(FstLauncherConfig): 41 """FstLauncherConfigAP class represents configuration to be used for 42 FST config tests related hostapd instance""" 43 def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri, 44 fst_llt=None): 45 self.ssid = ssid 46 self.mode = mode 47 self.chan = chan 48 FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt) 49 50 def is_ap(self): 51 return True 52 53 def get_channel(self): 54 return self.chan 55 56 def to_file(self, pathname): 57 """Creates configuration file to be used by FST config tests related 58 hostapd instance""" 59 with open(pathname, "w") as f: 60 f.write("country_code=US\n" 61 "interface=%s\n" 62 "ctrl_interface=/var/run/hostapd\n" 63 "ssid=%s\n" 64 "channel=%s\n" 65 "hw_mode=%s\n" 66 "ieee80211n=1\n" % (self.iface, self.ssid, self.chan, 67 self.mode)) 68 if len(self.fst_group) != 0: 69 f.write("fst_group_id=%s\n" 70 "fst_priority=%s\n" % (self.fst_group, self.fst_pri)) 71 if self.fst_llt is not None: 72 f.write("fst_llt=%s\n" % self.fst_llt) 73 with open(pathname, "r") as f: 74 logger.debug("wrote hostapd config file %s:\n%s" % (pathname, 75 f.read())) 76 77class FstLauncherConfigSTA(FstLauncherConfig): 78 """FstLauncherConfig class represents configuration to be used for 79 FST config tests related wpa_supplicant instance""" 80 def __init__(self, iface, fst_group, fst_pri, fst_llt=None): 81 FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt) 82 83 def is_ap(self): 84 return False 85 86 def to_file(self, pathname): 87 """Creates configuration file to be used by FST config tests related 88 wpa_supplicant instance""" 89 with open(pathname, "w") as f: 90 f.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n" 91 "p2p_no_group_iface=1\n") 92 if len(self.fst_group) != 0: 93 f.write("fst_group_id=%s\n" 94 "fst_priority=%s\n" % (self.fst_group, self.fst_pri)) 95 if self.fst_llt is not None: 96 f.write("fst_llt=%s\n" % self.fst_llt) 97 with open(pathname, "r") as f: 98 logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, f.read())) 99 100class FstLauncher: 101 """FstLauncher class is responsible for launching and cleaning up of FST 102 config tests related hostapd/wpa_supplicant instances""" 103 def __init__(self, logpath): 104 self.logger = logging.getLogger() 105 self.fst_logpath = logpath 106 self.cfgs_to_run = [] 107 self.hapd_fst_global = '/var/run/hostapd-fst-global' 108 self.wsup_fst_global = '/tmp/fststa' 109 self.nof_aps = 0 110 self.nof_stas = 0 111 self.reg_ctrl = fst_test_common.HapdRegCtrl() 112 self.test_is_supported() 113 114 def __enter__(self): 115 return self 116 117 def __exit__(self, type, value, traceback): 118 self.cleanup() 119 120 @staticmethod 121 def test_is_supported(): 122 h = hostapd.HostapdGlobal() 123 resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED") 124 if not resp.startswith("OK"): 125 raise utils.HwsimSkip("FST not supported") 126 w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5') 127 resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED") 128 if not resp.startswith("OK"): 129 raise utils.HwsimSkip("FST not supported") 130 131 def get_cfg_pathname(self, cfg): 132 """Returns pathname of ifname based configuration file""" 133 return self.fst_logpath +'/'+ cfg.ifname() + '.conf' 134 135 def add_cfg(self, cfg): 136 """Adds configuration to be used for launching hostapd/wpa_supplicant 137 instances""" 138 if cfg not in self.cfgs_to_run: 139 self.cfgs_to_run.append(cfg) 140 if cfg.is_ap() == True: 141 self.nof_aps += 1 142 else: 143 self.nof_stas += 1 144 145 def remove_cfg(self, cfg): 146 """Removes configuration previously added with add_cfg""" 147 if cfg in self.cfgs_to_run: 148 self.cfgs_to_run.remove(cfg) 149 if cfg.is_ap() == True: 150 self.nof_aps -= 1 151 else: 152 self.nof_stas -= 1 153 config_file = self.get_cfg_pathname(cfg) 154 if os.path.exists(config_file): 155 os.remove(config_file) 156 157 def run_hostapd(self): 158 """Lauches hostapd with interfaces configured according to 159 FstLauncherConfigAP configurations added""" 160 if self.nof_aps == 0: 161 raise Exception("No FST APs to start") 162 pidfile = self.fst_logpath + '/' + 'myhostapd.pid' 163 mylogfile = self.fst_logpath + '/' + 'fst-hostapd' 164 prg = os.path.join(self.fst_logpath, 165 'alt-hostapd/hostapd/hostapd') 166 if not os.path.exists(prg): 167 prg = '../../hostapd/hostapd' 168 cmd = [prg, '-B', '-dddt', 169 '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global] 170 for i in range(0, len(self.cfgs_to_run)): 171 cfg = self.cfgs_to_run[i] 172 if cfg.is_ap() == True: 173 cfgfile = self.get_cfg_pathname(cfg) 174 cfg.to_file(cfgfile) 175 cmd.append(cfgfile) 176 self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel()) 177 self.logger.debug("Starting fst hostapd: " + ' '.join(cmd)) 178 res = subprocess.call(cmd) 179 self.logger.debug("fst hostapd start result: %d" % res) 180 if res == 0: 181 self.reg_ctrl.start() 182 return res 183 184 def run_wpa_supplicant(self): 185 """Lauches wpa_supplicant with interfaces configured according to 186 FstLauncherConfigSTA configurations added""" 187 if self.nof_stas == 0: 188 raise Exception("No FST STAs to start") 189 pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid' 190 mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant' 191 prg = os.path.join(self.fst_logpath, 192 'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant') 193 if not os.path.exists(prg): 194 prg = '../../wpa_supplicant/wpa_supplicant' 195 cmd = [prg, '-B', '-dddt', 196 '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global] 197 sta_no = 0 198 for i in range(0, len(self.cfgs_to_run)): 199 cfg = self.cfgs_to_run[i] 200 if cfg.is_ap() == False: 201 cfgfile = self.get_cfg_pathname(cfg) 202 cfg.to_file(cfgfile) 203 cmd.append('-c' + cfgfile) 204 cmd.append('-i' + cfg.ifname()) 205 cmd.append('-Dnl80211') 206 if sta_no != self.nof_stas -1: 207 cmd.append('-N') # Next station configuration 208 sta_no += 1 209 self.logger.debug("Starting fst supplicant: " + ' '.join(cmd)) 210 res = subprocess.call(cmd) 211 self.logger.debug("fst supplicant start result: %d" % res) 212 return res 213 214 def cleanup(self): 215 """Terminates hostapd/wpa_supplicant processes previously launched with 216 run_hostapd/run_wpa_supplicant""" 217 pidfile = self.fst_logpath + '/' + 'myhostapd.pid' 218 self.kill_pid(pidfile, self.nof_aps > 0) 219 pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid' 220 self.kill_pid(pidfile, self.nof_stas > 0) 221 self.reg_ctrl.stop() 222 while len(self.cfgs_to_run) != 0: 223 cfg = self.cfgs_to_run[0] 224 self.remove_cfg(cfg) 225 fst_test_common.fst_clear_regdom() 226 227 def kill_pid(self, pidfile, try_again=False): 228 """Kills process by PID file""" 229 if not os.path.exists(pidfile): 230 if not try_again: 231 return 232 # It might take some time for the process to write the PID file, 233 # so wait a bit longer before giving up. 234 self.logger.info("kill_pid: pidfile %s does not exist - try again after a second" % pidfile) 235 time.sleep(1) 236 if not os.path.exists(pidfile): 237 self.logger.info("kill_pid: pidfile %s does not exist - could not kill the process" % pidfile) 238 return 239 pid = -1 240 try: 241 for i in range(3): 242 pf = open(pidfile, 'r') 243 pidtxt = pf.read().strip() 244 self.logger.debug("kill_pid: %s: '%s'" % (pidfile, pidtxt)) 245 pf.close() 246 try: 247 pid = int(pidtxt) 248 break 249 except Exception as e: 250 self.logger.debug("kill_pid: No valid PID found: %s" % str(e)) 251 time.sleep(1) 252 self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid)) 253 os.kill(pid, signal.SIGTERM) 254 for i in range(10): 255 try: 256 # Poll the pid (Is the process still existing?) 257 os.kill(pid, 0) 258 except OSError: 259 # No, already done 260 break 261 # Wait and check again 262 time.sleep(1) 263 except Exception as e: 264 self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e))) 265 266 267def parse_ies(iehex, el=-1): 268 """Parses the information elements hex string 'iehex' in format 269 "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity. 270 If 'el' is defined returns the list of hex values of the specific IE (or 271 empty list if the element is not in the string.""" 272 iel = [iehex[i:i + 2] for i in range(0, len(iehex), 2)] 273 for i in range(0, len(iel)): 274 iel[i] = int(iel[i], 16) 275 # Validity check 276 i = 0 277 res = [] 278 while i < len(iel): 279 logger.debug("IE found: %x" % iel[i]) 280 if el != -1 and el == iel[i]: 281 res = iel[i + 2:i + 2 + iel[i + 1]] 282 i += 2 + iel[i + 1] 283 if i != len(iel): 284 logger.error("Bad IE string: " + iehex) 285 res = [] 286 return res 287 288def scan_and_get_bss(dev, frq): 289 """Issues a scan on given device on given frequency, returns the bss info 290 dictionary ('ssid','ie','flags', etc.) or None. Note, the function 291 implies there is only one AP on the given channel. If not a case, 292 the function must be changed to call dev.get_bss() till the AP with the 293 [b]ssid that we need is found""" 294 dev.scan(freq=frq) 295 return dev.get_bss('0') 296 297 298# AP configuration tests 299 300def run_test_ap_configuration(apdev, test_params, 301 fst_group=fst_test_common.fst_test_def_group, 302 fst_pri=fst_test_common.fst_test_def_prio_high, 303 fst_llt=fst_test_common.fst_test_def_llt): 304 """Runs FST hostapd where the 1st AP configuration is fixed, the 2nd fst 305 configuration is provided by the parameters. Returns the result of the run: 306 0 - no errors discovered, an error otherwise. The function is used for 307 simplek "bad configuration" tests.""" 308 logdir = test_params['logdir'] 309 with FstLauncher(logdir) as fst_launcher: 310 ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a', 311 fst_test_common.fst_test_def_chan_a, 312 fst_test_common.fst_test_def_group, 313 fst_test_common.fst_test_def_prio_low, 314 fst_test_common.fst_test_def_llt) 315 ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b', 316 fst_test_common.fst_test_def_chan_g, fst_group, 317 fst_pri, fst_llt) 318 fst_launcher.add_cfg(ap1) 319 fst_launcher.add_cfg(ap2) 320 res = fst_launcher.run_hostapd() 321 return res 322 323def run_test_sta_configuration(test_params, 324 fst_group=fst_test_common.fst_test_def_group, 325 fst_pri=fst_test_common.fst_test_def_prio_high, 326 fst_llt=fst_test_common.fst_test_def_llt): 327 """Runs FST wpa_supplicant where the 1st STA configuration is fixed, the 328 2nd fst configuration is provided by the parameters. Returns the result of 329 the run: 0 - no errors discovered, an error otherwise. The function is used 330 for simple "bad configuration" tests.""" 331 logdir = test_params['logdir'] 332 with FstLauncher(logdir) as fst_launcher: 333 sta1 = FstLauncherConfigSTA('wlan5', 334 fst_test_common.fst_test_def_group, 335 fst_test_common.fst_test_def_prio_low, 336 fst_test_common.fst_test_def_llt) 337 sta2 = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt) 338 fst_launcher.add_cfg(sta1) 339 fst_launcher.add_cfg(sta2) 340 res = fst_launcher.run_wpa_supplicant() 341 return res 342 343def test_fst_ap_config_llt_neg(dev, apdev, test_params): 344 """FST AP configuration negative LLT""" 345 res = run_test_ap_configuration(apdev, test_params, fst_llt='-1') 346 if res == 0: 347 raise Exception("hostapd started with a negative llt") 348 349def test_fst_ap_config_llt_zero(dev, apdev, test_params): 350 """FST AP configuration zero LLT""" 351 res = run_test_ap_configuration(apdev, test_params, fst_llt='0') 352 if res == 0: 353 raise Exception("hostapd started with a zero llt") 354 355def test_fst_ap_config_llt_too_big(dev, apdev, test_params): 356 """FST AP configuration LLT is too big""" 357 res = run_test_ap_configuration(apdev, test_params, 358 fst_llt='4294967296') #0x100000000 359 if res == 0: 360 raise Exception("hostapd started with llt that is too big") 361 362def test_fst_ap_config_llt_nan(dev, apdev, test_params): 363 """FST AP configuration LLT is not a number""" 364 res = run_test_ap_configuration(apdev, test_params, fst_llt='nan') 365 if res == 0: 366 raise Exception("hostapd started with llt not a number") 367 368def test_fst_ap_config_pri_neg(dev, apdev, test_params): 369 """FST AP configuration Priority negative""" 370 res = run_test_ap_configuration(apdev, test_params, fst_pri='-1') 371 if res == 0: 372 raise Exception("hostapd started with a negative fst priority") 373 374def test_fst_ap_config_pri_zero(dev, apdev, test_params): 375 """FST AP configuration Priority zero""" 376 res = run_test_ap_configuration(apdev, test_params, fst_pri='0') 377 if res == 0: 378 raise Exception("hostapd started with a zero fst priority") 379 380def test_fst_ap_config_pri_large(dev, apdev, test_params): 381 """FST AP configuration Priority too large""" 382 res = run_test_ap_configuration(apdev, test_params, fst_pri='256') 383 if res == 0: 384 raise Exception("hostapd started with too large fst priority") 385 386def test_fst_ap_config_pri_nan(dev, apdev, test_params): 387 """FST AP configuration Priority not a number""" 388 res = run_test_ap_configuration(apdev, test_params, fst_pri='nan') 389 if res == 0: 390 raise Exception("hostapd started with fst priority not a number") 391 392def test_fst_ap_config_group_len(dev, apdev, test_params): 393 """FST AP configuration Group max length""" 394 res = run_test_ap_configuration(apdev, test_params, 395 fst_group='fstg5678abcd34567') 396 if res == 0: 397 raise Exception("hostapd started with fst_group length too big") 398 399def test_fst_ap_config_good(dev, apdev, test_params): 400 """FST AP configuration good parameters""" 401 res = run_test_ap_configuration(apdev, test_params) 402 if res != 0: 403 raise Exception("hostapd didn't start with valid config parameters") 404 405def test_fst_ap_config_default(dev, apdev, test_params): 406 """FST AP configuration default parameters""" 407 res = run_test_ap_configuration(apdev, test_params, fst_llt=None) 408 if res != 0: 409 raise Exception("hostapd didn't start with valid config parameters") 410 411 412# STA configuration tests 413 414def test_fst_sta_config_llt_neg(dev, apdev, test_params): 415 """FST STA configuration negative LLT""" 416 res = run_test_sta_configuration(test_params, fst_llt='-1') 417 if res == 0: 418 raise Exception("wpa_supplicant started with a negative llt") 419 420def test_fst_sta_config_llt_zero(dev, apdev, test_params): 421 """FST STA configuration zero LLT""" 422 res = run_test_sta_configuration(test_params, fst_llt='0') 423 if res == 0: 424 raise Exception("wpa_supplicant started with a zero llt") 425 426def test_fst_sta_config_llt_large(dev, apdev, test_params): 427 """FST STA configuration LLT is too large""" 428 res = run_test_sta_configuration(test_params, 429 fst_llt='4294967296') #0x100000000 430 if res == 0: 431 raise Exception("wpa_supplicant started with llt that is too large") 432 433def test_fst_sta_config_llt_nan(dev, apdev, test_params): 434 """FST STA configuration LLT is not a number""" 435 res = run_test_sta_configuration(test_params, fst_llt='nan') 436 if res == 0: 437 raise Exception("wpa_supplicant started with llt not a number") 438 439def test_fst_sta_config_pri_neg(dev, apdev, test_params): 440 """FST STA configuration Priority negative""" 441 res = run_test_sta_configuration(test_params, fst_pri='-1') 442 if res == 0: 443 raise Exception("wpa_supplicant started with a negative fst priority") 444 445def test_fst_sta_config_pri_zero(dev, apdev, test_params): 446 """FST STA configuration Priority zero""" 447 res = run_test_sta_configuration(test_params, fst_pri='0') 448 if res == 0: 449 raise Exception("wpa_supplicant started with a zero fst priority") 450 451def test_fst_sta_config_pri_big(dev, apdev, test_params): 452 """FST STA configuration Priority too large""" 453 res = run_test_sta_configuration(test_params, fst_pri='256') 454 if res == 0: 455 raise Exception("wpa_supplicant started with too large fst priority") 456 457def test_fst_sta_config_pri_nan(dev, apdev, test_params): 458 """FST STA configuration Priority not a number""" 459 res = run_test_sta_configuration(test_params, fst_pri='nan') 460 if res == 0: 461 raise Exception("wpa_supplicant started with fst priority not a number") 462 463def test_fst_sta_config_group_len(dev, apdev, test_params): 464 """FST STA configuration Group max length""" 465 res = run_test_sta_configuration(test_params, 466 fst_group='fstg5678abcd34567') 467 if res == 0: 468 raise Exception("wpa_supplicant started with fst_group length too big") 469 470def test_fst_sta_config_good(dev, apdev, test_params): 471 """FST STA configuration good parameters""" 472 res = run_test_sta_configuration(test_params) 473 if res != 0: 474 raise Exception("wpa_supplicant didn't start with valid config parameters") 475 476def test_fst_sta_config_default(dev, apdev, test_params): 477 """FST STA configuration default parameters""" 478 res = run_test_sta_configuration(test_params, fst_llt=None) 479 if res != 0: 480 raise Exception("wpa_supplicant didn't start with valid config parameters") 481 482def test_fst_scan_mb(dev, apdev, test_params): 483 """FST scan valid MB IE presence with normal start""" 484 logdir = test_params['logdir'] 485 486 # Test valid MB IE in scan results 487 with FstLauncher(logdir) as fst_launcher: 488 ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a', 489 fst_test_common.fst_test_def_chan_a, 490 fst_test_common.fst_test_def_group, 491 fst_test_common.fst_test_def_prio_high) 492 ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b', 493 fst_test_common.fst_test_def_chan_g, 494 fst_test_common.fst_test_def_group, 495 fst_test_common.fst_test_def_prio_low) 496 fst_launcher.add_cfg(ap1) 497 fst_launcher.add_cfg(ap2) 498 res = fst_launcher.run_hostapd() 499 if res != 0: 500 raise Exception("hostapd didn't start properly") 501 502 mbie1 = [] 503 flags1 = '' 504 mbie2 = [] 505 flags2 = '' 506 # Scan 1st AP 507 vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a) 508 if vals1 != None: 509 if 'ie' in vals1: 510 mbie1 = parse_ies(vals1['ie'], 0x9e) 511 if 'flags' in vals1: 512 flags1 = vals1['flags'] 513 # Scan 2nd AP 514 vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g) 515 if vals2 != None: 516 if 'ie' in vals2: 517 mbie2 = parse_ies(vals2['ie'], 0x9e) 518 if 'flags' in vals2: 519 flags2 = vals2['flags'] 520 521 if len(mbie1) == 0: 522 raise Exception("No MB IE created by 1st AP") 523 if len(mbie2) == 0: 524 raise Exception("No MB IE created by 2nd AP") 525 526def test_fst_scan_nomb(dev, apdev, test_params): 527 """FST scan no MB IE presence with 1 AP start""" 528 logdir = test_params['logdir'] 529 530 # Test valid MB IE in scan results 531 with FstLauncher(logdir) as fst_launcher: 532 ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a', 533 fst_test_common.fst_test_def_chan_a, 534 fst_test_common.fst_test_def_group, 535 fst_test_common.fst_test_def_prio_high) 536 fst_launcher.add_cfg(ap1) 537 res = fst_launcher.run_hostapd() 538 if res != 0: 539 raise Exception("Hostapd didn't start properly") 540 541 time.sleep(2) 542 mbie1 = [] 543 flags1 = '' 544 vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a) 545 if vals1 != None: 546 if 'ie' in vals1: 547 mbie1 = parse_ies(vals1['ie'], 0x9e) 548 if 'flags' in vals1: 549 flags1 = vals1['flags'] 550 551 if len(mbie1) != 0: 552 raise Exception("MB IE exists with 1 AP") 553