mirror of
https://github.com/paparazzi/paparazzi.git
synced 2026-05-30 11:37:06 +08:00
[calib] implement the calibration button for mag_viewer (#3472)
This commit is contained in:
committed by
GitHub
parent
ab21964c1a
commit
6370ff488b
@@ -11,10 +11,12 @@ try:
|
|||||||
#raise ModuleNotFoundError()
|
#raise ModuleNotFoundError()
|
||||||
|
|
||||||
sys.path.append(utils.PAPARAZZI_SRC + "/sw/lib/python")
|
sys.path.append(utils.PAPARAZZI_SRC + "/sw/lib/python")
|
||||||
|
sys.path.append(utils.PAPARAZZI_SRC + "/sw/tools/calibration")
|
||||||
sys.path.append(utils.PAPARAZZI_HOME + "/var/lib/python") # pprzlink
|
sys.path.append(utils.PAPARAZZI_HOME + "/var/lib/python") # pprzlink
|
||||||
|
|
||||||
from pprzlink.message import PprzMessage
|
from pprzlink.message import PprzMessage
|
||||||
from pprz_connect import PprzConnect, PprzConfig
|
from pprz_connect import PprzConnect, PprzConfig
|
||||||
|
import calibration_utils
|
||||||
|
|
||||||
class MagViewer(QtWidgets.QWidget):
|
class MagViewer(QtWidgets.QWidget):
|
||||||
|
|
||||||
@@ -24,6 +26,7 @@ try:
|
|||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
|
|
||||||
self.mag_data = np.empty((0,3))
|
self.mag_data = np.empty((0,3))
|
||||||
|
self.mag_id = None
|
||||||
|
|
||||||
self.vlay = QtWidgets.QVBoxLayout(self)
|
self.vlay = QtWidgets.QVBoxLayout(self)
|
||||||
|
|
||||||
@@ -78,9 +81,9 @@ try:
|
|||||||
|
|
||||||
self.mag_combo = QtWidgets.QComboBox(self)
|
self.mag_combo = QtWidgets.QComboBox(self)
|
||||||
self.cmds_lay.addWidget(self.mag_combo)
|
self.cmds_lay.addWidget(self.mag_combo)
|
||||||
self.mag_combo.addItem("---")
|
self.mag_combo.addItem("--- Select sensor in list ---")
|
||||||
|
|
||||||
self.ivy_sub_button = QtWidgets.QPushButton("Subscribe", self)
|
self.ivy_sub_button = QtWidgets.QPushButton("Start listening data", self)
|
||||||
self.cmds_lay.addWidget(self.ivy_sub_button)
|
self.cmds_lay.addWidget(self.ivy_sub_button)
|
||||||
self.ivy_sub_button.clicked.connect(self.sub_unsub)
|
self.ivy_sub_button.clicked.connect(self.sub_unsub)
|
||||||
|
|
||||||
@@ -105,10 +108,10 @@ try:
|
|||||||
def sub_unsub(self):
|
def sub_unsub(self):
|
||||||
if self.raw_bind_id is None:
|
if self.raw_bind_id is None:
|
||||||
self.raw_bind_id = self.connect.ivy.subscribe(self.mag_raw_cb, PprzMessage("telemetry", "IMU_MAG_RAW"))
|
self.raw_bind_id = self.connect.ivy.subscribe(self.mag_raw_cb, PprzMessage("telemetry", "IMU_MAG_RAW"))
|
||||||
self.ivy_sub_button.setText("Unsubscribe")
|
self.ivy_sub_button.setText("Stop listening data")
|
||||||
else:
|
else:
|
||||||
self.connect.ivy.unsubscribe(self.raw_bind_id)
|
self.connect.ivy.unsubscribe(self.raw_bind_id)
|
||||||
self.ivy_sub_button.setText("Subscribe")
|
self.ivy_sub_button.setText("Start listening data")
|
||||||
self.raw_bind_id = None
|
self.raw_bind_id = None
|
||||||
|
|
||||||
def reset_data(self):
|
def reset_data(self):
|
||||||
@@ -124,9 +127,10 @@ try:
|
|||||||
if ac_id not in self.confs:
|
if ac_id not in self.confs:
|
||||||
return
|
return
|
||||||
ac_name = self.confs[ac_id].name
|
ac_name = self.confs[ac_id].name
|
||||||
mag_id = f"{ac_name}: {msg['id']}"
|
mag_id = int(msg['id'])
|
||||||
|
mag_label = f"{ac_name}: {mag_id}"
|
||||||
|
|
||||||
mag_idx = self.mag_combo.findText(mag_id)
|
mag_idx = self.mag_combo.findText(mag_label)
|
||||||
cur_idx = self.mag_combo.currentIndex()
|
cur_idx = self.mag_combo.currentIndex()
|
||||||
|
|
||||||
if cur_idx == mag_idx:
|
if cur_idx == mag_idx:
|
||||||
@@ -136,10 +140,10 @@ try:
|
|||||||
mag = np.array([mx, my, mz])
|
mag = np.array([mx, my, mz])
|
||||||
self.mag_sig.emit((mag_id, mag))
|
self.mag_sig.emit((mag_id, mag))
|
||||||
elif mag_idx == -1:
|
elif mag_idx == -1:
|
||||||
self.mag_combo.addItem(mag_id)
|
self.mag_combo.addItem(mag_label)
|
||||||
|
|
||||||
def update_mag(self, mag_data):
|
def update_mag(self, mag_data):
|
||||||
mid, mag = mag_data
|
self.mag_id, mag = mag_data
|
||||||
self.mag_data = np.vstack([self.mag_data, mag])
|
self.mag_data = np.vstack([self.mag_data, mag])
|
||||||
mag_unit = mag / np.linalg.norm(mag)
|
mag_unit = mag / np.linalg.norm(mag)
|
||||||
self.set_points(self.mag_data)
|
self.set_points(self.mag_data)
|
||||||
@@ -165,8 +169,19 @@ try:
|
|||||||
self.gl_widget.setCameraPosition(distance=self.mean_norm*5)
|
self.gl_widget.setCameraPosition(distance=self.mean_norm*5)
|
||||||
|
|
||||||
def calibrate(self):
|
def calibrate(self):
|
||||||
dia = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Icon.Information,"Calibration", "plop\ntoto", QtWidgets.QMessageBox.StandardButton.Ok, self)
|
calib_text = ""
|
||||||
dia.open()
|
try:
|
||||||
|
p0 = calibration_utils.get_min_max_guess(self.mag_data, 1.)
|
||||||
|
p1, msg, success = calibration_utils.optimize_calibration(self.mag_data, 1., p0)
|
||||||
|
if success in [1, 2, 3, 4]:
|
||||||
|
calib_text = calibration_utils.format_xml(p1, "MAG", self.mag_id, 11)
|
||||||
|
else:
|
||||||
|
calib_text = "calibration failed\n"+msg
|
||||||
|
except Exception as e:
|
||||||
|
calib_text = "calibration error\n"+str(e)
|
||||||
|
finally:
|
||||||
|
dia = QtWidgets.QMessageBox(QtWidgets.QMessageBox.Icon.Information,"Calibration", calib_text, QtWidgets.QMessageBox.StandardButton.Ok, self)
|
||||||
|
dia.open()
|
||||||
|
|
||||||
|
|
||||||
def set_points(self, points, color=(1, 0, 0, 1), size=5):
|
def set_points(self, points, color=(1, 0, 0, 1), size=5):
|
||||||
@@ -185,7 +200,7 @@ except ImportError:
|
|||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
self.vlay = QtWidgets.QVBoxLayout(self)
|
self.vlay = QtWidgets.QVBoxLayout(self)
|
||||||
self.error_label = QtWidgets.QLabel("module not found!")
|
self.error_label = QtWidgets.QLabel("Module not found!\nTry to install pyqtgraph")
|
||||||
self.vlay.addWidget(self.error_label)
|
self.vlay.addWidget(self.error_label)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|||||||
@@ -26,7 +26,6 @@ import sys
|
|||||||
import os
|
import os
|
||||||
from optparse import OptionParser
|
from optparse import OptionParser
|
||||||
import scipy
|
import scipy
|
||||||
from scipy import optimize
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import calibration_utils
|
import calibration_utils
|
||||||
|
|
||||||
@@ -145,28 +144,23 @@ def main():
|
|||||||
p0 = calibration_utils.get_min_max_guess(flt_meas, sensor_ref)
|
p0 = calibration_utils.get_min_max_guess(flt_meas, sensor_ref)
|
||||||
cp0, np0 = calibration_utils.scale_measurements(flt_meas, p0)
|
cp0, np0 = calibration_utils.scale_measurements(flt_meas, p0)
|
||||||
print("initial guess : avg "+str(np0.mean())+" std "+str(np0.std()))
|
print("initial guess : avg "+str(np0.mean())+" std "+str(np0.std()))
|
||||||
# print p0
|
|
||||||
|
|
||||||
def err_func(p, meas, y):
|
# optimize parameters
|
||||||
c_p, n_p = calibration_utils.scale_measurements(meas, p)
|
p1, msg, success = calibration_utils.optimize_calibration(flt_meas, sensor_ref, p0)
|
||||||
err = y*np.ones(len(meas)) - n_p
|
optimize_failed = success not in [1, 2, 3, 4]
|
||||||
return err
|
if optimize_failed:
|
||||||
|
|
||||||
p1, cov, info, msg, success = optimize.leastsq(err_func, p0[:], args=(flt_meas, sensor_ref), full_output=1)
|
|
||||||
optimze_failed = success not in [1, 2, 3, 4]
|
|
||||||
if optimze_failed:
|
|
||||||
print("Optimization error: ", msg)
|
print("Optimization error: ", msg)
|
||||||
print("Please try to provide a clean logfile with proper distribution of measurements.")
|
print("Please try to provide a clean logfile with proper distribution of measurements.")
|
||||||
#sys.exit(1)
|
#sys.exit(1)
|
||||||
|
|
||||||
cp1, np1 = calibration_utils.scale_measurements(flt_meas, p1)
|
cp1, np1 = calibration_utils.scale_measurements(flt_meas, p1)
|
||||||
|
|
||||||
if optimze_failed:
|
if optimize_failed:
|
||||||
print("last iteration of failed optimized guess : avg "+str(np1.mean())+" std "+str(np1.std()))
|
print("last iteration of failed optimized guess : avg "+str(np1.mean())+" std "+str(np1.std()))
|
||||||
else:
|
else:
|
||||||
print("optimized guess : avg "+str(np1.mean())+" std "+str(np1.std()))
|
print("optimized guess : avg "+str(np1.mean())+" std "+str(np1.std()))
|
||||||
|
|
||||||
if not optimze_failed:
|
if not optimize_failed:
|
||||||
calibration_utils.print_xml(p1, options.sensor, options.sensor_id, sensor_res)
|
calibration_utils.print_xml(p1, options.sensor, options.sensor_id, sensor_res)
|
||||||
|
|
||||||
if options.plot:
|
if options.plot:
|
||||||
|
|||||||
@@ -25,12 +25,11 @@ import re
|
|||||||
from telnetlib import theNULL
|
from telnetlib import theNULL
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy import sin, cos
|
from numpy import sin, cos
|
||||||
from scipy import linalg, stats
|
from scipy import linalg, stats, optimize
|
||||||
from fractions import Fraction
|
from fractions import Fraction
|
||||||
|
|
||||||
import matplotlib
|
import matplotlib
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
from mpl_toolkits.mplot3d import Axes3D
|
|
||||||
|
|
||||||
|
|
||||||
def get_ids_in_log(filename):
|
def get_ids_in_log(filename):
|
||||||
@@ -141,7 +140,6 @@ def get_min_max_guess(meas, scale):
|
|||||||
else:
|
else:
|
||||||
return np.array([0, 0, 0, 0])
|
return np.array([0, 0, 0, 0])
|
||||||
|
|
||||||
|
|
||||||
def scale_measurements(meas, p):
|
def scale_measurements(meas, p):
|
||||||
"""Scale the set of measurements."""
|
"""Scale the set of measurements."""
|
||||||
l_comp = []
|
l_comp = []
|
||||||
@@ -152,6 +150,15 @@ def scale_measurements(meas, p):
|
|||||||
l_norm.append(linalg.norm(sm))
|
l_norm.append(linalg.norm(sm))
|
||||||
return np.array(l_comp), np.array(l_norm)
|
return np.array(l_comp), np.array(l_norm)
|
||||||
|
|
||||||
|
def optimize_calibration(meas, ref, p0):
|
||||||
|
""" optimize parameters based on an initial guess """
|
||||||
|
def err_func(p, m, y):
|
||||||
|
c_p, n_p = scale_measurements(m, p)
|
||||||
|
err = y*np.ones(len(m)) - n_p
|
||||||
|
return err
|
||||||
|
|
||||||
|
sol, cov, info, msg, success = optimize.leastsq(err_func, p0[:], args=(meas, ref), full_output=1)
|
||||||
|
return sol, msg, success
|
||||||
|
|
||||||
def estimate_mag_current_relation(meas):
|
def estimate_mag_current_relation(meas):
|
||||||
"""Calculate linear coefficient of magnetometer-current relation."""
|
"""Calculate linear coefficient of magnetometer-current relation."""
|
||||||
@@ -186,32 +193,36 @@ def _continious_frac(v, max_val, a, x, num, den, s):
|
|||||||
else:
|
else:
|
||||||
return _continious_frac(v, max_val, a1, x1, (num2, num3), (den2, den3), s)
|
return _continious_frac(v, max_val, a1, x1, (num2, num3), (den2, den3), s)
|
||||||
|
|
||||||
def print_xml(p, sensor, sensor_id, res):
|
def format_xml(p, sensor, sensor_id, res):
|
||||||
"""Print xml for airframe file."""
|
"""Print xml for airframe file."""
|
||||||
x_sens = continious_frac(p[3]*2**res)
|
x_sens = continious_frac(p[3]*2**res)
|
||||||
y_sens = continious_frac(p[4]*2**res)
|
y_sens = continious_frac(p[4]*2**res)
|
||||||
z_sens = continious_frac(p[5]*2**res)
|
z_sens = continious_frac(p[5]*2**res)
|
||||||
|
|
||||||
print("")
|
s = f""
|
||||||
print('<define name="IMU_'+sensor+'_CALIB" type="array">')
|
s += f'<define name="IMU_{sensor}_CALIB" type="array">\n'
|
||||||
print(' <field type="struct">')
|
s += f' <field type="struct">\n'
|
||||||
print(' <field name="abi_id" value="'+sensor_id+'"/>')
|
s += f' <field name="abi_id" value="{sensor_id}"/>\n'
|
||||||
print(' <field name="calibrated" type="struct">')
|
s += f' <field name="calibrated" type="struct">\n'
|
||||||
print(' <field name="neutral" value="true"/>')
|
s += f' <field name="neutral" value="true"/>\n'
|
||||||
print(' <field name="scale" value="true"/>')
|
s += f' <field name="scale" value="true"/>\n'
|
||||||
print(' </field>')
|
s += f' </field>\n'
|
||||||
print(' <field name="neutral" value="'+str(int(round(p[0])))+','+str(int(round(p[1])))+','+str(int(round(p[2])))+'" type="int[]"/>')
|
s += f' <field name="neutral" value="{round(p[0]):d},{int(round(p[1])):d},{int(round(p[2])):d}" type="int[]"/>\n'
|
||||||
print(' <field name="scale" value="{{'+str(x_sens[0])+','+str(y_sens[0])+','+str(z_sens[0])+'},{'+str(x_sens[1])+','+str(y_sens[1])+','+str(z_sens[1])+'}}"/>')
|
s += f' <field name="scale" value="{{{{ {x_sens[0]:d}, {y_sens[0]:d}, {z_sens[0]:d} }},{{ {x_sens[1]:d}, {y_sens[1]:d}, {z_sens[1]:d} }}}}"/>\n'
|
||||||
print(' </field>')
|
s += f' </field>\n'
|
||||||
print('</define>')
|
s += f'</define>\n'
|
||||||
print("")
|
s += f'\n'
|
||||||
print("<define name=\""+sensor+"_X_NEUTRAL\" value=\""+str(int(round(p[0])))+"\"/>")
|
s += f'<define name="{sensor}_X_NEUTRAL" value="{round(p[0]):d}"/>\n'
|
||||||
print("<define name=\""+sensor+"_Y_NEUTRAL\" value=\""+str(int(round(p[1])))+"\"/>")
|
s += f'<define name="{sensor}_Y_NEUTRAL" value="{round(p[1]):d}"/>\n'
|
||||||
print("<define name=\""+sensor+"_Z_NEUTRAL\" value=\""+str(int(round(p[2])))+"\"/>")
|
s += f'<define name="{sensor}_Z_NEUTRAL" value="{round(p[2]):d}"/>\n'
|
||||||
print("<define name=\""+sensor+"_X_SENS\" value=\""+str(p[3]*2**res)+"\" integer=\"16\"/>")
|
s += f'<define name="{sensor}_X_SENS" value="{p[3]*2**res:.4f}" integer="16"/>\n'
|
||||||
print("<define name=\""+sensor+"_Y_SENS\" value=\""+str(p[4]*2**res)+"\" integer=\"16\"/>")
|
s += f'<define name="{sensor}_Y_SENS" value="{p[4]*2**res:.4f}" integer="16"/>\n'
|
||||||
print("<define name=\""+sensor+"_Z_SENS\" value=\""+str(p[5]*2**res)+"\" integer=\"16\"/>")
|
s += f'<define name="{sensor}_Z_SENS" value="{p[5]*2**res:.4f}" integer="16"/>\n'
|
||||||
|
return s
|
||||||
|
|
||||||
|
def print_xml(p, sensor, sensor_id, res):
|
||||||
|
"""Print xml for airframe file."""
|
||||||
|
print(format_xml(p, sensor, sensor_id, res))
|
||||||
|
|
||||||
def print_imu_scaled(sensor, measurements, attrs):
|
def print_imu_scaled(sensor, measurements, attrs):
|
||||||
print("")
|
print("")
|
||||||
@@ -373,10 +384,7 @@ def plot_mag_3d(measured, calibrated, p):
|
|||||||
rect_r = [left/2+0.5, bottom, width, height]
|
rect_r = [left/2+0.5, bottom, width, height]
|
||||||
|
|
||||||
fig = plt.figure(figsize=plt.figaspect(0.5))
|
fig = plt.figure(figsize=plt.figaspect(0.5))
|
||||||
if matplotlib.__version__.startswith('0'):
|
ax = fig.add_subplot(1, 2, 1, position=rect_l, projection='3d')
|
||||||
ax = Axes3D(fig, rect=rect_l)
|
|
||||||
else:
|
|
||||||
ax = fig.add_subplot(1, 2, 1, position=rect_l, projection='3d')
|
|
||||||
# plot measurements
|
# plot measurements
|
||||||
ax.scatter(mx, my, mz)
|
ax.scatter(mx, my, mz)
|
||||||
# plot line from center to ellipsoid center
|
# plot line from center to ellipsoid center
|
||||||
@@ -398,10 +406,7 @@ def plot_mag_3d(measured, calibrated, p):
|
|||||||
ax.set_ylabel('y')
|
ax.set_ylabel('y')
|
||||||
ax.set_zlabel('z')
|
ax.set_zlabel('z')
|
||||||
|
|
||||||
if matplotlib.__version__.startswith('0'):
|
ax = fig.add_subplot(1, 2, 2, position=rect_r, projection='3d')
|
||||||
ax = Axes3D(fig, rect=rect_r)
|
|
||||||
else:
|
|
||||||
ax = fig.add_subplot(1, 2, 2, position=rect_r, projection='3d')
|
|
||||||
ax.plot_wireframe(wx, wy, wz, color='grey', alpha=0.5)
|
ax.plot_wireframe(wx, wy, wz, color='grey', alpha=0.5)
|
||||||
ax.scatter(cx, cy, cz)
|
ax.scatter(cx, cy, cz)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user