Circular formations. More elegant script and examples (#2206)

This commit is contained in:
Hector Garcia de Marina
2017-12-20 18:39:24 +01:00
committed by Gautier Hattenberger
parent 1a506e84e5
commit 695917a095
5 changed files with 327 additions and 227 deletions
@@ -0,0 +1,207 @@
#!/usr/bin/env python
#
# Copyright (C) 2017 Hector Garcia de Marina <hgdemarina@gmail.com>
# Gautier Hattenberger <gautier.hattenberger@enac.fr>
#
# This file is part of paparazzi.
#
# paparazzi is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# paparazzi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with paparazzi; see the file COPYING. If not, see
# <http://www.gnu.org/licenses/>.
'''
Centralized circular formations employing guidance vector fields (gvf)
'''
import sys
import numpy as np
import json
from time import sleep
from os import path, getenv
PPRZ_HOME = getenv("PAPARAZZI_HOME", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
PPRZ_SRC = getenv("PAPARAZZI_SRC", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
sys.path.append(PPRZ_HOME + "/var/lib/python/")
sys.path.append(PPRZ_SRC + "/sw/lib/python")
from pprzlink.ivy import IvyMessagesInterface
from pprzlink.message import PprzMessage
from settings_xml_parse import PaparazziACSettings
class Aircraft:
def __init__(self, ac_id):
self.initialized_gvf = False
self.initialized_nav = False
self.id = ac_id
self.XY = np.zeros(2)
self.XYc = np.zeros(2)
self.a = 0
self.b = 0
self.s = 1
self.sigma = 0
self.a_index = 0
self.b_index = 0
class FormationControl:
def __init__(self, config, freq=10., verbose=False):
self.config = config
self.step = 1. / freq
self.verbose = verbose
self.ids = self.config['ids']
self.B = np.array(self.config['topology'])
self.Zdesired = np.array(self.config['desired_intervehicle_angles'])
self.k = np.array(self.config['gain'])
self.radius = np.array(self.config['desired_stationary_radius'])
self.aircraft = [Aircraft(i) for i in self.ids]
self.sigmas = np.zeros(len(self.aircraft))
for ac in self.aircraft:
settings = PaparazziACSettings(ac.id)
list_of_indexes = ['ell_a', 'ell_b']
for setting_ in list_of_indexes:
try:
index = settings.name_lookup[setting_].index
if setting_ == 'ell_a':
ac.a_index = index
if setting_ == 'ell_b':
ac.b_index = index
except Exception as e:
print(e)
print(setting_ + " setting not found, \
have you forgotten to check gvf.xml for your settings?")
# Start IVY interface
self._interface = IvyMessagesInterface("Circular Formation")
# bind to NAVIGATION message
def nav_cb(ac_id, msg):
if ac_id in self.ids and msg.name == "NAVIGATION":
ac = self.aircraft[self.ids.index(ac_id)]
ac.XY[0] = float(msg.get_field(2))
ac.XY[1] = float(msg.get_field(3))
ac.initialized_nav = True
self._interface.subscribe(nav_cb, PprzMessage("telemetry", "NAVIGATION"))
def gvf_cb(ac_id, msg):
if ac_id in self.ids and msg.name == "GVF":
if int(msg.get_field(1)) == 1:
ac = self.aircraft[self.ids.index(ac_id)]
param = msg.get_field(4)
ac.XYc[0] = float(param[0])
ac.XYc[1] = float(param[1])
ac.a = float(param[2])
ac.b = float(param[3])
ac.s = float(msg.get_field(2))
ac.initialized_gvf = True
self._interface.subscribe(gvf_cb, PprzMessage("telemetry", "GVF"))
def __del__(self):
self.stop()
def stop(self):
# Stop IVY interface
if self._interface is not None:
self._interface.shutdown()
def circular_formation(self):
'''
circular formation control algorithm
'''
ready = True
for ac in self.aircraft:
if (not ac.initialized_nav) or (not ac.initialized_gvf):
if self.verbose:
print("Waiting for state of aircraft ", ac.id)
ready = False
if not ready:
return
i = 0
for ac in self.aircraft:
ac.sigma = np.arctan2(ac.XY[1]-ac.XYc[1], ac.XY[0]-ac.XYc[0])
self.sigmas[i] = ac.sigma
i = i + 1
inter_sigma = self.B.transpose().dot(self.sigmas)
error_sigma = inter_sigma - self.Zdesired
if np.size(error_sigma) > 1:
for i in range(0, np.size(error_sigma)):
if error_sigma[i] > np.pi:
error_sigma[i] = error_sigma[i] - 2*np.pi
elif error_sigma[i] <= -np.pi:
error_sigma[i] = error_sigma[i] + 2*np.pi
else:
if error_sigma > np.pi:
error_sigma = error_sigma - 2*np.pi
elif error_sigma <= -np.pi:
error_sigma = error_sigma + 2*np.pi
u = -self.aircraft[0].s*self.k*self.B.dot(error_sigma)
if self.verbose:
print("Inter-vehicle errors: ", str(error_sigma*180.0/np.pi).replace('[','').replace(']',''))
i = 0
for ac in self.aircraft:
msga = PprzMessage("ground", "DL_SETTING")
msga['ac_id'] = ac.id
msga['index'] = ac.a_index
msga['value'] = self.radius + u[i]
msgb = PprzMessage("ground", "DL_SETTING")
msgb['ac_id'] = ac.id
msgb['index'] = ac.b_index
msgb['value'] = self.radius + u[i]
self._interface.send(msga)
self._interface.send(msgb)
i = i + 1
def run(self):
try:
# The main loop
while True:
# TODO: make better frequency managing
sleep(self.step)
# Run the formation algorithm
self.circular_formation()
except KeyboardInterrupt:
self.stop()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="Circular formation")
parser.add_argument('config_file', help="JSON configuration file")
parser.add_argument('-f', '--freq', dest='freq', default=5, type=int, help="control frequency")
parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true', help="display debug messages")
args = parser.parse_args()
with open(args.config_file, 'r') as f:
conf = json.load(f)
if args.verbose:
print(json.dumps(conf))
fc = FormationControl(conf, freq=args.freq, verbose=args.verbose)
fc.run()
+99 -56
View File
@@ -1,81 +1,124 @@
#!/usr/bin/env python
from __future__ import print_function
#!/usr/bin/env python
#
# Copyright (C) 2017 Hector Garcia de Marina <hgdemarina@gmail.com>
# Gautier Hattenberger <gautier.hattenberger@enac.fr>
#
# This file is part of paparazzi.
#
# paparazzi is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# paparazzi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with paparazzi; see the file COPYING. If not, see
# <http://www.gnu.org/licenses/>.
'''
Init the aircraft's tables for the decentralized circular formations module
'''
import time
import sys
import wx
import numpy as np
import sys
import json
from os import path, getenv
PPRZ_HOME = getenv("PAPARAZZI_HOME", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
PPRZ_SRC = getenv("PAPARAZZI_SRC", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
sys.path.append(PPRZ_SRC + "/sw/lib/python")
sys.path.append(PPRZ_HOME + "/var/lib/python")
from pprzlink.ivy import IvyMessagesInterface
from pprzlink.message import PprzMessage
from settings_xml_parse import PaparazziACSettings
from pprzlink.message import PprzMessage
list_ids = []
interface = IvyMessagesInterface("DCF")
class initTable:
def __init__(self, config, verbose=False):
self.verbose = verbose
self.config = config
self.ids = np.array(self.config['ids'])
self.B = np.array(self.config['topology'])
self.Zdesired = np.array(self.config['desired_intervehicle_angles'])
self.list_ids = np.ndarray.tolist(self.ids)
if len(sys.argv) != 4:
print("Usage: dcfInitTables topology.txt desired_sigma.txt ids.txt")
interface.shutdown()
exit()
# Start IVY interface
self._interface = IvyMessagesInterface("Init DCF tables")
B = np.loadtxt(sys.argv[1])
desired_sigmas = np.loadtxt(sys.argv[2])*np.pi/180.0
ids = np.loadtxt(sys.argv[3])
def __del__(self):
self.stop()
list_ids = np.ndarray.tolist(ids)
def stop(self):
# Stop IVY interface
if self._interface is not None:
self._interface.shutdown()
if np.size(ids) != np.size(B,0):
print("The ammount of aircrafts in the topology and ids do not match")
interface.shutdown()
exit()
time.sleep(2)
def init_dcftables(self):
time.sleep(2)
if len(list_ids) == 2:
B.shape = (2,1)
for count, column in enumerate(self.B.T):
index = np.nonzero(column)
i = index[0]
for count, column in enumerate(B.T):
index = np.nonzero(column)
i = index[0]
msg_clean_a = PprzMessage("datalink", "DCF_REG_TABLE")
msg_clean_a['ac_id'] = int(self.list_ids[i[0]])
msg_clean_a['nei_id'] = 0
msg_clean_b = PprzMessage("datalink", "DCF_REG_TABLE")
msg_clean_b['ac_id'] = int(self.list_ids[i[1]])
msg_clean_b['nei_id'] = 0
msg_clean_a = PprzMessage("datalink", "DCF_REG_TABLE")
msg_clean_a['ac_id'] = int(list_ids[i[0]])
msg_clean_a['nei_id'] = 0
msg_clean_b = PprzMessage("datalink", "DCF_REG_TABLE")
msg_clean_b['ac_id'] = int(list_ids[i[1]])
msg_clean_b['nei_id'] = 0
self._interface.send(msg_clean_a)
self._interface.send(msg_clean_b)
interface.send(msg_clean_a)
interface.send(msg_clean_b)
if self.verbose:
print(msg_clean_a)
print(msg_clean_b)
for count, column in enumerate(B.T):
index = np.nonzero(column)
i = index[0]
for count, column in enumerate(self.B.T):
index = np.nonzero(column)
i = index[0]
msga = PprzMessage("datalink", "DCF_REG_TABLE")
msga['ac_id'] = int(list_ids[i[0]])
msga['nei_id'] = int(list_ids[i[1]])
if len(list_ids) == 2:
msga['desired_sigma'] = int(desired_sigmas)
else:
msga['desired_sigma'] = int(desired_sigmas[count])
interface.send(msga)
msga = PprzMessage("datalink", "DCF_REG_TABLE")
msga['ac_id'] = int(self.list_ids[i[0]])
msga['nei_id'] = int(self.list_ids[i[1]])
if len(self.list_ids) == 2:
msga['desired_sigma'] = int(self.Zdesired)
else:
msga['desired_sigma'] = int(self.Zdesired[count])
self._interface.send(msga)
msgb = PprzMessage("datalink", "DCF_REG_TABLE")
msgb['ac_id'] = int(list_ids[i[1]])
msgb['nei_id'] = int(list_ids[i[0]])
if len(list_ids) == 2:
msgb['desired_sigma'] = int(desired_sigmas)
else:
msgb['desired_sigma'] = int(desired_sigmas[count])
interface.send(msgb)
msgb = PprzMessage("datalink", "DCF_REG_TABLE")
msgb['ac_id'] = int(self.list_ids[i[1]])
msgb['nei_id'] = int(self.list_ids[i[0]])
if len(self.list_ids) == 2:
msgb['desired_sigma'] = int(self.Zdesired)
else:
msgb['desired_sigma'] = int(self.Zdesired[count])
self._interface.send(msgb)
print(msga)
print(msgb)
if self.verbose:
print(msga)
print(msgb)
time.sleep(2)
interface.shutdown()
time.sleep(2)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="Init tables for DCF module")
parser.add_argument('config_file', help="JSON configuration file")
parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true', help="display debug messages")
args = parser.parse_args()
with open(args.config_file, 'r') as f:
conf = json.load(f)
if args.verbose:
print(json.dumps(conf))
try:
initTableDCF = initTable(conf, verbose=args.verbose)
initTableDCF.init_dcftables()
except KeyboardInterrupt:
initTableDCF.stop()
initTableDCF.stop()
@@ -0,0 +1,11 @@
{
"ids": [102, 103, 104],
"topology": [
[ 1, 0],
[-1, 1],
[ 0,-1]
],
"desired_intervehicle_angles": [0, 0],
"gain": 10,
"desired_stationary_radius": 80
}
@@ -0,0 +1,10 @@
{
"ids": [102, 103],
"topology": [
[ 1],
[-1]
],
"desired_intervehicle_angles": [0],
"gain": 10,
"desired_stationary_radius": 80
}
@@ -1,171 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import time
import sys
import wx
import numpy as np
import sys
from os import path, getenv
PPRZ_HOME = getenv("PAPARAZZI_HOME", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
PPRZ_SRC = getenv("PAPARAZZI_SRC", path.normpath(path.join(path.dirname(path.abspath(__file__)), '../../../../')))
sys.path.append(PPRZ_SRC + "/sw/lib/python")
sys.path.append(PPRZ_HOME + "/var/lib/python")
from pprzlink.ivy import IvyMessagesInterface
from pprzlink.message import PprzMessage
from settings_xml_parse import PaparazziACSettings
class aircraft:
def __init__(self, ac_id):
self.id = ac_id
self.XY = np.array([-999.0, -999.0])
self.XYc = np.array([-999.0, -999.0])
self.a = -999.0
self.b = -999.0
self.s = -999
self.sigma = -999.0
self.a_index = -999
self.b_index = -999
self.time = -999
list_ids = []
list_aircraft = []
interface = IvyMessagesInterface("GVF Formation")
def message_recv(ac_id, msg):
if ac_id in list_ids:
ac = list_aircraft[list_ids.index(ac_id)]
if msg.name == 'NAVIGATION':
ac.XY[0] = float(msg.get_field(2))
ac.XY[1] = float(msg.get_field(3))
if msg.name == 'GVF':
if int(msg.get_field(1)) == 1:
param = msg.get_field(4)
ac.XYc[0] = float(param[0])
ac.XYc[1] = float(param[1])
ac.a = float(param[2])
ac.b = float(param[3])
ac.s = float(msg.get_field(2))
if msg.name == 'BAT':
ac.time = float(msg.get_field(3))
return
def formation(B, ds, radius, k):
waiting_for_msgs = 0
for ac in list_aircraft:
if ac.a == -999.0:
print("Waiting for GVF msg of aircraft ", ac.id)
waiting_for_msgs = 1
if ac.XY[0] == -999.0:
print("Waiting for NAV msg of aircraft ", ac.id)
waiting_for_msgs = 1
if waiting_for_msgs == 1:
return
sigma = np.zeros(len(list_aircraft))
i = 0
for ac in list_aircraft:
ac.sigma = np.arctan2(ac.XY[1]-ac.XYc[1], ac.XY[0]-ac.XYc[0])
sigma[i] = ac.sigma
i = i + 1
inter_sigma = B.transpose().dot(sigma)
error_sigma = inter_sigma - ds
if np.size(error_sigma) > 1:
for i in range(0, np.size(error_sigma)):
if error_sigma[i] > np.pi:
error_sigma[i] = error_sigma[i] - 2*np.pi
elif error_sigma[i] <= -np.pi:
error_sigma[i] = error_sigma[i] + 2*np.pi
else:
if error_sigma > np.pi:
error_sigma = error_sigma - 2*np.pi
elif error_sigma <= -np.pi:
error_sigma = error_sigma + 2*np.pi
u = -list_aircraft[0].s*k*B.dot(error_sigma)
print(list_aircraft[0].time, " ", str(error_sigma*180.0/np.pi).replace('[','').replace(']',''))
i = 0
for ac in list_aircraft:
msga = PprzMessage("ground", "DL_SETTING")
msga['ac_id'] = ac.id
msga['index'] = ac.a_index
msga['value'] = radius + u[i]
msgb = PprzMessage("ground", "DL_SETTING")
msgb['ac_id'] = ac.id
msgb['index'] = ac.b_index
msgb['value'] = radius + u[i]
interface.send(msga)
interface.send(msgb)
i = i + 1
return
def main():
if len(sys.argv) != 6:
print("Usage: gvfFormationApp topology.txt desired_sigma.txt ids.txt radius k")
return
B = np.loadtxt(sys.argv[1])
desired_sigmas = np.loadtxt(sys.argv[2])*np.pi/180.0
ids = np.loadtxt(sys.argv[3])
radius = float(sys.argv[4])
k = float(sys.argv[5])
global list_ids
list_ids = np.ndarray.tolist(ids)
map(int, list_ids)
if np.size(ids) != np.size(B,0):
print("The ammount of aircrafts in the topology and ids does not match")
return
for i in range(0, len(ids)):
list_aircraft.append(aircraft(int(ids[i])))
# Ivy
interface.subscribe(message_recv)
for ac in list_aircraft:
settings = PaparazziACSettings(ac.id)
list_of_indexes = ['ell_a', 'ell_b']
for setting_ in list_of_indexes:
try:
index = settings.name_lookup[setting_].index
if setting_ == 'ell_a':
ac.a_index = index
if setting_ == 'ell_b':
ac.b_index = index
except Exception as e:
print(e)
print(setting_ + " setting not found, \
have you forgotten gvf.xml in your settings?")
try:
while True:
time.sleep(0.5)
formation(B, desired_sigmas, radius, k)
except KeyboardInterrupt:
return
if __name__ == '__main__':
main()
interface.shutdown()