[natnet] add a NatNet3 to IVY bridge in Python (#2198)

The new version of the Optitrack software (Motive 2.0) comes with NatNet
version 3. A Python module is provided with NatNet for
serialization/deserialization of the data streamed over the network.
This module is slightly modified here for a better handling of the
initialization and for stopping the threads.
The PPRZLINK message sent is REMOTE_GPS_LOCAL, so the transformation to
global coordinate system is left to the airborne part.
This commit is contained in:
Gautier Hattenberger
2017-12-07 16:53:32 +01:00
committed by GitHub
parent 555eed9f75
commit ef144a21e7
4 changed files with 693 additions and 2 deletions
+1
View File
@@ -47,6 +47,7 @@
<program name="Attitude Visualizer" command="sw/tools/attitude_viz.py"/>
<program name="App Server" command="sw/ground_segment/tmtc/app_server"/>
<program name="NatNet" command="sw/ground_segment/misc/natnet2ivy"/>
<program name="NatNet3" command="sw/ground_segment/python/natnet3.x/natnet2ivy.py"/>
<program name="Ivy2Nmea" command="sw/ground_segment/tmtc/ivy2nmea">
<arg flag="--port" constant="/dev/ttyUSB1"/>
<arg flag="--id" constant="1"/>
+5 -2
View File
@@ -723,8 +723,11 @@ static gboolean sample_data(GIOChannel *chan, GIOCondition cond, gpointer data)
bytes_data += udp_socket_recv(&natnet_data, buffer_data, MAX_PACKETSIZE);
// Parse NatNet data
if (bytes_data >= 2 && bytes_data >= buffer_data[1]) {
natnet_parse(buffer_data);
if (bytes_data >= 2) {
uint16_t packet_size = ((uint16_t)buffer_data[3])<<8 | (uint16_t)buffer_data[2];
if( bytes_data - 4 >= packet_size) { // 4 bytes for message id and packet size
natnet_parse(buffer_data);
}
bytes_data = 0;
}
File diff suppressed because it is too large Load Diff
+177
View File
@@ -0,0 +1,177 @@
#!/usr/bin/env python3
#
# Copyright (C) 2017 Gautier Hattenberger <gautier.hattenberger@enac.fr>
#
# This file is part of paparazzi.
#
# paparazzi is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# paparazzi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with paparazzi; see the file COPYING. If not, see
# <http://www.gnu.org/licenses/>.
#
'''
Forward rigid body position from NatNet (Optitrack positioning system)
to the IVY bus as a REMOTE_GPS_LOCAL message
As the NatNetClient is only compatible with Python 3.x, the Ivy python
should be installed for this version, eventually by hand as paparazzi
packages are only providing an install for Python 2.x (although the
source code itself is compatile for both version)
Manual installation of Ivy:
1. git clone https://gitlab.com/ivybus/ivy-python.git
2. cd ivy-python
3. sudo python3 setup.py install
Otherwise, you can use PYTHONPATH if you don't want to install the code
in your system
'''
from __future__ import print_function
import sys
from os import path, getenv
from time import time, sleep
import numpy as np
from collections import deque
import argparse
# import NatNet client
from NatNetClient import NatNetClient
# if PAPARAZZI_HOME not set, then assume the tree containing this
# file is a reasonable substitute
PPRZ_HOME = getenv("PAPARAZZI_HOME", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
sys.path.append(PPRZ_HOME + "/var/lib/python")
sys.path.append(PPRZ_HOME + "/var/lib/python/pprzlink") # seems needed for messages_xml_map file
from pprzlink.ivy import IvyMessagesInterface
from pprzlink.message import PprzMessage
# parse args
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-ac', action='append', nargs=2,
metavar=('rigid_id','ac_id'), help='pair of rigid body and A/C id (multiple possible)')
parser.add_argument('-b', '--ivy_bus', dest='ivy_bus', help="Ivy bus address and port")
parser.add_argument('-s', '--server', dest='server', default="127.0.0.1", help="NatNet server IP address")
parser.add_argument('-m', '--multicast_addr', dest='multicast', default="239.255.42.99", help="NatNet server multicast address")
parser.add_argument('-dp', '--data_port', dest='data_port', type=int, default=1511, help="NatNet server data socket UDP port")
parser.add_argument('-cp', '--command_port', dest='command_port', type=int, default=1510, help="NatNet server command socket UDP port")
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help="display debug messages")
parser.add_argument('-f', '--freq', dest='freq', default=10, type=int, help="transmit frequency")
parser.add_argument('-vs', '--vel_samples', dest='vel_samples', default=4, type=int, help="amount of samples to compute velocity (should be greater than 2)")
args = parser.parse_args()
if args.ac is None:
print("At least one pair of rigid boby / AC id must be declared")
exit()
# dictionary of ID associations
id_dict = dict(args.ac)
# initial time per AC
timestamp = dict([(ac_id, time()) for ac_id in id_dict.keys()])
period = 1. / args.freq
# initial track per AC
track = dict([(ac_id, deque()) for ac_id in id_dict.keys()])
# start ivy interface
if args.ivy_bus is not None:
ivy = IvyMessagesInterface("natnet2ivy", ivy_bus=args.ivy_bus)
else:
ivy = IvyMessagesInterface("natnet2ivy")
# store track function
def store_track(ac_id, pos, t):
if ac_id in id_dict.keys():
track[ac_id].append((pos, t))
if len(track[ac_id]) > args.vel_samples:
track[ac_id].popleft()
# compute velocity from track
# returns zero if not enough samples
def compute_velocity(ac_id):
vel = [ 0., 0., 0. ]
if len(track[ac_id]) >= args.vel_samples:
nb = -1
for (p2, t2) in track[ac_id]:
nb = nb + 1
if nb == 0:
p1 = p2
t1 = t2
else:
dt = t2 - t1
vel[0] = (p2[0] - p1[0]) / dt
vel[1] = (p2[1] - p1[1]) / dt
vel[2] = (p2[2] - p1[2]) / dt
p1 = p2
t1 = t2
if nb > 0:
vel[0] / nb
vel[1] / nb
vel[2] / nb
return vel
# This is a callback function that gets connected to the NatNet client. It is called once per rigid body per frame
def receiveRigidBodyFrame( ac_id, pos, quat ):
t = time()
i = str(ac_id)
if i not in id_dict.keys():
return
store_track(i, pos, t)
if abs(t - timestamp[i]) < period:
return # too early for next message
timestamp[i] = t
msg = PprzMessage("datalink", "REMOTE_GPS_LOCAL")
msg['ac_id'] = id_dict[i]
msg['pad'] = 0
msg['enu_x'] = pos[0]
msg['enu_y'] = pos[1]
msg['enu_z'] = pos[2]
vel = compute_velocity(i)
msg['enu_xd'] = vel[0]
msg['enu_yd'] = vel[1]
msg['enu_zd'] = vel[2]
msg['tow'] = int(timestamp[i]) # TODO convert to GPS itow ?
# convert quaternion to psi euler angle
dcm_0_0 = 1.0 - 2.0 * (quat[1] * quat[1] + quat[2] * quat[2])
dcm_1_0 = 2.0 * (quat[0] * quat[1] - quat[3] * quat[2])
msg['course'] = 180. * np.arctan2(dcm_1_0, dcm_0_0) / 3.14
ivy.send(msg)
# start natnet interface
natnet = NatNetClient(
server=args.server,
rigidBodyListener=receiveRigidBodyFrame,
dataPort=args.data_port,
commandPort=args.command_port,
verbose=args.verbose)
print("Starting Natnet3.x to Ivy interface at %s" % (args.server))
try:
# Start up the streaming client.
# This will run perpetually, and operate on a separate thread.
natnet.run()
while True:
sleep(1)
except (KeyboardInterrupt, SystemExit):
print("Shutting down ivy and natnet interfaces...")
natnet.stop()
ivy.shutdown()