Files
paparazzi/sw/supervision/python/processes.py
T
Gautier Hattenberger 9763a70ca9 New paparazzicenter in Python (#1811)
This is the result of a student project to develop a new paparazzi center based on Python/Qt.
See http://wiki.paparazziuav.org/wiki/Paparazzi_Center/Evolutions for the main idea and development process.
It can be tested by running the main paparazzi launcher with the -python option. I more convenient way might be added later.
This PR is mostly a way to not loose this work, and other people are encourage to improve it in many aspects (HMI, functionalities, ...).
2016-07-19 22:52:15 +02:00

239 lines
8.4 KiB
Python

# Paparazzi center utilities
#
# Copyright (C) 2016 ENAC, Florian BITARD (intern student)
#
# This file is part of paparazzi.
#
# paparazzi is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# paparazzi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with paparazzi; see the file COPYING. If not, write to
# the Free Software Foundation, 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
###############################################################################
# [Imports]
import lib.environment as env
import lib.console as cs
import PyQt5.QtCore as Core
import logging
import os
import signal
###############################################################################
# [Constants]
LOGGER = logging.getLogger("[PROCESSES]")
AC_MAKEFILE_NAME = "Makefile.ac"
CONF_FLAG_NAME = "AIRCRAFT"
DEVICE_FLAG_NAME = "FLASH_MODE"
DEFAULT_EXIT_CODE = 2
INTERRUPTED_EXIT_CODE = -1
SUCCESS_EXIT_CODE = 0
CLEAN = "clean"
BUILD = "build"
UPLOAD = "upload"
PROGRAM = "program"
TOOL = "tool"
CLEAN_TARGET_KEY = "clean_ac"
BUILD_TARGET_KEY = ".compile"
UPLOAD_TARGET_KEY = ".upload"
CONF_FLAG = "@" + CONF_FLAG_NAME
TARGET_FLAG = "@TARGET"
CONF_ID_FLAG = "@AC_ID"
###############################################################################
# [Stream class]
class LoggerStream(Core.QObject):
""" Class to define a Stream object."""
logger_log_sent = Core.pyqtSignal(str, str, str)
def __init__(self):
super(LoggerStream, self).__init__()
# Reimplemented method in order to write the logs in the console :
def write(self, line):
log, flag = cs.analyse_log_line(line)
log_type = cs.APPLICATION_MESSAGE_TYPE
self.logger_log_sent.emit(log, flag, log_type)
###############################################################################
# [Process class]
class Process(Core.QObject):
"""Class to upload the built code to a device."""
process_killed = Core.pyqtSignal()
process_log_sent = Core.pyqtSignal(str, str, str)
def __init__(self, process_type,
configuration=None, target=None, device=None, program=None):
"""
:param process_type:
:param configuration:
:param target:
:param device:
:param program:
-> Declare a Process object as a QObject derivative.
-> Give it a name and the necessary parameters for its type.
-> Generate a command for the system call by the Popen object that
manages the subprocess and allows to redirect the output.
-> The process runs into an independent thread.
-> A queue is used to collect the logs from the Popen output and
an other one is used to send it to the QTextEdit integrated console.
-> Logs flags are collected for information.
-> Exit code is initialized to default value. Must change in case of
normal exit, error or user interruption.
"""
super(Process, self).__init__()
self.type = process_type
self.name = None
self.config = configuration
self.target = target
self.device = device
self.program = program
if self.type == CLEAN:
self.command = self.generate_make_command(CLEAN_TARGET_KEY)
self.name = " - ".join([self.type.upper(),
self.config.name])
elif self.type == BUILD:
self.command = self.generate_make_command(BUILD_TARGET_KEY)
self.name = " - ".join([self.type.upper(), self.config.name,
self.target.name])
elif self.type == UPLOAD:
self.command = self.generate_make_command(UPLOAD_TARGET_KEY)
self.name = " - ".join([self.type.upper(), self.config.name,
self.target.name, self.device.name])
else:
self.command = self.generate_program_command()
self.name = " - ".join([self.type.upper(),
self.program.name])
self.subprocess = Core.QProcess()
self.subprocess.setProcessChannelMode(Core.QProcess.MergedChannels)
self.subprocess.setReadChannel(Core.QProcess.StandardOutput)
self.process_killed.connect(self.emergency_stop)
self.exit_code = DEFAULT_EXIT_CODE
self.flags = {cs.ERROR_FLAG: 0,
cs.WARNING_FLAG: 0,
cs.INFO_FLAG: 0}
def generate_make_command(self, target_key):
"""
:param target_key:
-> Generate a system command to compile files by a Makefile and
putting the right arguments if given.
"""
if self.config is not None:
aircraft_term = CONF_FLAG_NAME + "=" + self.config.name
if self.target is not None:
target_key = self.target.name + target_key
command_terms = ["make", "-C", env.PAPARAZZI_HOME, "-f",
AC_MAKEFILE_NAME,
aircraft_term, target_key]
if self.device is not None and self.device.variable[1]:
device_term = DEVICE_FLAG_NAME + "=" + self.device.variable[1]
command_terms.insert(-1, device_term)
return " ".join(command_terms)
def generate_program_command(self):
"""
-> Generate a system command to run a program and add its options
if it has some.
"""
if self.program is not None:
full_command = os.path.join(env.PAPARAZZI_HOME,
self.program.command)
for option in self.program.options:
if type(option) is tuple:
flag, value = option
if value == CONF_FLAG:
full_command += " " + flag + " " + self.config.name
elif value == TARGET_FLAG:
full_command += " " + flag + " " + self.target.name
elif value == CONF_ID_FLAG:
full_command += " " + flag + " " + self.config.id
else:
full_command += " " + flag + " " + value
else:
full_command += " " + option
return full_command
def check_before_start(self):
# TODO IF NECESSARY !!!
return self == self
def start(self):
"""
-> Start the thread => start the worker => call the run method.
"""
LOGGER.info("'%s' process running ... (command='%s')",
self.name, self.command)
self.subprocess.start(self.command)
self.subprocess.readyReadStandardOutput.connect(self.send_text)
self.subprocess.finished.connect(self.finish_process)
def send_text(self):
"""
-> Analyse an process output line to find a flag in it.
-> Send the item by the sending queue object.
-> Collect the flag found.
"""
q_byte_array = self.subprocess.readAllStandardOutput()
string = str(q_byte_array, encoding="utf-8").strip()
for line in string.split("\n"):
log, flag = cs.analyse_log_line(line)
log_type = cs.PROCESS_MESSAGE_TYPE
self.process_log_sent.emit(log, flag, log_type)
if flag != cs.DEFAULT_FLAG:
self.flags[flag] += 1
def finish_process(self):
"""
-> If the process finished, get the exit code.
-> Else, the process crashed...
"""
if self.subprocess.exitStatus() == Core.QProcess.NormalExit:
self.exit_code = self.subprocess.exitCode()
LOGGER.info("'%s' process finished with exit code %s.\n",
self.name, self.exit_code)
else:
self.exit_code = INTERRUPTED_EXIT_CODE
LOGGER.error("'%s' process crashed, probably stopped by user !\n",
self.name)
def emergency_stop(self):
"""
-> Kill the subprocess by getting its ProcessID.
"""
os.kill(self.subprocess.pid(), signal.SIGKILL)