mirror of
https://gitee.com/xiaohuolufeihua/bizhang_-obav.git
synced 2026-05-22 01:12:31 +00:00
mavsdk_tests: satisfy mypy type checks
This commit is contained in:
@@ -9,11 +9,14 @@ import subprocess
|
||||
import threading
|
||||
import errno
|
||||
from logger_helper import color, maybe_strip_color, colorize
|
||||
from typing import Dict, List
|
||||
from typing import Dict, List, TextIO, Optional
|
||||
|
||||
|
||||
class Runner:
|
||||
def __init__(self, log_dir: str, config: dict, verbose: bool):
|
||||
def __init__(self,
|
||||
log_dir: str,
|
||||
config: Dict[str, str],
|
||||
verbose: bool):
|
||||
self.name = ""
|
||||
self.cmd = ""
|
||||
self.cwd = ""
|
||||
@@ -22,9 +25,9 @@ class Runner:
|
||||
self.log_dir = log_dir
|
||||
self.config = config
|
||||
self.log_filename = ""
|
||||
self.log_fd = None
|
||||
self.log_fd: TextIO
|
||||
self.verbose = verbose
|
||||
self.output_queue: queue.Queue = queue.Queue()
|
||||
self.output_queue: queue.Queue[str] = queue.Queue()
|
||||
self.start_time = time.time()
|
||||
|
||||
def create_log_filename(self, model: str, test_filter: str) -> str:
|
||||
@@ -36,7 +39,7 @@ class Runner:
|
||||
datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%SZ"))
|
||||
# TODO: improve logfilename, create folder, create merged log.
|
||||
|
||||
def start(self):
|
||||
def start(self) -> None:
|
||||
if self.verbose:
|
||||
print("Running: {}".format(" ".join([self.cmd] + self.args)))
|
||||
|
||||
@@ -62,7 +65,8 @@ class Runner:
|
||||
self.thread = threading.Thread(target=self.process_output)
|
||||
self.thread.start()
|
||||
|
||||
def process_output(self):
|
||||
def process_output(self) -> None:
|
||||
assert self.process.stdout is not None
|
||||
while not self.stop_thread.is_set():
|
||||
line = self.process.stdout.readline()
|
||||
if line == "\n":
|
||||
@@ -71,10 +75,10 @@ class Runner:
|
||||
self.log_fd.write(line)
|
||||
self.log_fd.flush()
|
||||
|
||||
def poll(self):
|
||||
def poll(self) -> Optional[int]:
|
||||
return self.process.poll()
|
||||
|
||||
def wait(self, timeout_min):
|
||||
def wait(self, timeout_min: float) -> Optional[int]:
|
||||
try:
|
||||
return self.process.wait(timeout=timeout_min*60)
|
||||
except subprocess.TimeoutExpired:
|
||||
@@ -84,21 +88,21 @@ class Runner:
|
||||
print("stopped.")
|
||||
return errno.ETIMEDOUT
|
||||
|
||||
def get_output(self):
|
||||
def get_output(self) -> Optional[str]:
|
||||
try:
|
||||
output = self.output_queue.get(block=True, timeout=0.1)
|
||||
return maybe_strip_color(output)
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
def print_output(self):
|
||||
def print_output(self) -> None:
|
||||
output = self.get_output()
|
||||
if not output:
|
||||
return
|
||||
print(colorize("[" + self.name.ljust(11) + "] " + output, color.RESET),
|
||||
end="")
|
||||
|
||||
def stop(self):
|
||||
def stop(self) -> int:
|
||||
atexit.unregister(self.stop)
|
||||
|
||||
self.stop_thread.set()
|
||||
@@ -131,7 +135,7 @@ class Runner:
|
||||
|
||||
return self.process.returncode
|
||||
|
||||
def time_elapsed_s(self):
|
||||
def time_elapsed_s(self) -> float:
|
||||
return time.time() - self.start_time
|
||||
|
||||
|
||||
@@ -175,8 +179,12 @@ class Px4Runner(Runner):
|
||||
|
||||
|
||||
class GzserverRunner(Runner):
|
||||
def __init__(self, workspace_dir, log_dir, config,
|
||||
speed_factor, verbose):
|
||||
def __init__(self,
|
||||
workspace_dir: str,
|
||||
log_dir: str,
|
||||
config: Dict[str, str],
|
||||
speed_factor: float,
|
||||
verbose: bool):
|
||||
super().__init__(log_dir, config, verbose)
|
||||
self.name = "gzserver"
|
||||
self.cwd = workspace_dir
|
||||
@@ -195,7 +203,11 @@ class GzserverRunner(Runner):
|
||||
|
||||
|
||||
class GzclientRunner(Runner):
|
||||
def __init__(self, workspace_dir, log_dir, config, verbose):
|
||||
def __init__(self,
|
||||
workspace_dir: str,
|
||||
log_dir: str,
|
||||
config: Dict[str, str],
|
||||
verbose: bool):
|
||||
super().__init__(log_dir, config, verbose)
|
||||
self.name = "gzclient"
|
||||
self.cwd = workspace_dir
|
||||
@@ -211,8 +223,13 @@ class GzclientRunner(Runner):
|
||||
|
||||
|
||||
class TestRunner(Runner):
|
||||
def __init__(self, workspace_dir, log_dir, config, test,
|
||||
mavlink_connection, verbose):
|
||||
def __init__(self,
|
||||
workspace_dir: str,
|
||||
log_dir: str,
|
||||
config: Dict[str, str],
|
||||
test: str,
|
||||
mavlink_connection: str,
|
||||
verbose: bool):
|
||||
super().__init__(log_dir, config, verbose)
|
||||
self.name = "test_runner"
|
||||
self.cwd = workspace_dir
|
||||
|
||||
Reference in New Issue
Block a user