from PyQt5.QtWidgets import * from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtCore import QProcess import utils from generated.ui_log_widget import Ui_LogWidget from conf import * import re import shutil TMP_DIR = "/tmp/pprz" ENVS = [ "PAPARAZZI_HOME", "PAPARAZZI_SRC", "HOME" ] DATE_TAGS = ["YY", "MM", "DD", "hh", "mm", "ss"] OTHER_TAGS = ["AIRCRAFT", "AC_ID"] class LogWidget(QWidget, Ui_LogWidget): def __init__(self, parent=None, *args, **kwargs): QWidget.__init__(self, parent=parent, *args, **kwargs) self.setupUi(self) self.process = QProcess(self) self.date = None self.basename = None self.conf: Conf = None self.substitutions = {} self.reset_output() self.input_button.clicked.connect(self.select_input) self.output_button.clicked.connect(self.select_output) self.reset_output_button.clicked.connect(self.reset_output) self.process_button.clicked.connect(self.start_processing) self.help_button.clicked.connect(self.show_help) self.process.finished.connect(self.handle_finished) self.process.readyReadStandardOutput.connect(self.read_stdout) self.process.readyReadStandardError.connect(self.read_stderr) def set_conf(self, conf): self.conf = conf def select_input(self): home = os.path.expandvars("$HOME") (path, _) = QFileDialog().getOpenFileName(self, "Select log file", home, "Log (*.LOG);; All (*.*)") self.input_edit.setText(path) def select_output(self): start_path = os.path.expandvars(self.output_edit.text()) path = QFileDialog().getExistingDirectory(self, "Select log file", start_path, QFileDialog.ShowDirsOnly) for env in ENVS: res = os.getenv(env) if res in path: path_sub = path.replace(res, f"${env}") if os.path.expandvars(path_sub) == path: # check that the substitution is valid path = path_sub self.output_edit.setText(path) def reset_output(self): settings = utils.get_settings() output_path = settings.value("default_log_path", "$PAPARAZZI_HOME/var/logs/$YY-$MM/$DD/$AIRCRAFT", str) self.output_edit.setText(output_path) def start_processing(self): if (self.process.state() == QProcess.ProcessState.NotRunning and self.input_edit.text() != ""): program = os.path.expandvars("$PAPARAZZI_SRC/sw/logalizer/sd2log") input = self.input_edit.text() output = os.path.expandvars(self.output_edit.text()) if not os.path.exists(TMP_DIR): os.makedirs(TMP_DIR) self.process.start(program, [input, TMP_DIR]) self.status_label.setText("processing...") def handle_finished(self, exit_code: int, exit_status: QProcess.ExitStatus): if exit_status != QProcess.ExitStatus.NormalExit or exit_code != 0: self.status_label.setText(f"exited with code {exit_code}") return self.status_label.setText("Log processed, copying files...") ac_id = self.get_ac_id(f"{TMP_DIR}/{self.basename}.data") ac = self.conf[ac_id] self.substitutions["AC_ID"] = str(ac_id) self.substitutions["AIRCRAFT"] = ac.name out = os.path.expandvars(self.output_edit.text()) for tag in DATE_TAGS + OTHER_TAGS: try: value = self.substitutions[tag] out = out.replace(f"${tag}", value) except KeyError: print(f"no tag {tag}") self.status_label.setText(f"Error: unknown tag {tag}!") return if not os.path.exists(out): os.makedirs(out) shutil.move(f"{TMP_DIR}/{self.basename}.data", f"{out}/{self.basename}.data") shutil.move(f"{TMP_DIR}/{self.basename}.log", f"{out}/{self.basename}.log") shutil.move(f"{TMP_DIR}/{self.basename}.tlm", f"{out}/{self.basename}.tlm") self.status_label.setText(f"Files extracted in {out}") def get_ac_id(self, filename) -> int: with open(filename, 'r') as data: ac_id = int(data.readline().split()[1]) return ac_id def read_stdout(self): out = self.process.readAllStandardOutput() out = out.data().decode() def read_stderr(self): err = self.process.readAllStandardError() err = err.data().decode() for line in err.splitlines(): m = re.match(r'(.*)\.data', line) if m is not None: self.basename = m.group(1) m = re.match(r'(\d{2})_(\d{2})_(\d{2})__(\d{2})_(\d{2})_(\d{2})', self.basename) if len(m.groups()) == 6: for tag, value in zip(DATE_TAGS, m.groups()): self.substitutions[tag] = value def show_help(self): QMessageBox.information(self, "logs extractor help", "