mirror of
https://github.com/paparazzi/paparazzi.git
synced 2026-02-07 03:52:47 +08:00
329 lines
13 KiB
Python
329 lines
13 KiB
Python
# Copyright (C) 2008-2022 The Paparazzi Team
|
|
# released under GNU GPLv2 or later. See COPYING file.
|
|
import os.path
|
|
|
|
import console_widget
|
|
from generated.ui_session import Ui_Session
|
|
from PyQt5.QtWidgets import *
|
|
from PyQt5 import QtCore
|
|
import utils
|
|
import lxml.etree as ET
|
|
import paparazzi
|
|
from typing import List, Optional, Tuple, Dict
|
|
from program_widget import ProgramWidget, TabProgramsState
|
|
from tools_menu import ToolMenu
|
|
from programs_conf import *
|
|
from conf import *
|
|
from console_widget import ConsoleWidget
|
|
|
|
|
|
class SessionWidget(QWidget, Ui_Session):
|
|
|
|
programs_all_stopped = QtCore.pyqtSignal()
|
|
program_spawned = QtCore.pyqtSignal()
|
|
program_state_changed = QtCore.pyqtSignal(TabProgramsState)
|
|
tools_changed = QtCore.pyqtSignal(dict) # Dict[str, Tool]
|
|
|
|
def __init__(self, parent=None):
|
|
QWidget.__init__(self, parent=parent)
|
|
self.setupUi(self)
|
|
self.program_widgets: List[ProgramWidget] = []
|
|
self.console: ConsoleWidget = None
|
|
self.ac: Aircraft = None
|
|
self.sessions = []
|
|
self.tools: Dict[str, Tool] = {}
|
|
self.tools_menu = ToolMenu()
|
|
self.programs_state: TabProgramsState = TabProgramsState.IDLE
|
|
self.control_panel_combo.currentTextChanged.connect(self.on_control_panel_changed)
|
|
self.menu_button.addAction(self.save_session_action)
|
|
self.menu_button.addAction(self.save_as_action)
|
|
self.menu_button.addAction(self.rename_session_action)
|
|
self.menu_button.addAction(self.remove_session_action)
|
|
self.tools_menu.tool_clicked.connect(self.handle_new_tool)
|
|
self.start_session_button.clicked.connect(self.start_session)
|
|
self.startall_button.clicked.connect(self.start_all)
|
|
self.removeall_button.clicked.connect(self.remove_all)
|
|
self.stopall_button.clicked.connect(self.stop_all)
|
|
self.add_tool_button.clicked.connect(self.open_tools)
|
|
self.save_session_action.triggered.connect(self.handle_save)
|
|
self.save_as_action.triggered.connect(self.handle_save_as)
|
|
self.rename_session_action.triggered.connect(self.handle_rename)
|
|
self.remove_session_action.triggered.connect(self.remove_session)
|
|
|
|
def set_console(self, console: console_widget.ConsoleWidget):
|
|
self.console = console
|
|
|
|
def set_aircraft(self, ac: Aircraft):
|
|
self.ac = ac
|
|
|
|
def init(self):
|
|
self.update_control_panels()
|
|
self.on_control_panel_changed()
|
|
|
|
def on_control_panel_changed(self):
|
|
current_cp = self.control_panel_combo.currentText()
|
|
self.sessions = sorted(parse_sessions(current_cp), key=lambda session: session.name)
|
|
self.tools = parse_tools(current_cp)
|
|
self.tools_changed.emit(self.tools)
|
|
self.init_tools_menu()
|
|
sessions_names = [session.name for session in self.sessions]
|
|
self.sessions_combo.clear()
|
|
self.sessions_combo.addItems(["Simulation", "Replay"])
|
|
self.sessions_combo.insertSeparator(2)
|
|
self.sessions_combo.addItems(sessions_names)
|
|
last_session = utils.get_settings().value("ui/last_session", None, str)
|
|
if last_session is not None and last_session in sessions_names:
|
|
self.sessions_combo.setCurrentText(last_session)
|
|
else:
|
|
self.sessions_combo.setCurrentIndex(0)
|
|
|
|
def update_control_panels(self):
|
|
cpfs = paparazzi.get_list_of_controlpanel_files()
|
|
self.control_panel_combo.addItems(cpfs)
|
|
last_cp = utils.get_settings().value("ui/last_control_panel", None, str)
|
|
if last_cp is not None and last_cp in cpfs:
|
|
self.control_panel_combo.setCurrentText(last_cp)
|
|
|
|
def get_current_session(self) -> str:
|
|
"""
|
|
:return: current session name in comboBox.
|
|
"""
|
|
return self.sessions_combo.currentText()
|
|
|
|
def get_current_control_panel(self) -> str:
|
|
return self.control_panel_combo.currentText()
|
|
|
|
def start_session(self):
|
|
self.reset_programs_status()
|
|
combo_text = self.sessions_combo.currentText()
|
|
if combo_text == "Simulation":
|
|
self.start_simulation()
|
|
elif combo_text == "Replay":
|
|
self.start_replay()
|
|
else:
|
|
for session in self.sessions:
|
|
if session.name == combo_text:
|
|
for program in session.programs:
|
|
self.launch_program(program)
|
|
|
|
def start_replay(self):
|
|
lfp = Program.from_tool(self.tools["Log File Player"])
|
|
server = Program.from_tool(self.tools["Server"])
|
|
server.args.append(Arg("-n", None))
|
|
gcs = Program.from_tool(self.tools["PprzGCS"])
|
|
self.launch_program(lfp)
|
|
self.launch_program(server)
|
|
self.launch_program(gcs)
|
|
|
|
def start_simulation(self):
|
|
if "nps" not in self.ac.boards and "sim" not in self.ac.boards:
|
|
self.console.post_message(None, "No simulation target for {}.".format(self.ac.name))
|
|
return
|
|
|
|
elif "nps" in self.ac.boards and "sim" in self.ac.boards:
|
|
simulator, ok = QInputDialog.getItem(self, "Simulator", "Please choose the simulator:",
|
|
["nps", "sim"], editable=False)
|
|
if not ok:
|
|
return
|
|
elif "nps" in self.ac.boards:
|
|
simulator = "nps"
|
|
else:
|
|
# simulator is "sim"
|
|
simulator = "sim"
|
|
|
|
sim = Program.from_tool(self.tools["Simulator"])
|
|
sim.args.append(Arg("-t", simulator))
|
|
self.launch_program(sim)
|
|
datalink = Program.from_tool(self.tools["Data Link"])
|
|
datalink.args = [Arg("-udp", None), Arg("-udp_broadcast", None)]
|
|
self.launch_program(datalink)
|
|
server = Program.from_tool(self.tools["Server"])
|
|
server.args.append(Arg("-n", None))
|
|
gcs = Program.from_tool(self.tools["PprzGCS"])
|
|
self.launch_program(server)
|
|
self.launch_program(gcs)
|
|
|
|
def launch_program(self, program: Program):
|
|
if self.console is None:
|
|
raise Exception("Console not set!")
|
|
tool = self.tools[program.name]
|
|
args = [arg.args(self.ac) for arg in program.args]
|
|
flat_args = [item for sublist in args for item in sublist]
|
|
if tool.command.startswith("$"):
|
|
cmd = [tool.command[1:]] + flat_args
|
|
else:
|
|
cmd = [os.path.join(utils.PAPARAZZI_SRC, tool.command)] + flat_args
|
|
|
|
pw = ProgramWidget(tool.name, cmd, tool.icon, self.programs_widget)
|
|
self.program_widgets.append(pw)
|
|
lay: QVBoxLayout = self.programs_widget.layout()
|
|
lay.insertWidget(lay.count()-1, pw)
|
|
pw.ready_read_stderr.connect(lambda: self.console.handle_stderr(pw))
|
|
pw.ready_read_stdout.connect(lambda: self.console.handle_stdout(pw))
|
|
pw.finished.connect(lambda c, s: self.handle_program_finished(pw, c, s))
|
|
pw.started.connect(self.handle_program_started)
|
|
pw.remove.connect(lambda: self.remove_program(pw))
|
|
# if REMOVE_PROGRAMS_FINISHED:
|
|
# pw.finished.connect(lambda: self.remove_program(pw))
|
|
pw.start_program()
|
|
self.console.new_program(pw)
|
|
self.program_spawned.emit()
|
|
|
|
def remove_program(self, pw: ProgramWidget):
|
|
self.console.remove_program(pw)
|
|
pw.setParent(None)
|
|
# self.programs_widget.layout().removeWidget(pw)
|
|
self.program_widgets.remove(pw)
|
|
if len(self.program_widgets) == 0:
|
|
self.programs_all_stopped.emit()
|
|
# pw.deleteLater()
|
|
|
|
def any_program_running(self):
|
|
return any([pw.state() == QtCore.QProcess.Running for pw in self.program_widgets])
|
|
|
|
def handle_program_finished(self, pw, c, s):
|
|
self.console.handle_program_finished(pw, c, s)
|
|
if not self.any_program_running():
|
|
self.programs_all_stopped.emit()
|
|
|
|
if c != 0 and c != 15:
|
|
self.programs_state = TabProgramsState.ERROR
|
|
else:
|
|
if not self.any_program_running() and self.programs_state != TabProgramsState.ERROR:
|
|
self.programs_state = TabProgramsState.IDLE
|
|
self.program_state_changed.emit(self.programs_state)
|
|
|
|
def handle_program_started(self):
|
|
self.programs_state = TabProgramsState.RUNNING
|
|
self.program_state_changed.emit(self.programs_state)
|
|
def reset_programs_status(self):
|
|
self.programs_state = TabProgramsState.IDLE
|
|
self.program_state_changed.emit(self.programs_state)
|
|
|
|
def stop_all(self):
|
|
for pw in self.program_widgets:
|
|
pw.terminate()
|
|
self.reset_programs_status()
|
|
|
|
def start_all(self):
|
|
for pw in self.program_widgets:
|
|
pw.start_program()
|
|
self.reset_programs_status()
|
|
|
|
def remove_all(self):
|
|
for pw in list(self.program_widgets):
|
|
pw.handle_remove()
|
|
self.reset_programs_status()
|
|
|
|
def init_tools_menu(self):
|
|
self.tools_menu.clear()
|
|
for t in self.tools.values():
|
|
self.tools_menu.add_tool(t)
|
|
|
|
def handle_new_tool(self, name):
|
|
p = Program.from_tool(self.tools[name])
|
|
self.launch_program(p)
|
|
|
|
def open_tools(self):
|
|
if self.tools_menu.isVisible():
|
|
self.tools_menu.close()
|
|
else:
|
|
bottomLeft = self.mapToGlobal(self.add_tool_button.geometry().bottomLeft())
|
|
self.tools_menu.move(bottomLeft)
|
|
self.tools_menu.show()
|
|
self.tools_menu.setFocus(QtCore.Qt.PopupFocusReason)
|
|
|
|
def handle_save(self):
|
|
session_name = self.sessions_combo.currentText()
|
|
programs = self.get_programs()
|
|
session = Session(session_name, programs)
|
|
self.replace_session(session)
|
|
self.save_sessions()
|
|
|
|
def handle_save_as(self):
|
|
session_name, ok = QInputDialog.getText(self, "Session name", "enter the session name:")
|
|
if not ok:
|
|
return
|
|
for session in self.sessions:
|
|
if session.name == session_name:
|
|
QMessageBox.warning(self, "Error", "A session with this name already exits.\nTry again with a new name.")
|
|
return
|
|
programs = self.get_programs()
|
|
session = Session(session_name, programs)
|
|
self.sessions_combo.addItem(session_name)
|
|
self.sessions_combo.setCurrentText(session_name)
|
|
self.sessions.append(session)
|
|
self.save_sessions()
|
|
|
|
def handle_rename(self):
|
|
for session_orig in self.sessions:
|
|
if session_orig.name == self.sessions_combo.currentText():
|
|
break
|
|
else:
|
|
print("session not found")
|
|
return
|
|
session_name, ok = QInputDialog.getText(self, "Session name", "enter the session name:")
|
|
if not ok:
|
|
return
|
|
for session in self.sessions:
|
|
if session.name == session_name:
|
|
QMessageBox.warning(self, "Error", "A session with this name already exits.\nTry again with a new name.")
|
|
return
|
|
session_orig.name = session_name
|
|
self.save_sessions()
|
|
|
|
def remove_session(self):
|
|
for session in self.sessions:
|
|
if session.name == self.sessions_combo.currentText():
|
|
self.sessions.remove(session)
|
|
i = self.sessions_combo.currentIndex()
|
|
self.sessions_combo.removeItem(i)
|
|
self.save_sessions()
|
|
return
|
|
print("session {} not found".format(self.sessions_combo.currentText()))
|
|
|
|
def replace_session(self, session):
|
|
for i, s in enumerate(self.sessions):
|
|
if s.name == session.name:
|
|
self.sessions[i] = session
|
|
break
|
|
|
|
def save_sessions(self):
|
|
ctrl_panel_path = os.path.join(utils.CONF_DIR, self.get_current_control_panel())
|
|
parser = ET.XMLParser(remove_blank_text=True)
|
|
control_panel = ET.parse(ctrl_panel_path, parser)
|
|
xml_sessions = ET.Element("section")
|
|
xml_sessions.set("name", "sessions")
|
|
for session in self.sessions:
|
|
xml_sessions.append(session.to_xml())
|
|
for xml_section in control_panel.getroot().findall("section"):
|
|
if xml_section.get("name") == "sessions":
|
|
control_panel.getroot().replace(xml_section, xml_sessions)
|
|
break
|
|
control_panel.write(ctrl_panel_path, pretty_print=True)
|
|
print("sessions saved to {}".format(ctrl_panel_path))
|
|
|
|
def get_programs(self):
|
|
programs = []
|
|
for p in self.program_widgets:
|
|
name = p.shortname
|
|
args = []
|
|
if len(p.cmd) > 0:
|
|
arg = None
|
|
for param in p.cmd[1:]:
|
|
if param.startswith("-"):
|
|
# if it start with "-", make a new arg
|
|
arg = Arg(param, None)
|
|
args.append(arg)
|
|
else:
|
|
if arg is not None and arg.flag.startswith("-") and arg.constant is None:
|
|
# if it don't starts with -, but the previous did, and the constant is not yet filled, fill the constant
|
|
arg.constant = param
|
|
else:
|
|
# if it don't starts with -, nor the previous, its probably a mandatory argument
|
|
arg = Arg(param, None)
|
|
args.append(arg)
|
|
program = Program(name, args)
|
|
programs.append(program)
|
|
return programs
|