[tools] Added tcp_aircraft_server

re-broadcasts the ivy telemetry stream from a specified vehicle over tcp to
a specified remote ip. Committed on request and behalf of Antoine Drouin
(poine)
This commit is contained in:
Antoine Drouin
2014-06-11 06:37:19 +05:30
committed by Felix Ruess
parent 4b93158da2
commit db72c28685
7 changed files with 4537 additions and 0 deletions
+2800
View File
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,163 @@
#Copyright 2014, Antoine Drouin
"""
Phoenix is a Python library for interacting with Paparazzi
"""
import math
"""
Unit convertions
"""
def rad_of_deg(d): return d/180.*math.pi
def deg_of_rad(r): return r*180./math.pi
def rps_of_rpm(r): return r*2.*math.pi/60.
def rpm_of_rps(r): return r/2./math.pi*60.
def m_of_inch(i): return i*0.0254
"""
Plotting
"""
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
my_title_spec = {'color' : 'k', 'fontsize' : 20 }
def save_if(filename):
if filename: matplotlib.pyplot.savefig(filename, dpi=80)
def prepare_fig(fig=None, window_title=None, figsize=(20.48, 10.24), margins=None):
if fig == None:
fig = plt.figure(figsize=figsize)
# else:
# plt.figure(fig.number)
if margins:
left, bottom, right, top, wspace, hspace = margins
fig.subplots_adjust(left=left, right=right, bottom=bottom, top=top,
hspace=hspace, wspace=wspace)
if window_title:
fig.canvas.set_window_title(window_title)
return fig
def decorate(ax, title=None, xlab=None, ylab=None, legend=None, xlim=None, ylim=None):
ax.xaxis.grid(color='k', linestyle='-', linewidth=0.2)
ax.yaxis.grid(color='k', linestyle='-', linewidth=0.2)
if xlab:
ax.xaxis.set_label_text(xlab)
if ylab:
ax.yaxis.set_label_text(ylab)
if title:
ax.set_title(title, my_title_spec)
if legend <> None:
ax.legend(legend, loc='best')
if xlim <> None:
ax.set_xlim(xlim[0], xlim[1])
if ylim <> None:
ax.set_ylim(ylim[0], ylim[1])
"""
Messages
"""
#: dictionary mapping the C type to its length in bytes (e.g char -> 1)
TYPE_TO_LENGTH_MAP = {
"char" : 1,
"uint8" : 1,
"int8" : 1,
"uint16" : 2,
"int16" : 2,
"uint32" : 4,
"int32" : 4,
"float" : 4,
"double" : 8,
}
#: dictionary mapping the C type to correct format string
TYPE_TO_PRINT_MAP = {
float : "%f",
str : "%s",
chr : "%c",
int : "%d"
}
ACID_ALL = 0xFF
ACID_TEST = 0xFE
ACID_GROUNDSTATION = 0xFD
#: dictionary mapping debug types to format characters
DEBUG_MESSAGES = {
"DEBUG_UINT8" : "%d",
"DEBUG_INT32" : "%d",
"DEBUG_FLOAT" : "%#f"
}
"""
Binary logs
See format description in sw/airborne/subsystems/datalink/fms_link.c
"""
import struct
def hex_of_bin(b): return ' '.join( [ "%02X" % ord( x ) for x in b ] )
import pdb
def read_binary_log(filename, tick_freq = 2*512.):
f = open(filename, "rb")
d = f.read()
packet_header_len = 6
msg_header_len = 2
def read_packet(d, packet_start):
payload_start = packet_start+packet_header_len
timestamp, payload_len = struct.unpack("IH", d[packet_start:payload_start])
msgs = read_packet_payload(d, payload_start, payload_len)
next_packet = payload_start+payload_len+2
return timestamp, msgs, next_packet
def read_packet_payload(d, s, l):
msgs = []
packet_end = s+l; msg_start = s
while msg_start<packet_end:
payload_start = msg_start+msg_header_len
msg_len, msg_id = struct.unpack("BB", d[msg_start:payload_start])
payload_end = payload_start+msg_len
msg_payload = d[payload_start:payload_end]
msgs.append([msg_id, msg_payload])
#print msg_id, msg_len, hex_of_bin(msg_payload)
msg_start = payload_end
return msgs
packets = []
packet_start=0
while packet_start<len(d):
timestamp, msgs, next_packet = read_packet(d, packet_start)
packets.append([timestamp/tick_freq, msgs])
#print timestamp, msgs
packet_start = next_packet
f.close()
return packets
def extract_from_binary_log(protocol, packets, msg_names, t_min=None, t_max=None):
ret = [{'time':[], 'data':[]} for m in msg_names]
if t_min == None: t_min = packets[0][0]
if t_max == None: t_max = packets[-1][0]
for t, msgs in packets:
if t>= t_min and t<= t_max:
for id, payload in msgs:
m = protocol.get_message_by_id('telemetry', id)
try: i = msg_names.index(m.name)
except: pass
finally: ret[i]['time'].append(t); ret[i]['data'].append(m.unpack_scaled_values(payload))
return ret
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,209 @@
#Copyright 2014, Antoine Drouin
import array
STX = 0x99
#6 non payload bytes; STX, LEN, MSGID, ACID, CK_A, CK_B
NUM_NON_PAYLOAD_BYTES = 6
class TransportHeaderFooter:
"""
The header/footer of a message
**Attributes:**
- stx: start byte
- length: length in bytes (total length, i.e. including header/footer
- acid: aircraft ID of sender/destination
- msgid: message ID
- ck_a: checksum high byte
- ck_b: checksum low byte
"""
def __init__(self, stx=STX, length=NUM_NON_PAYLOAD_BYTES, acid=0, msgid=0, ck_a=0, ck_b=0):
self.stx = stx
self.length = length
self.acid = acid
self.msgid = msgid
self.ck_a = ck_a
self.ck_b = ck_b
class Transport:
"""
Class that extracts a wasp payload from a string or sequence of
characters (data is sent in little endian byte order)
Data is expected in the following form ::
|STX|length|AC_ID|MESSAGE_ID|... payload=(length-6) bytes ...|Checksum A|Checksum B|
Payload ::
|... MESSAGE DATA ...|
There are 6 non payload bytes in a packet (described in :mod:`TransportHeaderFooter`
- STX
- length
- AC_ID
- MESSAGE_ID
- Checksum A
- Checksum B
"""
STATE_UNINIT, \
STATE_GOT_STX, \
STATE_GOT_LENGTH, \
STATE_GOT_ACID, \
STATE_GOT_MSGID, \
STATE_GOT_PAYLOAD, \
STATE_GOT_CRC1 = range(0,7)
def __init__(self, check_crc=True, debug=False):
self._check_crc = check_crc
self._debug = debug
self._buf = array.array('c','\0'*256)
self._state = self.STATE_UNINIT
self._total_len = 0
self._payload_len = 0
self._payload_idx = 0
self._ck_a = 0
self._ck_b = 0
self._error = 0
self._acid = 0
self._msgid = 0
def _debug_msg(self, msg):
if self._debug:
print msg
def pack_message_with_values(self, header, message, *values):
return self.pack_one(
header,
message,
message.pack_values(*values))
def pack_one(self, header, message, payload):
payload_len = len(payload)
total_len = payload_len + NUM_NON_PAYLOAD_BYTES
#create an array big enough to hold data before the payload,
#i.e. exclude the checksum
buf = array.array('c','\0'*(NUM_NON_PAYLOAD_BYTES - 2))
buf[0] = chr(header.stx)
buf[1] = chr(total_len)
buf[2] = chr(header.acid)
buf[3] = chr(message.id)
buf.fromstring(payload)
ck_a = total_len
ck_b = total_len
for i in range(2,len(buf)):
ck_a = (ck_a + ord(buf[i])) % 256
ck_b = (ck_b + ck_a) % 256
buf.append(chr(ck_a))
buf.append(chr(ck_b))
return buf
def parse_many(self, string):
"""
Similar to parse_one, but operates on a string, returning
multiple payloads if successful
:returns: A list of payloads strings
"""
payloads = []
for c in string:
ok,h,p = self.parse_one(c)
if ok:
payloads.append((h,p))
return payloads
def parse_one(self, c):
"""
Attempts to parse one character. Returns just the payload, and
not the data in the transport layer, i.e. it does not return
STX, the length, or the checksums
:returns: The payload string, or an empty string if insuficcient data is available
"""
def update_checksum(d):
#wrap to 8bit (simulate 8 bit addition)
self._ck_a = (self._ck_a + d) % 256
self._ck_b = (self._ck_b + self._ck_a) % 256
def add_to_buf(char, uint8):
self._buf[self._payload_idx] = char
self._payload_idx += 1
update_checksum(uint8)
payload = ""
error = False
received = False
#convert to 8bit int
d = ord(c)
if self._state == self.STATE_UNINIT:
if d == STX:
self._state += 1
self._ck_a = STX
self._ck_b = STX
self._debug_msg("-- STX")
elif self._state == self.STATE_GOT_STX:
self._total_len = d
self._payload_len = d - NUM_NON_PAYLOAD_BYTES
self._payload_idx = 0
update_checksum(d)
self._state += 1
self._debug_msg("-- SIZE: PL (%s) TOT (%s)" % (self._payload_len, self._total_len))
elif self._state == self.STATE_GOT_LENGTH:
self._debug_msg("-- ACID: %x" % d)
self._acid = d
update_checksum(d)
self._state += 1
elif self._state == self.STATE_GOT_ACID:
self._debug_msg("-- MSGID: %x" % d)
self._msgid = d
update_checksum(d)
if self._payload_len == 0:
self._state = self.STATE_GOT_PAYLOAD
else:
self._state += 1
elif self._state == self.STATE_GOT_MSGID:
add_to_buf(c, d)
if self._payload_idx == self._payload_len:
self._state += 1
self._debug_msg("-- PL")
elif self._state == self.STATE_GOT_PAYLOAD:
if d != self._ck_a and self._check_crc:
error = True
self._debug_msg("-- CRC_A ERROR %x v %x" % (d, self._ck_a))
else:
self._state += 1
self._debug_msg("-- CRC_A OK")
elif self._state == self.STATE_GOT_CRC1:
if d != self._ck_b and self._check_crc:
error = True
self._debug_msg("-- CRC_B ERROR")
else:
payload = self._buf[:self._payload_len].tostring()
received = True
self._state = self.STATE_UNINIT
self._debug_msg("-- CRC_B OK")
if error:
self._error += 1
self._state = self.STATE_UNINIT
elif received:
header = TransportHeaderFooter(
length=self._total_len,
acid=self._acid,
msgid=self._msgid,
ck_a=self._ck_a,
ck_b=self._ck_b)
return True, header, payload
return False, None, None
@@ -0,0 +1,107 @@
#Copyright 2014, Antoine Drouin
import os, serial, logging
from gi.repository import GLib, Gio, GObject
import phoenix.pprz_transport as transp
LOG = logging.getLogger('serial_link')
LOG.setLevel(logging.ERROR)
#LOG.setLevel(logging.DEBUG)
import pdb
class PhoenixCommunication(GObject.GObject):
DEFAULT_PORT = "/dev/ttyO4"
DEFAULT_SPEED = 57600
__gsignals__ = {
"message-received" : (GObject.SIGNAL_RUN_LAST, GObject.TYPE_NONE, [
GObject.TYPE_PYOBJECT, #header
GObject.TYPE_PYOBJECT]), #payload
"status-changed" : (GObject.SIGNAL_RUN_LAST, GObject.TYPE_NONE, [])
}
def __init__(self, pprz_protocol):
GObject.GObject.__init__(self)
self._port = PhoenixCommunication.DEFAULT_PORT
self._speed = PhoenixCommunication.DEFAULT_SPEED
self._watch = None
self._is_opened = False;
self._transp = transp.Transport(False, False)
self._pprz_protocol = pprz_protocol
self._serial = None
self._available_ports = []
self.update_available_ports()
self._nb_rx_msgs = 0
self._nb_rx_bytes = 0
self._nb_tx_msgs = 0
self._nb_tx_bytes = 0
def open(self):
"""
Open serial port
"""
LOG.debug("Opening Port: %s @ %d" % (self._port, self._speed))
try:
self._serial = serial.Serial(self._port, self._speed, timeout=0)
self._is_opened = True;
except serial.SerialException:
self._is_opened = False;
self.on_connection_changed()
def send_msg(self, msg_class, msg_name, *fields_values):
"""
send a message
"""
m = self._pprz_protocol.get_message_by_name(msg_class, msg_name)
h = transp.TransportHeaderFooter(acid=0)
bin_msg = self._transp.pack_message_with_values(h, m, *fields_values)
self._serial.write(bin_msg)
self._nb_tx_msgs += 1
self._nb_tx_bytes += len(bin_msg)
def on_connection_changed(self):
if self._watch:
GObject.source_remove(self._watch)
if self._is_opened:
self.watch = GObject.io_add_watch(
self._serial.fileno(),
GObject.IO_IN | GObject.IO_PRI,
self.on_serial_data_available,
priority=GObject.PRIORITY_HIGH)
self.update_available_ports()
self.emit("status-changed")
def get_status(self):
return self._is_opened, self._port
def get_stats(self):
return self._nb_rx_msgs, self._nb_rx_bytes, self._nb_tx_msgs, self._nb_tx_bytes
def on_serial_data_available(self, fd, condition):
try:
data = self._serial.read(4096)
LOG.debug("read serial data : %d" % (len(data)))
self._nb_rx_bytes += len(data)
# print data.encode("hex")
msgs = self._transp.parse_many(data)
LOG.debug("parsed msg : %d" % (len(msgs)))
# print msgs
for header, payload in msgs:
self.emit("message-received", header, payload)
self._nb_rx_msgs += 1
return True
except serial.SerialException:
self._is_opened = False;
self.on_connection_changed()
return False
def update_available_ports(self):
# self.available_ports = filter(lambda x: x.startswith("ttyUSB") or x.startswith("ttyS") , os.listdir("/dev"))
self.available_ports = filter(lambda x:x.startswith("ttyUSB") , os.listdir("/dev"))
self.available_ports.sort()
def get_available_ports(self):
return self.available_ports
File diff suppressed because it is too large Load Diff
+71
View File
@@ -0,0 +1,71 @@
#!/usr/bin/env python
#-*- coding: utf-8 -*-
#Copyright 2014, Antoine Drouin
import logging, os, base64, socket
from gi.repository import GLib, GObject
import ivy.ivy as ivy
ivylogger = logging.getLogger('Ivy')
ivylogger.setLevel(logging.CRITICAL)
import phoenix.messages
import phoenix.pprz_transport
default_ivybus = '127.255.255.255:2010'
class Server(ivy.IvyServer):
def __init__(self, bus, tcp_port=4242):
ivy.IvyServer.__init__(self, 'TCP_aircraft_server', usesDaemons=True)
self.nb_msgs = 0
self.nb_bytes = 0
cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.bind(('', tcp_port))
cs.listen(1)
GObject.io_add_watch(cs, GObject.IO_IN, self.handle_conn)
print "server listening on {:d}".format(tcp_port)
self.transp = phoenix.pprz_transport.Transport(check_crc=False, debug=False)
self.protocol = phoenix.messages.Protocol(path=os.path.abspath("../../../conf/messages_ng.xml"), debug=True)
self.start(bus)
GObject.timeout_add(500, self.periodic, priority=GObject.PRIORITY_HIGH)
def handle_conn(self, sock, cond):
conn, addr = sock.accept()
print "Connection from {}".format(addr)
GObject.io_add_watch(conn, GObject.IO_IN, self.handle_data)
return True
def handle_data(self, sock, cond):
buf = sock.recv(4096)
if not len(buf):
print "Connection closed."
return False
else:
#print phoenix.hex_of_bin(buf)
msgs = self.transp.parse_many(buf)
for hdr, payload in msgs:
msg = self.protocol.get_message_by_id("telemetry", hdr.msgid)
try:
ivy_str = '{} {} {}'.format(hdr.acid, msg.name, ' '.join([str(v) for v in msg.unpack_values(payload)]))
#print ivy_str
self.send_msg(ivy_str)
self.nb_msgs += 1
self.nb_bytes += len(payload)
except:
print 'FAILED', msg.name
return True
def periodic(self):
print 'msgs {} ({} bytes)'.format(self.nb_msgs, self.nb_bytes)
return True
if __name__ == '__main__':
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
server = Server(default_ivybus)
GLib.MainLoop().run()