1#!/usr/bin/python 2# 3# wpa_supplicant/hostapd control interface using Python 4# Copyright (c) 2024, Jouni Malinen <j@w1.fi> 5# 6# This software may be distributed under the terms of the BSD license. 7# See README for more details. 8 9from wpaspy import Ctrl 10import remotehost 11 12class RemoteCtrl(Ctrl): 13 def __init__(self, path, port=9877, hostname=None, ifname=None): 14 self.started = False 15 self.attached = False 16 self.path = path 17 self.port = port 18 self.ifname = ifname 19 self.hostname = hostname 20 self.proc = None 21 22 self.host = remotehost.Host(hostname) 23 self.started = True 24 25 def __del__(self): 26 self.close() 27 28 def close(self): 29 if self.attached: 30 try: 31 self.detach() 32 except Exception as e: 33 # Need to ignore this allow the socket to be closed 34 self.attached = False 35 pass 36 37 if self.host and self.started: 38 self.started = False 39 40 def request(self, cmd, timeout=10): 41 if self.host: 42 cmd = '\'' + cmd + '\'' 43 if self.ifname: 44 _cmd = ['wpa_cli', '-p', self.path, '-i', self.ifname, "raw " + cmd] 45 else: 46 _cmd = ['wpa_cli', '-g', self.path, "raw " + cmd] 47 status, buf = self.host.execute(_cmd) 48 return buf 49 50 def attach(self): 51 if self.attached: 52 return 53 54 if self.host: 55 if self.ifname: 56 _cmd = [ "wpa_cli", "-p", self.path, "-i", self.ifname ] 57 else: 58 _cmd = [ "wpa_cli", '-g', self.path] 59 self.proc = self.host.proc_run(_cmd) 60 self.attached = True 61 62 def detach(self): 63 if not self.attached: 64 return 65 66 if self.hostname and self.proc: 67 self.request("DETACH") 68 self.request("QUIT") 69 self.host.proc_stop(self.proc) 70 self.attached = False 71 self.proc = None 72 73 def terminate(self): 74 if self.attached: 75 try: 76 self.detach() 77 except Exception as e: 78 # Need to ignore this to allow the socket to be closed 79 self.attached = False 80 self.request("TERMINATE") 81 self.close() 82 83 def pending(self, timeout=0): 84 if self.host and self.proc: 85 return self.host.proc_pending(self.proc, timeout=timeout) 86 return False 87 88 def recv(self): 89 if self.host and self.proc: 90 res = self.host.proc_read(self.proc) 91 return res 92 return "" 93