Add Python ground station apps (messages, plotter, udp link, dashboard)

This commit is contained in:
Allen Ibara
2009-04-15 22:07:16 +00:00
parent 294bd197cd
commit 0c3c09a1e8
16 changed files with 1809 additions and 0 deletions
+20
View File
@@ -0,0 +1,20 @@
#!/usr/bin/env python
import wx
import getopt
import sys
import dashboardframe
class DashboardApp(wx.App):
def OnInit(self):
self.main = dashboardframe.DashboardFrame()
self.main.Show()
self.SetTopWindow(self.main)
return True
def main():
application = DashboardApp(0)
application.MainLoop()
if __name__ == '__main__':
main()
@@ -0,0 +1,84 @@
import wx
import sys
import os
import time
import threading
import math
import pynotify
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
import messages_tool
WIDTH = 120
HEIGHT = 120
UPDATE_INTERVAL = 250
WARN_INTERVAL = 3000
BATTERY_WARN = 6.5
RADS_SEC_TO_RPM = 60 / (2 * math.pi)
class DashboardFrame(wx.Frame):
def message_recv(self, ac_id, name, values):
if name == "BAT":
self.batteryVolts = float(values[1]) / 10
elif name == "ACTUATOR_WHIRLY":
self.throttle = (float(values[2]) - 1000) / 10.0
self.uptime = float(values[0])
elif name == "XSENS_DATA":
self.rpmYaw = (float(values[9]) + float(values[19])) / 2
self.rpmYaw = -self.rpmYaw * RADS_SEC_TO_RPM
def gui_update(self):
self.batteryText.SetLabel("%0.1f V" % self.batteryVolts)
self.rpmTextYaw.SetLabel("%0.1f RPM" % self.rpmYaw)
self.throttleText.SetLabel("%0.1f %%" % self.throttle)
self.uptimeText.SetLabel("%02i:%02i:%02i" % (self.uptime / (60 * 60), (self.uptime / 60) % 60, self.uptime % 60))
self.update_timer.Restart(UPDATE_INTERVAL)
def battery_notify(self):
if (self.batteryVolts < BATTERY_WARN and self.batteryVolts > 0):
self.notification = pynotify.Notification("Battery Warning",
"Paparazzi battery is at %s." % self.batteryText.GetLabel(),
"dialog-warning")
self.notification.show()
self.warn_timer.Restart(WARN_INTERVAL)
def setFont(self, control):
font = control.GetFont()
size = font.GetPointSize()
font.SetPointSize(size * 1.4)
control.SetFont(font)
def __init__(self):
wx.Frame.__init__(self, id=-1, parent=None, name=u'DashboardFrame',
size=wx.Size(WIDTH, HEIGHT), title=u'Dashboard')
self.aircrafts = {}
self.batteryText = wx.StaticText(self, -1, "V")
self.rpmTextYaw = wx.StaticText(self, -1, "RPM")
self.throttleText = wx.StaticText(self, -1, "%")
self.uptimeText = wx.StaticText(self, -1, "::")
self.setFont(self.batteryText)
self.setFont(self.rpmTextYaw)
self.setFont(self.throttleText)
self.setFont(self.uptimeText)
self.batteryVolts = -1
self.rpmYaw = -1
self.throttle = -1
self.uptime = 0
pynotify.init("Paparazzi Dashboard")
sizer = wx.BoxSizer(wx.VERTICAL)
sizer.Add(self.batteryText, 1, wx.EXPAND)
sizer.Add(self.rpmTextYaw, 1, wx.EXPAND)
sizer.Add(self.throttleText, 1, wx.EXPAND)
sizer.Add(self.uptimeText, 1, wx.EXPAND)
self.SetSizer(sizer)
sizer.Layout()
self.interface = messages_tool.IvyMessagesInterface(self.message_recv)
self.update_timer = wx.CallLater(UPDATE_INTERVAL, self.gui_update)
self.warn_timer = wx.CallLater(WARN_INTERVAL, self.battery_notify)
+26
View File
@@ -0,0 +1,26 @@
#!/usr/bin/env python
import wx
import getopt
import sys
import messagesframe
class MessagesApp(wx.App):
def OnInit(self):
self.main = messagesframe.MessagesFrame()
self.main.Show()
self.SetTopWindow(self.main)
#opts, args = getopt.getopt(sys.argv[1:], "p:",
#["plot"])
#for o,a in opts:
#if o in ("-p", "--plot"):
#[ac_id, message, field, color, use_x] = a.split(':')
#self.main.AddPlot(int(ac_id), message, field, color, bool(int(use_x)))
return True
def main():
application = MessagesApp(0)
application.MainLoop()
if __name__ == '__main__':
main()
@@ -0,0 +1,137 @@
import wx
import sys
import os
import time
import threading
import math
PPRZ_HOME = os.getenv("PAPARAZZI_HOME")
sys.path.append(PPRZ_HOME + "/sw/lib/python")
import messages_tool
WIDTH = 450
LABEL_WIDTH = 166
DATA_WIDTH = 100
HEIGHT = 800
BORDER = 1
class MessagesFrame(wx.Frame):
def message_recv(self, ac_id, name, values):
if self.aircrafts.has_key(ac_id):
if self.aircrafts[ac_id].messages.has_key(name):
if time.time() - self.aircrafts[ac_id].messages[name].last_seen < 0.2:
return
wx.CallAfter(self.gui_update, ac_id, name, values)
def find_page(self, book, name):
if book.GetPageCount() < 1:
return 0
start = 0
end = book.GetPageCount()
while (start < end):
if book.GetPageText(start) > name:
return start
start = start + 1
return start
def update_leds(self):
wx.CallAfter(self.update_leds_real)
def update_leds_real(self):
for ac_id in self.aircrafts:
aircraft = self.aircrafts[ac_id]
for msg_str in aircraft.messages:
message = aircraft.messages[msg_str]
if message.last_seen + 0.2 < time.time():
aircraft.messages_book.SetPageImage(message.index, 0)
self.timer = threading.Timer(0.1, self.update_leds)
self.timer.start()
def setup_image_list(self, notebook):
imageList = wx.ImageList(24,24)
image = wx.Image(PPRZ_HOME + "/data/pictures/gray_led24.png")
bitmap = wx.BitmapFromImage(image)
imageList.Add(bitmap)
image = wx.Image(PPRZ_HOME + "/data/pictures/green_led24.png")
bitmap = wx.BitmapFromImage(image)
imageList.Add(bitmap)
notebook.AssignImageList(imageList)
def add_new_aircraft(self, ac_id):
self.aircrafts[ac_id] = messages_tool.Aircraft(ac_id)
ac_panel = wx.Panel(self.notebook, -1)
self.notebook.AddPage(ac_panel, str(ac_id))
messages_book = wx.Notebook(ac_panel, style=wx.NB_LEFT)
self.setup_image_list(messages_book)
sizer = wx.BoxSizer(wx.VERTICAL)
sizer.Add(messages_book, 1, wx.EXPAND)
ac_panel.SetSizer(sizer)
sizer.Layout()
self.aircrafts[ac_id].messages_book = messages_book
def add_new_message(self, aircraft, name):
messages_book = aircraft.messages_book
aircraft.messages[name] = messages_tool.Message(name)
field_panel = wx.Panel(messages_book)
grid_sizer = wx.FlexGridSizer(len(aircraft.messages[name].field_names), 2)
index = self.find_page(messages_book, name)
messages_book.InsertPage(index, field_panel, name, imageId = 1)
aircraft.messages[name].index = index
# update indexes of pages which are to be moved
for message_name in aircraft.messages:
aircraft.messages[message_name].index = self.find_page(messages_book, message_name)
for field_name in aircraft.messages[name].field_names:
name_text = wx.StaticText(field_panel, -1, field_name)
size = name_text.GetSize()
size.x = LABEL_WIDTH
name_text.SetMinSize(size)
grid_sizer.Add(name_text, 1, wx.ALL, BORDER)
value_control = wx.StaticText(field_panel, -1, "42", style=wx.ST_NO_AUTORESIZE)
size = value_control.GetSize()
size.x = LABEL_WIDTH
value_control.SetMinSize(size)
grid_sizer.Add(value_control, 1, wx.ALL, BORDER)
aircraft.messages[name].field_controls.append(value_control)
field_panel.SetAutoLayout(True)
field_panel.SetSizer(grid_sizer)
field_panel.Layout()
def gui_update(self, ac_id, name, values):
if not self.aircrafts.has_key(ac_id):
self.add_new_aircraft(ac_id)
aircraft = self.aircrafts[ac_id]
if not aircraft.messages.has_key(name):
self.add_new_message(aircraft, name)
aircraft.messages_book.SetPageImage(aircraft.messages[name].index, 1)
self.aircrafts[ac_id].messages[name].last_seen = time.time()
for index in range(0, len(values)):
aircraft.messages[name].field_controls[index].SetLabel(values[index])
def __init__(self):
wx.Frame.__init__(self, id=-1, parent=None, name=u'MessagesFrame', size=wx.Size(WIDTH, HEIGHT), style=wx.DEFAULT_FRAME_STYLE, title=u'Messages')
self.notebook = wx.Notebook(self)
self.aircrafts = {}
sizer = wx.BoxSizer(wx.HORIZONTAL)
sizer.Add(self.notebook, 1, wx.EXPAND)
self.SetSizer(sizer)
sizer.Layout()
self.timer = threading.Timer(0.1, self.update_leds)
self.timer.start()
self.interface = messages_tool.IvyMessagesInterface(self.message_recv)
+106
View File
@@ -0,0 +1,106 @@
#!/usr/bin/env python
import wx
import getopt
import sys
import os
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
import messages_tool
class MessagePicker(wx.Frame):
def __init__(self, parent, callback, initIvy = True):
wx.Frame.__init__(self, parent, name="MessagePicker", title=u'Message Picker', size=wx.Size(320,640))
self.aircrafts = {}
self.callback = callback
self.tree = wx.TreeCtrl(self)
self.root = self.tree.AddRoot("Telemetry")
self.tree.Bind(wx.EVT_LEFT_DCLICK, self.OnDoubleClick)
self.tree.Bind(wx.EVT_CHAR, self.OnKeyChar)
self.Bind( wx.EVT_CLOSE, self.OnClose)
self.message_interface = messages_tool.IvyMessagesInterface(self.msg_recv, initIvy)
def OnClose(self, event):
self.message_interface.Stop()
self.Destroy()
def msg_recv(self, ac_id, name, values):
self.tree.Expand(self.root)
if not self.aircrafts.has_key(ac_id):
ac_node = self.tree.AppendItem(self.root, str(ac_id))
self.aircrafts[ac_id] = messages_tool.Aircraft(ac_id)
self.aircrafts[ac_id].messages_book = ac_node
aircraft = self.aircrafts[ac_id]
ac_node = aircraft.messages_book
if not aircraft.messages.has_key(name):
msg_node = self.tree.AppendItem(ac_node, str(name))
self.tree.SortChildren(ac_node)
aircraft.messages[name] = messages_tool.Message(name)
for field in aircraft.messages[name].field_names:
item = self.tree.AppendItem(msg_node, field)
def OnKeyChar(self, event):
if event.GetKeyCode() != 13:
return False
node = self.tree.GetSelection()
field_name = self.tree.GetItemText(node)
parent = self.tree.GetItemParent(node)
message_name = self.tree.GetItemText(parent)
grandparent = self.tree.GetItemParent(parent)
ac_id = self.tree.GetItemText(grandparent)
if node == self.root or parent == self.root or grandparent == self.root:
# if not leaf, double click = expand
if self.tree.IsExpanded(node):
self.tree.Collapse(node)
else:
self.tree.Expand(node)
return
self.callback(int(ac_id), message_name, field_name)
def OnDoubleClick(self, event):
node = self.tree.GetSelection()
field_name = self.tree.GetItemText(node)
parent = self.tree.GetItemParent(node)
message_name = self.tree.GetItemText(parent)
grandparent = self.tree.GetItemParent(parent)
ac_id = self.tree.GetItemText(grandparent)
if node == self.root or parent == self.root or grandparent == self.root:
# if not leaf, double click = expand
if self.tree.IsExpanded(node):
self.tree.Collapse(node)
else:
self.tree.Expand(node)
return
self.callback(int(ac_id), message_name, field_name)
class TestApp(wx.App):
def OnInit(self):
self.main = MessagePicker(None, callback)
self.main.Show()
self.SetTopWindow(self.main)
return True
def test():
application = TestApp(0)
application.MainLoop()
def callback(ac_id, message, field):
print ac_id, message, field
if __name__ == '__main__':
test()
@@ -0,0 +1,253 @@
#Boa:Frame:PlotFrame
import wx
import plotpanel
_INITIAL_TIME_VALUE_ = 0.2 # initial refresh rate in seconds
def create(parent):
return PlotFrame(parent)
[wxID_PLOTFRAME, wxID_PLOTFRAMECHECKAUTOSCALE, wxID_PLOTFRAMEEDITMAX, wxID_PLOTFRAMEEDITMIN, wxID_PLOTFRAMEEDITTIME, wxID_PLOTFRAMEPANEL1, wxID_PLOTFRAMESLIDERTIME, wxID_PLOTFRAMESTATICTEXT1, wxID_PLOTFRAMESTATICTEXT2, wxID_PLOTFRAMESTATICTEXT3] = [wx.NewId() for _init_ctrls in range(10)]
[wxID_PLOTFRAMEMENU1ITEM_ADD, wxID_PLOTFRAMEMENU1ITEM_PAUSE, wxID_PLOTFRAMEMENU1ITEM_RESET] = [wx.NewId() for _init_coll_menuPlot_Items in range(3)]
class PlotFrame(wx.Frame):
def _init_coll_boxSizer1_Items(self, parent):
# generated method, don't edit
parent.AddSizer(self.boxSizer2, 0, border=0, flag=0)
parent.AddWindow(self.panel1, 1, border=0, flag=wx.EXPAND)
def _init_coll_boxSizer2_Items(self, parent):
# generated method, don't edit
parent.AddWindow(self.checkAutoScale, 0, border=0, flag=wx.ALIGN_CENTER_VERTICAL)
parent.AddWindow(self.staticText1, 0, border=0, flag=wx.ALIGN_CENTER_VERTICAL)
parent.AddWindow(self.editMin, 0, border=0, flag=0)
parent.AddWindow(self.staticText2, 0, border=0, flag=wx.ALIGN_CENTER_VERTICAL)
parent.AddWindow(self.editMax, 0, border=0, flag=0)
parent.AddWindow(self.staticText3, 0, border=0, flag=wx.ALIGN_CENTER_VERTICAL)
parent.AddWindow(self.sliderTime, 0, border=0, flag=wx.ALIGN_CENTER_VERTICAL)
parent.AddWindow(self.editTime, 0, border=0, flag=0)
def _init_coll_menuBar1_Menus(self, parent):
# generated method, don't edit
parent.Append(menu=self.menuPlot, title=u'Plot')
parent.Append(menu=self.menuCurves, title=u'Curves')
def _init_coll_menuPlot_Items(self, parent):
# generated method, don't edit
parent.Append(help=u'Add plots', id=wxID_PLOTFRAMEMENU1ITEM_ADD, kind=wx.ITEM_NORMAL, text=u'&Add\tCtrl+A')
parent.Append(help=u'Reset plot scale', id=wxID_PLOTFRAMEMENU1ITEM_RESET, kind=wx.ITEM_NORMAL, text=u'&Reset\tCtrl+L')
parent.Append(help=u'Pause the plot', id=wxID_PLOTFRAMEMENU1ITEM_PAUSE, kind=wx.ITEM_CHECK, text=u'&Pause\tCtrl+P')
self.Bind(wx.EVT_MENU, self.OnMenu1Item_addMenu, id=wxID_PLOTFRAMEMENU1ITEM_ADD)
self.Bind(wx.EVT_MENU, self.OnMenu1Item_resetMenu, id=wxID_PLOTFRAMEMENU1ITEM_RESET)
self.Bind(wx.EVT_MENU, self.OnMenu1Item_pauseMenu, id=wxID_PLOTFRAMEMENU1ITEM_PAUSE)
def _init_sizers(self):
# generated method, don't edit
self.boxSizer1 = wx.BoxSizer(orient=wx.VERTICAL)
self.boxSizer2 = wx.BoxSizer(orient=wx.HORIZONTAL)
self._init_coll_boxSizer1_Items(self.boxSizer1)
self._init_coll_boxSizer2_Items(self.boxSizer2)
self.SetSizer(self.boxSizer1)
def _init_utils(self):
# generated method, don't edit
self.menuPlot = wx.Menu(title='')
self.menuCurves = wx.Menu(title='')
self.menuBar1 = wx.MenuBar()
self._init_coll_menuPlot_Items(self.menuPlot)
self._init_coll_menuBar1_Menus(self.menuBar1)
def _init_ctrls(self, prnt):
# generated method, don't edit
wx.Frame.__init__(self, id=wxID_PLOTFRAME, name=u'PlotFrame', parent=prnt, pos=wx.Point(476, 365), size=wx.Size(800, 225), style=wx.DEFAULT_FRAME_STYLE, title=u'Real Time Plot')
self._init_utils()
self.SetMenuBar(self.menuBar1)
self.SetClientSize(wx.Size(800, 225))
self.checkAutoScale = wx.CheckBox(id=wxID_PLOTFRAMECHECKAUTOSCALE, label=u'Auto scale', name=u'checkAutoScale', parent=self, pos=wx.Point(0, 2), size=wx.Size(93, 22), style=0)
self.checkAutoScale.SetValue(True)
self.checkAutoScale.Bind(wx.EVT_CHECKBOX, self.OnCheckAutoScaleCheckbox, id=wxID_PLOTFRAMECHECKAUTOSCALE)
self.staticText1 = wx.StaticText(id=wxID_PLOTFRAMESTATICTEXT1, label=u'min', name='staticText1', parent=self, pos=wx.Point(93, 5), size=wx.Size(68, 17), style=wx.ALIGN_RIGHT)
self.editMin = wx.TextCtrl(id=wxID_PLOTFRAMEEDITMIN, name=u'editMin', parent=self, pos=wx.Point(161, 0), size=wx.Size(80, 27), style=0, value=u'')
self.editMin.Enable(False)
self.editMin.Bind(wx.EVT_TEXT, self.OnEditMinText, id=wxID_PLOTFRAMEEDITMIN)
self.staticText2 = wx.StaticText(id=wxID_PLOTFRAMESTATICTEXT2, label=u'max', name='staticText2', parent=self, pos=wx.Point(241, 5), size=wx.Size(68, 17), style=wx.ALIGN_RIGHT)
self.editMax = wx.TextCtrl(id=wxID_PLOTFRAMEEDITMAX, name=u'editMax', parent=self, pos=wx.Point(309, 0), size=wx.Size(80, 27), style=0, value=u'')
self.editMax.Enable(False)
self.editMax.Bind(wx.EVT_TEXT, self.OnEditMaxText, id=wxID_PLOTFRAMEEDITMAX)
self.staticText3 = wx.StaticText(id=wxID_PLOTFRAMESTATICTEXT3, label=u'interval', name='staticText3', parent=self, pos=wx.Point(389, 5), size=wx.Size(68, 17), style=wx.ALIGN_RIGHT)
self.sliderTime = wx.Slider(id=wxID_PLOTFRAMESLIDERTIME, maxValue=1000, minValue=1, name=u'sliderTime', parent=self, pos=wx.Point(457, 4), size=wx.Size(200, 19), style=wx.SL_HORIZONTAL, value=_INITIAL_TIME_VALUE_ * 1000)
self.sliderTime.SetLabel(u'')
self.sliderTime.Bind(wx.EVT_COMMAND_SCROLL, self.OnSliderTimeCommandScroll, id=wxID_PLOTFRAMESLIDERTIME)
self.editTime = wx.TextCtrl(id=wxID_PLOTFRAMEEDITTIME, name=u'editTime', parent=self, pos=wx.Point(657, 0), size=wx.Size(80, 27), style=wx.TE_PROCESS_ENTER, value="%0.2f" % _INITIAL_TIME_VALUE_)
self.editTime.Bind(wx.EVT_TEXT_ENTER, self.OnEditTimeTextEnter, id=wxID_PLOTFRAMEEDITTIME)
self.panel1 = wx.Panel(id=wxID_PLOTFRAMEPANEL1, name='panel1', parent=self, pos=wx.Point(0, 27), size=wx.Size(800, 200), style=wx.TAB_TRAVERSAL)
self._init_sizers()
def __init__(self, parent):
self._init_ctrls(parent)
self.canvas = plotpanel.create(self.panel1, self)
self.dynamic_menus = {}
self.Bind( wx.EVT_CLOSE, self.OnClose)
self.Bind( wx.EVT_ERASE_BACKGROUND, self.OnErase)
self.panel1.Bind( wx.EVT_RIGHT_DOWN, self.OnRightDown)
self.panel1.Bind( wx.EVT_SIZE, self.OnSize)
def OnRightDown(self, event):
self.PopupMenu(self.menuPlot, event.GetPosition())
def AddPlot(self, ac_id, message, field, color = None, x_axis = False):
self.canvas.BindCurve(ac_id, message, field, color, x_axis)
def SetMinMax(self, min_, max_):
self.editMin.SetValue(str(min_))
self.editMax.SetValue(str(max_))
def OnClose(self, event):
# need to forward close to canvas so that ivy is shut down, otherwise ivy hangs the shutdown
self.canvas.OnClose()
self.Destroy()
def OnErase(self, event):
pass
def OnSize(self, event):
self.canvas.OnSize( event.GetSize())
def OnSliderTimeCommandScroll(self, event):
value = event.GetPosition()
self.canvas.SetPlotInterval(value)
self.editTime.SetValue( '%.3f' % (value/1000.0))
def OnEditTimeTextEnter(self, event):
try:
value = int(float(event.GetString()) * 1000.0)
except:
value = 0
if value < 1 or value > 1000:
value = '%.3f' % (self.sliderTime.GetValue() / 1000.0)
self.editTime.SetValue( value)
return
self.canvas.SetPlotInterval(value)
self.sliderTime.SetValue(value)
def OnCheckAutoScaleCheckbox(self, event):
value = self.checkAutoScale.GetValue()
self.editMin.Enable( not value)
self.editMax.Enable( not value)
self.canvas.SetAutoScale(value)
def OnMenu1Item_addMenu(self, event):
self.canvas.ShowMessagePicker(self)
def OnMenu1Item_resetMenu(self, event):
self.canvas.ResetScale()
def OnMenu1Item_pauseMenu(self, event):
self.canvas.Pause(event.IsChecked())
def AddCurve(self, menu_id, title, use_as_x = False):
curveMenu = wx.Menu(title='')
curveMenu.Append(help=u'Delete plot', id=menu_id*10, kind=wx.ITEM_NORMAL, text=u'&Delete')
curveMenu.Append(help=u'Offset plot', id=menu_id*10+1, kind=wx.ITEM_NORMAL, text=u'&Offset')
curveMenu.Append(help=u'Scale plot', id=menu_id*10+2, kind=wx.ITEM_NORMAL, text=u'&Scale')
curveMenu.Append(help=u'Plot data as messages are received rather than async', id=menu_id*10+3, kind=wx.ITEM_CHECK, text=u'&Real time plot')
curveMenu.Append(help=u'Use this curve as the X-axis rather than a time based scale', id=menu_id*10+4, kind=wx.ITEM_CHECK, text=u'&Use as X-axis')
curveMenu.Check(id=menu_id*10+4, check=bool(use_as_x))
self.Bind(wx.EVT_MENU, self.OnMenuDeleteCurve, id=menu_id*10)
self.Bind(wx.EVT_MENU, self.OnMenuOffsetCurve, id=menu_id*10+1)
self.Bind(wx.EVT_MENU, self.OnMenuScaleCurve, id=menu_id*10+2)
self.Bind(wx.EVT_MENU, self.OnMenuRealTime, id=menu_id*10+3)
self.Bind(wx.EVT_MENU, self.OnMenuUseAsXAxis, id=menu_id*10+4)
self.dynamic_menus[menu_id] = self.menuCurves.AppendSubMenu(submenu=curveMenu, text=title)
def OnMenuDeleteCurve(self, event):
menu_id = event.GetId() / 10
item = self.dynamic_menus[ menu_id]
self.canvas.RemovePlot( menu_id)
self.menuCurves.DestroyItem(item)
del self.dynamic_menus[menu_id]
def OnMenuOffsetCurve(self, event):
menu_id = (event.GetId()-1) / 10
default_value = str(self.canvas.FindPlot(menu_id).offset)
value = wx.GetTextFromUser("Enter a value to offset the plot", "Offset", default_value)
try:
value = float(value)
self.canvas.OffsetPlot( menu_id, value)
except:
pass
def OnMenuScaleCurve(self, event):
menu_id = (event.GetId()-2) / 10
default_value = str(self.canvas.FindPlot(menu_id).scale)
value = wx.GetTextFromUser("Enter a factor to scale the plot", "Scale", default_value)
try:
value = float(value)
self.canvas.ScalePlot( menu_id, value)
except:
pass
def OnMenuRealTime(self,event):
menu_id = (event.GetId()-3) / 10
self.canvas.SetRealTime( menu_id, event.IsChecked())
def OnMenuUseAsXAxis(self,event):
event_id = event.GetId()
menu_id = (event_id-4) / 10
value = event.IsChecked()
if value:
# go through and clear the checks from any other curves
for i in self.dynamic_menus:
for item in self.dynamic_menus[i].GetSubMenu().GetMenuItems():
if item.GetText() == u'_Use as X-axis' and event_id != item.GetId():
item.Check(False)
self.canvas.SetXAxis(menu_id)
else:
self.canvas.ClearXAxis()
def OnEditMinText(self, event):
try:
value = float(event.GetString())
self.canvas.SetMin(value)
except:
pass
def OnEditMaxText(self, event):
try:
value = float(event.GetString())
self.canvas.SetMax(value)
except:
pass
@@ -0,0 +1,438 @@
import wx
from ivy.std_api import *
import logging
from textdroptarget import *
import math
import random
import sys
import os
import messagepicker
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
import messages_xml_map
class plot_data:
def __init__(self, ivy_msg_id, title, width, color = None):
self.id = ivy_msg_id
self.title = title
self.SetPlotSize(width)
self.x_min = 1e32
self.x_max = 1e-32
self.avg = 0.0
self.std_dev = 0.0
self.real_time = False
self.scale = 1.0
self.offset = 0.0
if (color != None):
self.color = color
else:
r,g,b = random.randint(0,255),random.randint(0,255),random.randint(0,255)
self.color = wx.Color(r,g,b)
def SetRealTime(self, value):
self.real_time = value
def SetOffset(self, value):
self.offset = value
def SetScale(self, value):
self.scale = value
def SetPlotSize(self, size):
self.size = size
self.index = size-1 # holds the index of the next point to add and the first point to draw
self.data = [] # holds the list of points to plot
for i in range(size):
self.data.append(None)
self.avg = 0.0
self.std_dev = 0.0
def AddPoint(self, point, x_axis):
self.data[self.index] = point
if self.real_time or (x_axis != None):
self.index = (self.index + 1) % self.size # increment index to next point
self.data[self.index] = None
def DrawTitle(self, dc, margin, width, height):
text ='avg:%.2f std:%.2f %s' % (self.avg, self.std_dev, self.title)
(w,h) = dc.GetTextExtent(text)
dc.SetBrush(wx.Brush(self.color))
dc.DrawRectangle( width-h-margin, height, h, h)
dc.DrawText(text, width-2*margin-w-h, height)
return h
def DrawCurve(self, dc, width, height, margin, _max_, _min_, x_axis):
if width != self.size:
self.SetPlotSize(width)
return
if (not self.real_time) and (x_axis == None):
self.index = (self.index + 1) % self.size # increment index to next point
self.data[self.index] = None
if x_axis != None:
(x_min, x_max) = x_axis.GetXMinMax()
dc.SetPen(wx.Pen(self.color,1))
if _max_ < _min_: (_min_, _max_) = (-1,1) #prevent divide by zero or inversion
if _max_ == _min_: (_min_, _max_) = (_max_-0.5, _max_+0.5)
delta = _max_-_min_
dy = (height - margin*2) / delta
n = 0
sums = 0.0
sum_squares = 0.0
lines = []
point_1 = None
for i in range(self.size):
ix = (i+self.index) % self.size
point = self.data[ix]
if point == None: continue
n += 1
sums = sums + point
sum_squares = sum_squares + (point*point)
if x_axis != None:
x = x_axis.data[ix]
if x == None: continue
dx = (width-1) / (x_max-x_min)
x = int((x-x_min) * dx)
else:
x = i * width / self.size
scaled_point = (point + self.offset) * self.scale
y = height - margin - int((scaled_point - _min_)*dy)
if point_1 != None:
line = (point_1[0], point_1[1], x, y)
lines.append( line)
point_1 = (x,y)
dc.DrawLineList(lines)
if n > 0:
self.avg = sums / n
self.std_dev = math.sqrt(math.fabs((sum_squares / n) - (self.avg * self.avg)))
def GetXMinMax(self):
x_min = 1e32
x_max = -1e32
for i in range(self.size):
point = self.data[i]
if point == None: continue
x_min = min( x_min, point)
x_max = max( x_max, point)
if x_max < x_min: (x_min, x_max) = (-1,1) #prevent divide by zero or inversion
if x_max == x_min: (x_min, x_max) = (x_max-0.5, x_max+0.5)
self.x_max = x_max
self.x_min = x_min
return (x_min, x_max)
_IVY_APPNAME='JobyPlot'
_IVY_STRING = '(%s %s .*$)'
#_IVY_STRING = '^([^ ]*) +(%s( .*|$))' ## <-- from original ocaml (doesn't work here, just returns Sender field...)
def create(parent, frame):
return PlotPanel(parent, frame)
class PlotPanel():
def __init__(self, parent, frame):
self.parent = parent # we are drawing on our parent, so dc comes from this
self.frame = frame # the frame owns any controls we might need to update
parent.SetDropTarget( TextDropTarget(self)) # calls self.OnDropText when drag and drop complete
self.InitIvy()
self.width = 800
self.height = 200
self.margin = min(self.height / 10, 20)
self.font = wx.Font(self.margin/2, wx.DEFAULT, wx.FONTFLAG_DEFAULT, wx.FONTWEIGHT_NORMAL)
self.pixmap = wx.EmptyBitmap(self.width, self.height)
self.plot_size = self.width
self.max = -1e32
self.min = 1e32
self.plot_interval = 200
self.plots = {}
self.auto_scale = True
self.offset = 0.0
self.scale = 1.0
self.x_axis = None
messages_xml_map.ParseMessages()
# start the timer
self.timer = wx.FutureCall( self.plot_interval, self.OnTimer)
def SetPlotInterval(self, value):
self.plot_interval = value
self.timer.Restart(self.plot_interval)
self.timer = wx.FutureCall( self.plot_interval, self.OnTimer)
def SetAutoScale(self, value):
self.auto_scale = value
def SetMin(self, value):
self.min = value
def SetMax(self, value):
self.max = value
def Pause(self, pause):
if pause:
self.timer.Stop()
else:
self.timer = wx.FutureCall( self.plot_interval, self.OnTimer)
def ResetScale(self):
self.max = -1e32
self.min = 1e32
def OnClose(self):
self.timer.Stop()
IvyStop()
def OnErase(self, event):
pass
def ShowMessagePicker(self, parent):
frame = messagepicker.MessagePicker(parent, self.BindCurve, False)
frame.Show()
def InitIvy(self):
# initialising the bus
IvyInit(_IVY_APPNAME, # application name for Ivy
"",#"[%s is ready]" % IVYAPPNAME, # ready message
0, # main loop is local (ie. using IvyMainloop)
lambda x,y: y, # handler called on connection/deconnection
lambda x,y: y # handler called when a diemessage is received
)
# starting the bus
# Note: env variable IVYBUS will be used if no parameter or empty string
# is given ; this is performed by IvyStart (C)
try:
logging.getLogger('Ivy').setLevel(logging.WARN)
IvyStart("")
# binding to every message
#IvyBindMsg(self.OnIvyMsg, "(.*)")
except:
IvyStop()
def OnDropText(self, data):
[ac_id, category, message, field] = data.encode('ASCII').split(':')
self.BindCurve(int(ac_id), message, field)
def OnIvyMsg(self, agent, *larg):
#print larg[0]
data = larg[0].split(' ')
ac_id = int(data[0])
message = data[1]
if not self.plots.has_key(ac_id):
return
if not self.plots[ac_id].has_key(message):
return
for field in self.plots[ac_id][message]:
plot = self.plots[ac_id][message][field]
ix = messages_xml_map.message_dictionary[message].index(field)
point = float(data[ix+2])
if self.x_axis == None or self.x_axis.id != plot.id:
if self.auto_scale:
scaled_point = (point + plot.offset) * plot.scale
self.max = max( self.max, scaled_point)
self.min = min( self.min, scaled_point)
if self.x_axis != None:
plot.index = self.x_axis.index
plot.AddPoint(point, self.x_axis)
def BindCurve(self, ac_id, message, field, color = None, use_as_x = False):
# -- add this telemetry to our list of things to plot ...
message_string = _IVY_STRING % (ac_id, message)
#print 'Binding to %s' % message_string
if not self.plots.has_key(ac_id):
self.plots[ac_id] = {}
if not self.plots[ac_id].has_key(message):
self.plots[ac_id][message] = {}
if self.plots[ac_id][message].has_key(field):
self.plots[ac_id][message][field].color = wx.Color(random.randint(0,255),random.randint(0,255),random.randint(0,255))
return
ivy_id = IvyBindMsg(self.OnIvyMsg, str(message_string))
title = '%i:%s:%s' % (ac_id, message, field)
self.plots[ac_id][message][field] = plot_data( ivy_id, title, self.plot_size, color)
self.frame.AddCurve( ivy_id, title, use_as_x)
if (use_as_x):
self.x_axis = self.plots[ac_id][message][field]
def CalcMinMax(self, plot):
if not self.auto_scale: return
for x in plot.data:
self.max = max(self.max, x)
self.min = min(self.min, x)
self.frame.SetMinMax(self.min, self.max)
def FindPlotName(self, ivy_id):
for ac_id in self.plots:
for msg in self.plots[ac_id]:
for field in self.plots[ac_id][msg]:
if self.plots[ac_id][msg][field].id == ivy_id:
return (ac_id, msg, field)
return (None, None, None)
def FindPlot(self, ivy_id):
(ac_id, msg, field) = self.FindPlotName( ivy_id)
if (ac_id == None):
return None
return self.plots[ac_id][msg][field]
def RemovePlot(self, ivy_id):
(ac_id, msg, field) = self.FindPlotName( ivy_id)
if ac_id == None: return
if (self.x_axis != None) and (self.x_axis.id == ivy_id):
self.x_axis = None
IvyUnBindMsg( ivy_id)
del self.plots[ac_id][msg][field]
if len(self.plots[ac_id][msg]) == 0:
del self.plots[ac_id][msg]
def OffsetPlot(self, ivy_id, offset):
plot = self.FindPlot( ivy_id)
if plot == None: return
plot.SetOffset(offset)
print 'panel value: %.2f' % value
CalcMinMax(plot)
def ScalePlot(self, ivy_id, offset):
plot = self.FindPlot( ivy_id)
if plot == None: return
plot.SetScale(offset)
CalcMinMax(plot)
def SetRealTime(self, ivy_id, value):
plot = self.FindPlot( ivy_id)
if plot == None: return
plot.SetRealTime(value)
def SetXAxis(self, ivy_id):
plot = self.FindPlot( ivy_id)
if plot == None: return
self.x_axis = plot
def ClearXAxis(self):
self.x_axis = None
def OnSize(self, size):
(width, height) = size
if( self.width == width and self.height == height):
return
self.pixmap = wx.EmptyBitmap(width, height)
self.width = width
self.height = height
self.plot_size = width
self.margin = min(self.height / 10, 20)
self.font = wx.Font(self.margin/2, wx.DEFAULT, wx.FONTFLAG_DEFAULT, wx.FONTWEIGHT_NORMAL)
def OnTimer(self):
self.timer.Restart(self.plot_interval)
self.frame.SetMinMax(self.min, self.max)
self.DrawFrame()
def DrawFrame(self):
dc = wx.ClientDC(self.parent)
bdc = wx.BufferedDC( dc, self.pixmap)
bdc.SetBackground( wx.Brush( "White"))
bdc.Clear()
self.DrawBackground(bdc, self.width, self.height)
title_y = 2
for ac_id in self.plots:
for message in self.plots[ac_id]:
for field in self.plots[ac_id][message]:
plot = self.plots[ac_id][message][field]
if (self.x_axis != None) and (self.x_axis.id == plot.id): continue
title_height = plot.DrawTitle(bdc, 2, self.width, title_y)
plot.DrawCurve(bdc, self.width, self.height, self.margin, self.max, self.min, self.x_axis)
title_y += title_height + 2
def DrawBackground(self, dc, width, height):
# Time Graduations
dc.SetFont(self.font)
if self.x_axis == None:
t = self.plot_interval * width
t1 = "0.0s"
t2 = "-%.1fs" % (t/2000.0)
t3 = "-%.1fs" % (t/1000.0)
else:
x_max = self.x_axis.x_max
x_min = self.x_axis.x_min
t1 = "%.2f" % x_max
t2 = "%.2f" % (x_min + (x_max-x_min)/2.0)
t3 = "%.2f" % x_min
(w,h) = dc.GetTextExtent(t1)
dc.DrawText(t1, width-w, height-h)
#(w,h) = dc.GetTextExtent(t2) #save time since h will be the same
dc.DrawText(t2, width/2, height-h)
#(w,h) = dc.GetTextExtent(t3) #save time since h will be the same
dc.DrawText(t3, 0, height-h)
# Y graduations
if self.max == -1e32: return
(_min_, _max_) = (self.min, self.max)
if _max_ < _min_: #prevent divide by zero or inversion
(_min_, _max_) = (-1, 1)
if _max_ == _min_:
(_min_, _max_) = (_max_-0.5, _max_+0.5)
delta = _max_-_min_
dy = (height - self.margin*2) / delta
scale = math.log10( delta)
d = math.pow(10.0, math.floor(scale))
u = d
if delta < 2*d: u=d/5
elif delta < 5*d: u=d/2
tick_min =_min_ - math.fmod(_min_, u)
for i in range( int(delta/u) + 1):
tick = tick_min + float(i)*u
s = str(tick)
(w,h) = dc.GetTextExtent(s)
y = height-self.margin-int((tick-_min_)*dy)-h/2
dc.DrawText(s, 0, y)
+33
View File
@@ -0,0 +1,33 @@
#!/usr/bin/env python
#Boa:App:BoaApp
import wx
import getopt
import sys
import plotframe
modules ={u'PlotFrame': [1, 'Main frame of Application', u'plotframe.py'],
u'messages_xml_map': [0, '', u'messages_xml_map.py'],
u'plotpanel': [0, '', u'plotpanel.py'],
u'realtimeplotapp': [0, '', u'realtimeplotapp.py'],
u'textdroptarget': [0, '', u'textdroptarget.py']}
class RealTimePlotApp(wx.App):
def OnInit(self):
self.main = plotframe.create(None)
self.main.Show()
self.SetTopWindow(self.main)
opts, args = getopt.getopt(sys.argv[1:], "p:",
["plot"])
for o,a in opts:
if o in ("-p", "--plot"):
[ac_id, message, field, color, use_x] = a.split(':')
self.main.AddPlot(int(ac_id), message, field, color, bool(int(use_x)))
return True
def main():
application = RealTimePlotApp(0)
application.MainLoop()
if __name__ == '__main__':
main()
@@ -0,0 +1,13 @@
import wx
class TextDropTarget(wx.TextDropTarget):
""" This object implements Drop Target functionality for Text """
def __init__(self, reference):
""" Initialize the Drop Target, passing in the Object Reference to
indicate what should receive the dropped text """
wx.TextDropTarget.__init__(self)
self.reference = reference
def OnDropText(self, x, y, data):
""" When text is dropped, send it to the object specified """
self.reference.OnDropText(data)
+58
View File
@@ -0,0 +1,58 @@
#!/usr/bin/env python
#Boa:App:BoaApp
import wx
import os
import settingsframe
import getopt
import sys
sys.path.append(os.getenv("JOBY_PYTHON") + "/joby_lib")
import settings_xml_parse
DEFAULT_AC_IDS = [ 11 ] # biwhirly
def Usage(scmd):
lpathitem = scmd.split('/')
fmt = '''Usage: %s [-h | --help] [-a AC_ID | --ac_id=AC_ID]
where
\t-h | --help print this message
\t-a AC_ID | --ac_id=AC_ID where AC_ID is an aircraft ID to use for settings (multiple IDs may passed)
'''
print fmt % lpathitem[-1]
def GetOptions():
options = {'ac_id':[]}
try:
optlist, left_args = getopt.getopt(sys.argv[1:],'h:a:', ['help', 'ac_id='])
except getopt.GetoptError:
# print help information and exit:
Usage(sys.argv[0])
sys.exit(2)
for o, a in optlist:
if o in ("-h", "--help"):
Usage(sys.argv[0])
sys.exit()
elif o in ("-a", "--ac_id"):
options['ac_id'].append(int(a))
return options
class SettingsApp(wx.App):
def OnInit(self):
options = GetOptions()
if len(options['ac_id']) > 0:
ac_ids = options['ac_id']
else:
ac_ids = DEFAULT_AC_IDS
self.main = settingsframe.create(None, ac_ids)
self.main.Show()
self.SetTopWindow(self.main)
return True
def main():
application = SettingsApp(0)
application.MainLoop()
if __name__ == '__main__':
main()
@@ -0,0 +1,159 @@
#Boa:Frame:PlotFrame
import wx
import sys
import os
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
from settings_tool import IvySettingsInterface
def create(parent, ac_ids):
return SettingsFrame(parent, ac_ids)
SLIDER_ID_OFFSET = 250000
BUTTON_ID_OFFSET = 2 * 250000
SLIDER_FACTOR = 100
class SettingsFrame(wx.Frame):
edits = []
sliders = []
buttons = []
def __init__(self, parent, ac_ids):
self.settings = IvySettingsInterface(ac_ids)
title = "Settings %s (%s)" % (ac_ids, self.settings.GetACName())
wx.Frame.__init__(self, name=u'SettingsFrame', parent=parent, title=title, size=(480, 320))
self.book = wx.Notebook(self)
self.updates = []
self.Bind( wx.EVT_CLOSE, self.OnClose)
for setting_group in self.settings.groups:
page = wx.Panel(self.book)
vert_box = wx.BoxSizer(orient=wx.VERTICAL)
for setting in setting_group.member_list:
horz_box = wx.BoxSizer(orient=wx.HORIZONTAL)
text = wx.StaticText(page, label=setting.shortname, size=(100,30))
edit = wx.TextCtrl(page, name=setting.shortname, id=setting.index)
self.setBold(edit, True)
edit.SetValue(str(setting.value))
max_v = int(setting.max_value) * SLIDER_FACTOR
min_v = int(setting.min_value) * SLIDER_FACTOR
if (min_v >= max_v):
max_v = max_v + 1
slider = wx.Slider(page, minValue=min_v, maxValue=max_v, style=wx.SL_HORIZONTAL | wx.SL_AUTOTICKS, size=(200, 30), id=setting.index + SLIDER_ID_OFFSET)
slider.SetLineSize(setting.step * SLIDER_FACTOR)
button = wx.Button(page, id=setting.index + BUTTON_ID_OFFSET, label="Apply")
self.Bind(wx.EVT_SLIDER, self.sliderUpdate, slider)
slider.Bind(wx.EVT_MOUSEWHEEL, self.sliderWheel, slider)
self.Bind(wx.EVT_TEXT, self.editUpdate, edit)
self.Bind(wx.EVT_BUTTON, self.onButton)
self.edits.append(edit)
self.sliders.append(slider)
self.buttons.append(button)
horz_box.AddWindow(text)
horz_box.AddWindow(edit)
horz_box.AddWindow(slider)
horz_box.AddWindow(button)
vert_box.AddWindow(horz_box)
page.SetSizer(vert_box)
self.book.AddPage(page, setting_group.name)
self.settings.RegisterCallback(self.onUpdate)
# Called on mouse wheel events (default handler seems backwards?)
def sliderWheel(self, event):
slider = event.GetEventObject()
if (event.GetWheelRotation() > 0):
slider.SetValue(slider.GetValue() + slider.GetLineSize())
else:
slider.SetValue(slider.GetValue() - slider.GetLineSize())
self.updateEditFromSlider(slider)
# Called on slider update
def sliderUpdate(self, event):
slider = event.GetEventObject()
self.updateEditFromSlider(slider)
# Copy slider value into associated edit box
def updateEditFromSlider(self, slider):
index = int(slider.GetId())
if index >= SLIDER_ID_OFFSET:
index = index - SLIDER_ID_OFFSET
if (self.settings.lookup[index].step < 1):
value = float(slider.GetValue()) / SLIDER_FACTOR
else:
value = int(slider.GetValue()) / SLIDER_FACTOR
self.edits[index].ChangeValue(str(value))
self.setBold(self.edits[index], True)
# Called on edit box update
def editUpdate(self, event):
edit = event.GetEventObject()
index = int(edit.GetId())
try:
value = float(edit.GetValue())
except:
return
self.sliders[index].SetValue(value * SLIDER_FACTOR)
self.setBold(self.edits[index], True)
# Called on button push
def onButton(self, event):
button = event.GetEventObject()
index = int(button.GetId())
if index >= BUTTON_ID_OFFSET:
index = index - BUTTON_ID_OFFSET
self.settings.lookup[index].value = float(self.sliders[index].GetValue()) / SLIDER_FACTOR
self.settings.SendSetting(index)
def SetAndSendValue(self, index, value):
self.edits[index].SetValue("%.2f" % value)
button = self.buttons[index]
click_event = wx.CommandEvent(wx.wxEVT_COMMAND_BUTTON_CLICKED, button.GetId())
click_event.SetEventObject(button)
button.Command(click_event)
# Called for remote settings updates
def onUpdate(self, index, value, fromRemote):
# Schedule the call for later via wx (run after events)
# to prevent crashy crashy
wx.CallAfter(self.update_value, index, value, fromRemote)
# helper function to toggle edit box boldness (bold = user-set, normal=downlink-received)
def setBold(self, editCtrl, bold):
font = editCtrl.GetFont()
if (bold):
font.SetWeight(wx.FONTWEIGHT_BOLD)
else:
font.SetWeight(wx.FONTWEIGHT_NORMAL)
editCtrl.SetFont(font)
# Called to update GUI with new values
def update_value(self, index, value, fromRemote):
editCtrl = self.edits[index]
slider = self.sliders[index]
if fromRemote and editCtrl.FindFocus() == editCtrl:
# don't process remote updates if the control is focused
return
if (self.settings.lookup[index].step < 1):
editCtrl.SetValue("%.2f" % float(value))
else:
editCtrl.SetValue("%i" % int(float(value)))
self.setBold(editCtrl, not fromRemote)
slider.SetValue(int(float(value) * SLIDER_FACTOR))
def OnClose(self, event):
# need to forward close to canvas so that ivy is shut down, otherwise ivy hangs the shutdown
self.settings.OnClose()
self.Destroy()
+166
View File
@@ -0,0 +1,166 @@
#!/usr/bin/env python
from ivy.std_api import *
import socket
import struct
import os
import logging
import sys
import threading
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
import messages_xml_map
class IvyUdpLink():
def __init__(self):
self.InitIvy()
self.status_timer = threading.Timer(1.0, self.sendStatus)
self.run_time = 0
self.rx_bytes = 0
self.rx_msgs = 0
self.last_rx_bytes = 0
self.last_rx_msgs = 0
self.rx_err = 0
messages_xml_map.ParseMessages()
def ProcessMessage(self, message_values, fromRemote):
# Extract aircraft id from message and ignore if not matching
msg_ac_id = int(message_values[0])
if (msg_ac_id != self.ac_ids[0]):
return
# Extract setting value
setting_index = int(message_values[1])
setting_value = message_values[2]
# Called for DL_VALUE (from aircraft)
def OnValueMsg(self, agent, *larg):
# Extract field values
message_values = larg[0].split(' ')
message_values = message_values[0:1] + message_values[2:]
self.ProcessMessage(message_values, True)
def InitIvy(self):
# initialising the bus
IvyInit("UdpLink", # application name for Ivy
"READY", # ready message
0, # main loop is local (ie. using IvyMainloop)
lambda x,y: y, # handler called on connection/deconnection
lambda x,y: y # handler called when a diemessage is received
)
# starting the bus
logging.getLogger('Ivy').setLevel(logging.WARN)
IvyStart("")
#IvyBindMsg(self.OnValueMsg, "(^.* DL_VALUE .*)")
def sendStatus(self):
self.run_time = self.run_time + 1
IvySendMsg("11 DOWNLINK_STATUS %i %i %i %i %i %i %i" % (
self.run_time,
self.rx_bytes - self.last_rx_bytes,
self.rx_msgs - self.last_rx_msgs,
self.rx_err,
self.rx_bytes,
self.rx_msgs,
0 ))
self.last_rx_bytes = self.rx_bytes
self.last_rx_msgs = self.rx_msgs
self.status_timer = threading.Timer(1.0, self.sendStatus)
self.status_timer.start()
def ProcessPacket(self, msg):
if len(msg) < 4:
self.rx_err = self.rx_err + 1
return
msg_offset = 0
start_byte = ord(msg[msg_offset])
if start_byte != 0x99:
self.rx_err = self.rx_err + 1
return
msg_offset = msg_offset + 1
ac_id = ord(msg[msg_offset])
msg_offset = msg_offset + 1
while msg_offset < len(msg):
msg_id = ord(msg[msg_offset])
msg_offset = msg_offset + 1
start_byte2 = ord(msg[msg_offset])
if start_byte2 != 0x66:
self.rx_err = self.rx_err + 1
return
msg_offset = msg_offset + 1
msg_name = messages_xml_map.message_dictionary_id_name[msg_id]
msg_fields = messages_xml_map.message_dictionary_types[msg_id]
ivy_msg = "%i %s" % (ac_id, msg_name)
for field in msg_fields:
if field == "float":
value = struct.unpack('f', str(msg[msg_offset:msg_offset + 4]))[0]
ivy_msg = "%s %f" % (ivy_msg, value)
msg_offset = msg_offset + 4
elif field == "uint8":
value = struct.unpack('B', str(msg[msg_offset:msg_offset + 1]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 1
elif field == "uint16":
value = struct.unpack('H', str(msg[msg_offset:msg_offset + 2]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 2
elif field == "uint32":
value = struct.unpack('L', str(msg[msg_offset:msg_offset + 4]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 4
elif field == "int8":
value = struct.unpack('b', str(msg[msg_offset:msg_offset + 1]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 1
elif field == "int16":
value = struct.unpack('h', str(msg[msg_offset:msg_offset + 2]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 2
elif field == "int32":
value = struct.unpack('l', str(msg[msg_offset:msg_offset + 4]))[0]
ivy_msg = "%s %i" % (ivy_msg, value)
msg_offset = msg_offset + 4
elif field == "uint8[]":
value = struct.unpack('B', str(msg[msg_offset:msg_offset + 1]))[0]
msg_offset = msg_offset + 1
for count in range(0, value):
array_value = struct.unpack('B', str(msg[msg_offset:msg_offset + 1]))[0]
msg_offset = msg_offset + 1
ivy_msg = "%s,%i" % (ivy_msg, array_value)
else:
print "Unknown field type %s" % field
self.rx_msgs = self.rx_msgs + 1
self.rx_bytes = self.rx_bytes + len(msg)
IvySendMsg(ivy_msg)
#print ivy_msg
def Run(self):
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
msg_count = 0
server.bind(('0.0.0.0' , 51914))
self.status_timer.start()
while True:
msg = server.recv(2048)
msg_count = msg_count + 1
self.ProcessPacket(msg)
#if (msg_count % 120 == 0):
# print msg_count
def main():
udp_interface = IvyUdpLink()
udp_interface.Run()
if __name__ == '__main__':
main()
+63
View File
@@ -0,0 +1,63 @@
import messages_xml_map
from ivy.std_api import *
import logging
import time
import os
class Message:
def __init__(self, name):
messages_xml_map.ParseMessages()
self.field_value = []
self.field_names = messages_xml_map.message_dictionary[name]
self.field_controls = []
self.index = None
self.last_seen = time.clock()
self.name = name
class Aircraft:
def __init__(self, id):
self.ac_id = id
self.messages = {}
self.messages_book = None
class IvyMessagesInterface():
def __init__(self, callback, initIvy = True):
self.callback = callback
self.ivy_id = 0
self.InitIvy(initIvy)
def Stop(self):
IvyUnBindMsg(self.ivy_id)
def __del__(self):
try:
IvyUnBindMsg(self.ivy_id)
except:
pass
def InitIvy(self, initIvy):
if initIvy:
IvyInit("Messages %i" % os.getpid(), "READY", 0, lambda x,y: y, lambda x,y: y)
logging.getLogger('Ivy').setLevel(logging.WARN)
IvyStart("")
self.ivy_id = IvyBindMsg(self.OnIvyMsg, "(.*)")
def OnIvyMsg(self, agent, *larg):
data = larg[0].split(' ')
try:
ac_id = int(data[0])
name = data[1]
values = data[2:]
self.callback(ac_id, name, values)
except ValueError:
pass
except:
raise
def test():
message = Message("WHIRLY")
print message
print message.field_names
if __name__ == '__main__':
test()
+76
View File
@@ -0,0 +1,76 @@
#!/usr/bin/env python
import os
import sys
import getopt
messages_path = '%s/conf/messages.xml' % os.getenv("PAPARAZZI_HOME")
message_dictionary = {}
message_dictionary_types = {}
message_dictionary_id_name = {}
def Usage(scmd):
lpathitem = scmd.split('/')
fmt = '''Usage: %s [-h | --help] [-f FILE | --file=FILE]
where
\t-h | --help print this message
\t-f FILE | --file=FILE where FILE is path to messages.xml
'''
print fmt % lpathitem[-1]
def GetOptions():
try:
optlist, left_args = getopt.getopt(sys.argv[1:],'hf:', ['help','file='])
except getopt.GetoptError:
# print help information and exit:
Usage(sys.argv[0])
sys.exit(2)
for o, a in optlist:
if o in ("-h", "--help"):
Usage(sys.argv[0])
sys.exit()
elif o in ("-f", "--file"):
messages_path = a
def ParseMessages():
from lxml import etree
tree = etree.parse( messages_path)
for the_message in tree.xpath("//class[@name='telemetry']/message[@name]"):
message_name = the_message.attrib['name']
if the_message.attrib.has_key('id'):
message_id = the_message.attrib['id']
else:
message_id = the_message.attrib['ID']
if (message_id[0:2] == "0x"):
message_id = int(message_id, 16)
else:
message_id = int(message_id)
message_dictionary_id_name[message_id] = message_name
# insert this message into our dictionary as a list with room for the fields
message_dictionary[message_name] = []
message_dictionary_types[message_id] = []
for the_field in the_message.xpath('field[@name]'):
# for now, just save the field names -- in the future maybe expand this to save a struct?
message_dictionary[message_name].append( the_field.attrib['name'])
message_dictionary_types[message_id].append( the_field.attrib['type'])
def test():
GetOptions()
ParseMessages()
print message_dictionary['WHIRLY']
print message_dictionary['WHIRLY'].index('plane2_pitch')
print message_dictionary['ACTUATORS']
print message_dictionary['ACTUATORS'].index('actuator0')
print message_dictionary_types[53]
print message_dictionary_id_name[53]
if __name__ == '__main__':
test()
+84
View File
@@ -0,0 +1,84 @@
#!/usr/bin/env python
from ivy.std_api import *
import os
import logging
import sys
sys.path.append(os.getenv("PAPARAZZI_HOME") + "/sw/lib/python")
from settings_xml_parse import PaparazziACSettings
_SHOW_IVY_MSGS_ = False
class IvySettingsInterface(PaparazziACSettings):
def __init__(self, ac_ids):
PaparazziACSettings.__init__(self, ac_ids[0])
self.update_callback = None
self.InitIvy()
self.ac_ids = ac_ids
def ProcessMessage(self, message_values, fromRemote):
# Extract aircraft id from message and ignore if not matching
msg_ac_id = int(message_values[0])
if (msg_ac_id != self.ac_ids[0]):
return
# Extract setting value
setting_index = int(message_values[1])
setting_value = message_values[2]
# Store value from message
self.lookup[setting_index].value = setting_value
# Callback (if present)
if self.update_callback != None:
self.update_callback(setting_index, setting_value, fromRemote)
if _SHOW_IVY_MSGS_:
print "index: %s value %s " % (setting_index, setting_value)
# Called for DL_VALUE (from aircraft)
def OnValueMsg(self, agent, *larg):
# Extract field values
message_values = larg[0].split(' ')
message_values = message_values[0:1] + message_values[2:]
self.ProcessMessage(message_values, True)
# Called for DL_SETTING (from ground)
def OnSettingMsg(self, agent, *larg):
# Extract field values
message_values = larg[0].split(' ')
self.ProcessMessage(message_values, False)
def RegisterCallback(self, callback_function):
self.update_callback = callback_function
def InitIvy(self):
# initialising the bus
IvyInit("settings_app", # application name for Ivy
"", # ready message
0, # main loop is local (ie. using IvyMainloop)
lambda x,y: y, # handler called on connection/deconnection
lambda x,y: y # handler called when a diemessage is received
)
# starting the bus
logging.getLogger('Ivy').setLevel(logging.WARN)
IvyStart("")
IvyBindMsg(self.OnValueMsg, "(^.* DL_VALUE .*)")
IvyBindMsg(self.OnSettingMsg, "dl DL_SETTING (.*)")
def SendSetting(self, setting_index):
for ac_id in self.ac_ids:
IvySendMsg("dl DL_SETTING %s %s %s" % (ac_id, setting_index, self.lookup[setting_index].value))
def OnClose(self):
IvyStop()
def main():
ac_id = [ 11 ]
ivy_interface = IvySettingsInterface(ac_id)
if __name__ == '__main__':
main()
+93
View File
@@ -0,0 +1,93 @@
#!/usr/bin/env python
import os
import sys
from lxml import etree
# Class for all settings
class PaparazziACSettings:
"Paparazzi Settings Class"
ac_id = 0
groups = []
lookup = []
name_lookup = {}
# Takes a string file path for settings XML file and
# returns a settings AC object
def __init__(self, ac_id):
self.ac_id = ac_id
paparazzi_home = os.getenv("PAPARAZZI_HOME")
conf_xml_path = "%s/conf/conf.xml" % paparazzi_home
conf_tree = etree.parse(conf_xml_path)
# extract aircraft node from conf.xml file
ac_node = conf_tree.xpath('/conf/aircraft[@ac_id=%i]' % ac_id)
if (len(ac_node) != 1):
print "Aircraft ID %i not found." % ac_id
# get settings file path from aircraft xml node
settings_xml_path = "%s/conf/%s" % (paparazzi_home, ac_node[0].attrib['settings'])
# save AC name for reference
self.name = ac_node[0].attrib['name']
tree = etree.parse(settings_xml_path)
index = 0 # keep track of index/id of setting starting at 0
for the_tab in tree.xpath("//dl_settings[@NAME]"):
setting_group = PaparazziSettingsGroup(the_tab.attrib['NAME'])
for the_setting in the_tab.xpath('dl_setting[@VAR]'):
if the_setting.attrib.has_key('shortname'):
name = the_setting.attrib['shortname']
else:
name = the_setting.attrib['VAR']
settings = PaparazziSetting(name)
settings.index = index
settings.min_value = float(the_setting.attrib['MIN'])
settings.max_value = float(the_setting.attrib['MAX'])
settings.step = float(the_setting.attrib['STEP'])
setting_group.member_list.append(settings)
self.lookup.append(settings)
self.name_lookup[name] = settings
index = index + 1
self.groups.append(setting_group)
def GetACName(self):
return self.name
# Class for named group of settings
class PaparazziSettingsGroup:
"Paparazzi Setting Group Class"
name = 0
member_list = []
def __init__(self, name):
self.name = name
self.member_list = []
# Class for a single paparazzi setting
class PaparazziSetting:
"Paparazzi Setting Class"
shortname = ""
min_value = 0
max_value = 1
step = 1
index = 0
value = None
def __init__(self, shortname):
self.shortname = shortname
def test():
ac_id = 11
ac_settings = PaparazziACSettings(ac_id)
for setting_group in ac_settings.groups:
print setting_group.name
for setting in setting_group.member_list:
print " " + setting.shortname
if __name__ == '__main__':
test()