# Host class
# Copyright (c) 2016, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import logging
import subprocess
import threading
import tempfile
import os
import traceback
import select

logger = logging.getLogger()


def remote_compatible(func):
    """Decorator that tags func with a remote_compatible = True attribute."""
    func.remote_compatible = True
    return func


def _run_and_capture(command, **popen_kwargs):
    """Run command and return (status, text).

    On success status is 0 and text is the decoded stdout. When the command
    exits with a non-zero code, status is that code and text is the decoded
    stderr. Extra keyword arguments are passed to subprocess.check_output().
    """
    # The context manager closes the stderr temp file even if an unexpected
    # exception (for example OSError for a missing binary) propagates.
    with tempfile.TemporaryFile() as err:
        try:
            out = subprocess.check_output(command, stderr=err, **popen_kwargs)
            return 0, out.decode()
        except subprocess.CalledProcessError as e:
            err.seek(0)
            return e.returncode, err.read().decode()


def execute_thread(command, reply):
    """Thread target: run command and append [status, output] to reply.

    reply is a caller-owned list; the exit status is appended first and the
    captured text (stdout on success, stderr on failure) second. Both kinds
    of output are delivered as str.
    """
    cmd = ' '.join(command)
    logger.debug("thread run: " + cmd)
    status, buf = _run_and_capture(command, bufsize=0)

    logger.debug("thread cmd: " + cmd)
    logger.debug("thread exit status: " + str(status))
    logger.debug("thread exit buf: " + str(buf))
    reply.append(status)
    reply.append(buf)


def gen_reaper_file(conf):
    """Create a wrapper shell script in /tmp and return its path.

    The script stores its own PID in /tmp/<script name>.pid and then exec()s
    the command given as its arguments, so that the command can later be
    killed through the PID file. conf is used as the file name prefix.
    """
    fd, filename = tempfile.mkstemp(dir='/tmp', prefix=conf + '-')
    # Close explicitly so the content is flushed before the file is copied
    # or executed.
    with os.fdopen(fd, 'w') as f:
        f.write("#!/bin/sh\n")
        f.write("name=\"$(basename $0)\"\n")
        f.write("echo $$ > /tmp/$name.pid\n")
        f.write("exec \"$@\"\n")

    return filename


