mirror of
https://github.com/paparazzi/paparazzi.git
synced 2026-05-24 22:05:58 +08:00
[python] first draft of python_serial using pprz messages
This commit is contained in:
@@ -6,8 +6,71 @@ Paparazzi message representation
|
||||
from __future__ import print_function
|
||||
import sys
|
||||
import json
|
||||
import struct
|
||||
import messages_xml_map
|
||||
from enum import Enum
|
||||
|
||||
STX = 0x99
|
||||
STX_TS = 0x98
|
||||
|
||||
class PprzParserState(Enum):
|
||||
WaitSTX = 1
|
||||
GotSTX = 2
|
||||
GotLength = 3
|
||||
GotPayload = 4
|
||||
GotCRC1 = 5
|
||||
|
||||
class PprzParser(object):
|
||||
"""parser for binary Paparazzi messages"""
|
||||
def __init__(self):
|
||||
self.reset_parser()
|
||||
|
||||
def parse_byte(self, c):
|
||||
"""parse new byte, return True when a new full message is available"""
|
||||
b = ord(c)
|
||||
if self.state == PprzParserState.WaitSTX:
|
||||
if b == STX:
|
||||
self.state = PprzParserState.GotSTX
|
||||
elif self.state == PprzParserState.GotSTX:
|
||||
self.length = b - 4
|
||||
self.buf = []
|
||||
self.ck_a = b % 256
|
||||
self.ck_b = b % 256
|
||||
self.idx = 0
|
||||
self.state = PprzParserState.GotLength
|
||||
elif self.state == PprzParserState.GotLength:
|
||||
self.buf.append(c);
|
||||
self.ck_a = (self.ck_a + b) % 256
|
||||
self.ck_b = (self.ck_b + self.ck_a) % 256
|
||||
self.idx += 1
|
||||
if self.idx == self.length:
|
||||
self.state = PprzParserState.GotPayload
|
||||
elif self.state == PprzParserState.GotPayload:
|
||||
if self.ck_a == b:
|
||||
self.state = PprzParserState.GotCRC1
|
||||
else:
|
||||
self.state = PprzParserState.WaitSTX
|
||||
elif self.state == PprzParserState.GotCRC1:
|
||||
self.state = PprzParserState.WaitSTX
|
||||
if self.ck_b == b:
|
||||
"""New message available"""
|
||||
return True
|
||||
else:
|
||||
self.state = PprzParserState.WaitSTX
|
||||
return False
|
||||
|
||||
def get_buffer(self):
|
||||
return self.buf
|
||||
|
||||
def reset_parser(self):
|
||||
self.state = PprzParserState.WaitSTX
|
||||
self.length = 0
|
||||
self.buf = []
|
||||
self.ck_a = 0
|
||||
self.ck_b = 0
|
||||
self.idx = 0
|
||||
|
||||
|
||||
|
||||
class PprzMessageError(Exception):
|
||||
def __init__(self, message, inner_exception=None):
|
||||
@@ -61,6 +124,23 @@ class PprzMessage(object):
|
||||
"""Get list of field types."""
|
||||
return self._fieldtypes
|
||||
|
||||
def fieldbintypes(self, t):
|
||||
"""Get type and lenfth for binary format"""
|
||||
data_types = {
|
||||
'float': ['f', 4],
|
||||
'uint8': ['B', 1],
|
||||
'uint16': ['H', 2],
|
||||
'uint32': ['L', 4],
|
||||
'int8': ['b', 1],
|
||||
'int16': ['h', 2],
|
||||
'int32': ['l', 4],
|
||||
'char': ['c', 1]
|
||||
}
|
||||
baseType = t
|
||||
if t[-2:] == "[]":
|
||||
baseType = t[:-2]
|
||||
return data_types[baseType]
|
||||
|
||||
def get_field(self, idx):
|
||||
"""Get field value by index."""
|
||||
return self._fieldvalues[idx]
|
||||
@@ -76,8 +156,17 @@ class PprzMessage(object):
|
||||
if len(values) == len(self.fieldnames):
|
||||
self._fieldvalues = values
|
||||
else:
|
||||
print("set values %i %i" %(len(values), len(self.fieldnames)))
|
||||
raise PprzMessageError("Error: fields not matching")
|
||||
|
||||
def set_value_by_name(self, name, value):
|
||||
# Try to set a value from its name
|
||||
for idx, f in enumerate(self.fieldnames):
|
||||
if f == name:
|
||||
self._fieldvalues[idx] = value
|
||||
return
|
||||
raise AttributeError("No such attribute %s" % name)
|
||||
|
||||
def __str__(self):
|
||||
ret = '%s.%s {' % (self.msg_class, self.name)
|
||||
for idx, f in enumerate(self.fieldnames):
|
||||
@@ -109,6 +198,64 @@ class PprzMessage(object):
|
||||
ivy_str += ' '
|
||||
return ivy_str
|
||||
|
||||
def payload_to_binary(self):
|
||||
struct_string = "="
|
||||
data = []
|
||||
length = 0
|
||||
for idx, t in enumerate(self.fieldtypes):
|
||||
binType = self.fieldbintypes(t)
|
||||
struct_string += binType[0]
|
||||
array_length = 1
|
||||
if "char[" in t:
|
||||
array_length = len(self.fieldvalues[idx])
|
||||
for c in self.fieldvalues[idx]:
|
||||
data.append(int(c))
|
||||
elif '[' in t:
|
||||
array_length = len(self.fieldvalues[idx])
|
||||
for x in self.fieldvalues[idx]:
|
||||
data.append(x)
|
||||
else:
|
||||
data.append(self.fieldvalues[idx])
|
||||
length += binType[1] * array_length
|
||||
msg = struct.pack(struct_string, *data)
|
||||
return msg
|
||||
|
||||
def binary_to_payload(self, data):
|
||||
msg_offset = 0
|
||||
values = []
|
||||
for idx, t in enumerate(self.fieldtypes):
|
||||
binType = self.fieldbintypes(t)
|
||||
if t[-2:] == "[]":
|
||||
array_length = int(struct.unpack('B', data[msg_offset])[0])
|
||||
msg_offset = msg_offset + 1
|
||||
array_value = []
|
||||
for count in range(0, array_length):
|
||||
array_value.append(struct.unpack(binType[0], "".join(data[msg_offset:msg_offset+binType[1]])))
|
||||
msg_offset = msg_offset + binType[1]
|
||||
values.append(array_value)
|
||||
else:
|
||||
value = struct.unpack(binType[0], "".join(data[msg_offset:msg_offset+binType[1]]))
|
||||
msg_offset = msg_offset + binType[1]
|
||||
values.append(value)
|
||||
self.set_values(values)
|
||||
|
||||
def calculate_checksum(self, msg):
|
||||
ck_a = 0
|
||||
ck_b = 0
|
||||
# start char not included in checksum for pprz protocol
|
||||
for c in msg[1:]:
|
||||
ck_a = (ck_a + ord(c)) % 256
|
||||
ck_b = (ck_b + ck_a) % 256
|
||||
return (ck_a, ck_b)
|
||||
|
||||
def payload_to_pprz_msg(self, sender):
|
||||
stx = STX
|
||||
data = self.payload_to_binary()
|
||||
length = 6 + len(data)
|
||||
msg = struct.pack("=BBBB", STX, length, sender, self._id) + data
|
||||
(ck_a, ck_b) = self.calculate_checksum(msg)
|
||||
msg = msg + struct.pack('=BB', ck_a, ck_b)
|
||||
return msg
|
||||
|
||||
def test():
|
||||
import argparse
|
||||
|
||||
@@ -72,6 +72,17 @@ def get_msgs(msg_class):
|
||||
print("Error: msg_class %s not found." % msg_class)
|
||||
return []
|
||||
|
||||
def get_msg_name(msg_class, msg_id):
|
||||
if not message_dictionary:
|
||||
parse_messages()
|
||||
if msg_class in message_dictionary:
|
||||
if msg_id in message_dictionary_id_name[msg_class]:
|
||||
return message_dictionary_id_name[msg_class][msg_id]
|
||||
else:
|
||||
print("Error: msg_id %d not found in msg_class %s." % (msg_id, msg_class))
|
||||
else:
|
||||
print("Error: msg_class %s not found." % msg_class)
|
||||
return ""
|
||||
|
||||
def get_msg_fields(msg_class, msg_name):
|
||||
if not message_dictionary:
|
||||
|
||||
Reference in New Issue
Block a user