Files
paparazzi/sw/supervision/python/log_widget.py
2023-11-27 08:52:30 +01:00

138 lines
5.6 KiB
Python

from PyQt5.QtWidgets import *
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QProcess
import utils
from generated.ui_log_widget import Ui_LogWidget
from conf import *
import re
import shutil
TMP_DIR = "/tmp/pprz"
ENVS = [
"PAPARAZZI_HOME",
"PAPARAZZI_SRC",
"HOME"
]
DATE_TAGS = ["YY", "MM", "DD", "hh", "mm", "ss"]
OTHER_TAGS = ["AIRCRAFT", "AC_ID"]
class LogWidget(QWidget, Ui_LogWidget):
def __init__(self, parent=None, *args, **kwargs):
QWidget.__init__(self, parent=parent, *args, **kwargs)
self.setupUi(self)
self.process = QProcess(self)
self.date = None
self.basename = None
self.conf: Conf = None
self.substitutions = {}
self.reset_output()
self.input_button.clicked.connect(self.select_input)
self.output_button.clicked.connect(self.select_output)
self.reset_output_button.clicked.connect(self.reset_output)
self.process_button.clicked.connect(self.start_processing)
self.help_button.clicked.connect(self.show_help)
self.process.finished.connect(self.handle_finished)
self.process.readyReadStandardOutput.connect(self.read_stdout)
self.process.readyReadStandardError.connect(self.read_stderr)
def set_conf(self, conf):
self.conf = conf
def select_input(self):
home = os.path.expandvars("$HOME")
(path, _) = QFileDialog().getOpenFileName(self, "Select log file", home, "Log (*.LOG);; All (*.*)")
self.input_edit.setText(path)
def select_output(self):
start_path = os.path.expandvars(self.output_edit.text())
path = QFileDialog().getExistingDirectory(self, "Select log file", start_path, QFileDialog.ShowDirsOnly)
for env in ENVS:
res = os.getenv(env)
if res in path:
path_sub = path.replace(res, f"${env}")
if os.path.expandvars(path_sub) == path:
# check that the substitution is valid
path = path_sub
self.output_edit.setText(path)
def reset_output(self):
settings = utils.get_settings()
output_path = settings.value("default_log_path", "$PAPARAZZI_HOME/var/logs/$YY-$MM/$DD/$AIRCRAFT", str)
self.output_edit.setText(output_path)
def start_processing(self):
if (self.process.state() == QProcess.ProcessState.NotRunning and
self.input_edit.text() != ""):
program = os.path.expandvars("$PAPARAZZI_SRC/sw/logalizer/sd2log")
input = self.input_edit.text()
output = os.path.expandvars(self.output_edit.text())
if not os.path.exists(TMP_DIR):
os.makedirs(TMP_DIR)
self.process.start(program, [input, TMP_DIR])
self.status_label.setText("processing...")
def handle_finished(self, exit_code: int, exit_status: QProcess.ExitStatus):
if exit_status != QProcess.ExitStatus.NormalExit or exit_code != 0:
self.status_label.setText(f"exited with code {exit_code}")
return
self.status_label.setText("Log processed, copying files...")
ac_id = self.get_ac_id(f"{TMP_DIR}/{self.basename}.data")
ac = self.conf[ac_id]
self.substitutions["AC_ID"] = str(ac_id)
self.substitutions["AIRCRAFT"] = ac.name
out = os.path.expandvars(self.output_edit.text())
for tag in DATE_TAGS + OTHER_TAGS:
try:
value = self.substitutions[tag]
out = out.replace(f"${tag}", value)
except KeyError:
print(f"no tag {tag}")
self.status_label.setText(f"Error: unknown tag {tag}!")
return
if not os.path.exists(out):
os.makedirs(out)
shutil.move(f"{TMP_DIR}/{self.basename}.data", f"{out}/{self.basename}.data")
shutil.move(f"{TMP_DIR}/{self.basename}.log", f"{out}/{self.basename}.log")
shutil.move(f"{TMP_DIR}/{self.basename}.tlm", f"{out}/{self.basename}.tlm")
self.status_label.setText(f"Files extracted in {out}")
def get_ac_id(self, filename) -> int:
with open(filename, 'r') as data:
ac_id = int(data.readline().split()[1])
return ac_id
def read_stdout(self):
out = self.process.readAllStandardOutput()
out = out.data().decode()
def read_stderr(self):
err = self.process.readAllStandardError()
err = err.data().decode()
for line in err.splitlines():
m = re.match(r'(.*)\.data', line)
if m is not None:
self.basename = m.group(1)
m = re.match(r'(\d{2})_(\d{2})_(\d{2})__(\d{2})_(\d{2})_(\d{2})', self.basename)
if len(m.groups()) == 6:
for tag, value in zip(DATE_TAGS, m.groups()):
self.substitutions[tag] = value
def show_help(self):
QMessageBox.information(self, "logs extractor help",
"<h1>SD log extractor</h1><br/>"
"Extract logs from flight recorder.<br/>"
"You can use substitution variables in the output directory:<br/>"
"<b>$AIRCRAFT</b>: aircraft name<br/>"
"<b>$AC_ID</b>: aircraft id<br/>"
"<b>$YY</b>: year<br/>"
"<b>$MM</b>: month<br/>"
"<b>$DD</b>: day<br/>"
"<b>$hh</b>: hour<br/>"
"<b>$mm</b>: minutes<br/>"
"<b>$ss</b>: seconds<br/>")