class Host():
    """A local (host=None) or ssh-reachable machine that commands run on."""

    def __init__(self, host=None, ifname=None, port=None, name="", user="root"):
        """Store connection parameters; name defaults to host when empty."""
        self.host = host
        self.name = name
        self.user = user
        self.monitors = []
        self.monitor_thread = None
        self.logs = []
        self.ifname = ifname
        self.port = port
        self.dev = None
        self.monitor_params = []
        if self.name == "" and host is not None:
            self.name = host

    def _ssh_command(self, command):
        """Return the ssh argv that runs command (a list) on the remote host."""
        target = self.user + "@" + self.host if self.user else self.host
        return ["ssh", target, ' '.join(command)]

    def _scp_path(self, path):
        """Return the scp location string for path on the remote host."""
        prefix = self.user + "@" if self.user else ""
        return prefix + "[" + self.host + "]:" + path

    def _run_shell(self, script):
        """Run a shell snippet on the host and return (status, output).

        Remote commands are interpreted by the login shell on the other end
        of ssh; local commands are executed without a shell, so the snippet
        has to be handed to sh explicitly in that case.
        """
        if self.host is None:
            return self.local_execute(["sh", "-c", script])
        return self.execute([script])

    def local_execute(self, command):
        """Run command (argv list) on this machine; return (status, output).

        output is stdout on success and stderr on non-zero exit.
        """
        logger.debug("execute: " + str(command))
        status, buf = _run_and_capture(command)

        logger.debug("status: " + str(status))
        logger.debug("buf: " + str(buf))
        return status, buf

    def execute(self, command):
        """Run command (argv list) on the host; return (status, output).

        Falls back to local_execute() when no remote host is configured.
        """
        if self.host is None:
            return self.local_execute(command)

        cmd = self._ssh_command(command)
        _cmd = self.name + " execute: " + ' '.join(cmd)
        logger.debug(_cmd)
        status, buf = _run_and_capture(cmd)

        logger.debug(self.name + " status: " + str(status))
        logger.debug(self.name + " buf: " + str(buf))
        return status, buf

    # async execute
    def thread_run(self, command, res, use_reaper=True):
        """Start command in a background thread and return the Thread.

        res is a list that receives [status, output] when the command ends.
        With use_reaper the command is wrapped in a reaper script whose path
        becomes the thread name, which thread_stop() relies on.
        """
        if use_reaper:
            filename = gen_reaper_file("reaper")
            self.send_file(filename, filename)
            self.execute(["chmod", "755", filename])
            _command = [filename] + command
        else:
            filename = ""
            _command = command

        if self.host is None:
            cmd = _command
        else:
            cmd = self._ssh_command(_command)
        _cmd = self.name + " thread_run: " + ' '.join(cmd)
        logger.debug(_cmd)
        t = threading.Thread(target=execute_thread, name=filename, args=(cmd, res))
        t.start()
        return t

    def thread_stop(self, t):
        """Kill the command behind thread t and remove the reaper files.

        Sends the default signal twice and then -9, waiting up to 5 seconds
        after each attempt. Raises Exception if the thread was not started
        with use_reaper or is still alive after all attempts.
        """
        if t.name.find("reaper") == -1:
            raise Exception("use_reaper required")

        pid_file = t.name + ".pid"

        for kill in ("kill", "kill", "kill -9"):
            if not t.is_alive():
                break
            self._run_shell(kill + " `cat " + pid_file + "`")
            self.thread_wait(t, 5)

        if t.is_alive():
            raise Exception("thread still alive")

        self.execute(["rm", pid_file])
        self.execute(["rm", t.name])
        self.local_execute(["rm", t.name])

    def thread_wait(self, t, wait=None):
        """Join thread t, for at most wait seconds (None waits forever)."""
        if wait is None:
            wait_str = "infinite"
        else:
            wait_str = str(wait) + "s"

        logger.debug(self.name + " thread_wait(" + wait_str + "): ")
        if t.is_alive():
            t.join(wait)

    def proc_pending(self, proc, timeout=0):
        """Return True if proc.stdout becomes readable within timeout seconds."""
        readable, _, _ = select.select([proc.stdout], [], [], timeout)
        return bool(readable)

    def proc_run(self, command):
        """Start command under a reaper script and return the Popen object.

        stdout is an unbuffered pipe; the reaper script path is stored on the
        returned object as reaper_file for proc_stop().
        """
        filename = gen_reaper_file("reaper")
        self.send_file(filename, filename)
        self.execute(["chmod", "755", filename])
        _command = [filename] + command

        if self.host:
            cmd = self._ssh_command(_command)
        else:
            cmd = _command

        _cmd = self.name + " proc_run: " + ' '.join(cmd)
        logger.debug(_cmd)
        # The child keeps its own descriptor for stderr, so the parent's
        # handle can be closed as soon as the process has been spawned.
        with tempfile.TemporaryFile() as err:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=err,
                                    bufsize=0)
        proc.reaper_file = filename
        return proc

    def proc_wait_event(self, proc, events, timeout=10):
        """Read proc output until a line contains one of events.

        Returns the matching line (str), or None on timeout, end of output
        or error. Raises Exception if events is not a list.
        """
        if not isinstance(events, list):
            raise Exception("proc_wait_event() events not a list")

        logger.debug(self.name + " proc_wait_event: " + ' '.join(events) + " timeout: " + str(timeout))
        start = os.times()[4]
        try:
            while True:
                while self.proc_pending(proc):
                    line = proc.stdout.readline()
                    if not line:
                        return None
                    line = line.decode()
                    logger.debug(line.strip('\n'))
                    for event in events:
                        if event in line:
                            return line
                now = os.times()[4]
                remaining = start + timeout - now
                if remaining <= 0:
                    break
                if not self.proc_pending(proc, timeout=remaining):
                    break
        except Exception:
            logger.debug(traceback.format_exc())
        return None

    def proc_write(self, proc, cmd):
        """Write cmd to the process pipe and return the write() result."""
        # NOTE(review): this writes to proc.stdout, the read end created by
        # proc_run(); stdin is probably what was meant, but proc_run() does
        # not open a stdin pipe. Left unchanged - confirm intended behavior.
        return proc.stdout.write(cmd)

    def proc_read(self, proc, timeout=0):
        """Return up to 4094 bytes of pending output as str, or None.

        Waits at most timeout seconds for data to become available.
        """
        if not self.proc_pending(proc, timeout):
            return None
        # A fixed-size read may cut a multi-byte character in half; replace
        # undecodable bytes instead of raising.
        return proc.stdout.read(4094).decode(errors="replace")

    def proc_stop(self, proc):
        """Kill a process started by proc_run() and remove its reaper files."""
        if not proc:
            return

        self._run_shell("kill `cat " + proc.reaper_file + ".pid`")
        self.execute(["rm", proc.reaper_file + ".pid"])
        self.execute(["rm", proc.reaper_file])
        self.local_execute(["rm", proc.reaper_file])
        proc.kill()

    def proc_dump(self, proc):
        """Return everything left on proc.stdout ("" when proc is falsy)."""
        if not proc:
            return ""
        return proc.stdout.read()

    def execute_and_wait_event(self, command, events, timeout=10):
        """Run command, wait for one of events, stop it, return the event.

        Returns the matching output line or None; errors while starting or
        reading the process are logged and reported as None.
        """
        proc = None
        ev = None

        try:
            proc = self.proc_run(command)
            ev = self.proc_wait_event(proc, events, timeout)
        except Exception:
            logger.debug(traceback.format_exc())
        finally:
            self.proc_stop(proc)
        return ev

    def add_log(self, log_file):
        """Register log_file (a path on the host) for get_logs()."""
        self.logs.append(log_file)

    def get_logs(self, local_log_dir=None):
        """Copy registered logs to local_log_dir (if given) and delete them.

        The list of registered logs is emptied afterwards.
        """
        for log in self.logs:
            if local_log_dir:
                if self.host is None:
                    # Local host: there is no scp endpoint to copy from.
                    self.local_execute(["cp", log, local_log_dir])
                else:
                    self.local_execute(["scp", self._scp_path(log),
                                        local_log_dir])
            self.execute(["rm", log])
        del self.logs[:]

    def send_file(self, src, dst):
        """Copy local file src to dst on the remote host (no-op if local)."""
        if self.host is None:
            return
        self.local_execute(["scp", src, self._scp_path(dst)])