mirror of
https://github.com/odriverobotics/ODrive.git
synced 2026-02-06 23:41:53 +08:00
634 lines
27 KiB
Python
Executable File
634 lines
27 KiB
Python
Executable File
from __future__ import print_function
|
|
|
|
import sys
|
|
import time
|
|
import threading
|
|
import platform
|
|
import subprocess
|
|
import os
|
|
from fibre.utils import Event
|
|
import odrive.enums
|
|
from odrive.enums import *
|
|
|
|
try:
|
|
if platform.system() == 'Windows':
|
|
import win32console
|
|
import colorama
|
|
colorama.init()
|
|
except ImportError:
|
|
print("Could not init terminal features.")
|
|
print("Refer to install instructions at http://docs.odriverobotics.com/#downloading-and-installing-tools")
|
|
sys.stdout.flush()
|
|
pass
|
|
|
|
if sys.version_info < (3, 0):
|
|
input = raw_input
|
|
|
|
_VT100Colors = {
|
|
'green': '\x1b[92;1m',
|
|
'cyan': '\x1b[96;1m',
|
|
'yellow': '\x1b[93;1m',
|
|
'red': '\x1b[91;1m',
|
|
'default': '\x1b[0m'
|
|
}
|
|
|
|
async def get_serial_number_str(device):
|
|
if hasattr(device, '_serial_number_property'):
|
|
return format(await device._serial_number_property.read(), 'x').upper()
|
|
else:
|
|
return "[unknown serial number]"
|
|
|
|
def get_serial_number_str_sync(device):
|
|
if hasattr(device, '_serial_number_property'):
|
|
return format(device._serial_number_property.read(), 'x').upper()
|
|
else:
|
|
return "[unknown serial number]"
|
|
|
|
def calculate_thermistor_coeffs(degree, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom = False, plot = False):
|
|
import numpy as np
|
|
T_25 = 25 + 273.15 #Kelvin
|
|
temps = np.linspace(Tmin, Tmax, 1000)
|
|
tempsK = temps + 273.15
|
|
|
|
# https://en.wikipedia.org/wiki/Thermistor#B_or_%CE%B2_parameter_equation
|
|
r_inf = R_25 * np.exp(-Beta/T_25)
|
|
R_temps = r_inf * np.exp(Beta/tempsK)
|
|
if thermistor_bottom:
|
|
V = R_temps / (Rload + R_temps)
|
|
else:
|
|
V = Rload / (Rload + R_temps)
|
|
|
|
fit = np.polyfit(V, temps, degree)
|
|
p1 = np.poly1d(fit)
|
|
fit_temps = p1(V)
|
|
|
|
if plot:
|
|
import matplotlib.pyplot as plt
|
|
print(fit)
|
|
plt.plot(V, temps, label='actual')
|
|
plt.plot(V, fit_temps, label='fit')
|
|
plt.xlabel('normalized voltage')
|
|
plt.ylabel('Temp [C]')
|
|
plt.legend(loc=0)
|
|
plt.show()
|
|
|
|
return p1
|
|
|
|
class OperationAbortedException(Exception):
|
|
pass
|
|
|
|
def set_motor_thermistor_coeffs(axis, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom = False):
|
|
coeffs = calculate_thermistor_coeffs(3, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom)
|
|
axis.motor.motor_thermistor.config.poly_coefficient_0 = float(coeffs[3])
|
|
axis.motor.motor_thermistor.config.poly_coefficient_1 = float(coeffs[2])
|
|
axis.motor.motor_thermistor.config.poly_coefficient_2 = float(coeffs[1])
|
|
axis.motor.motor_thermistor.config.poly_coefficient_3 = float(coeffs[0])
|
|
|
|
def dump_errors(odrv, clear=False, printfunc = print):
|
|
axes = [(name, getattr(odrv, name)) for name in dir(odrv) if name.startswith('axis')]
|
|
axes.sort()
|
|
|
|
def dump_errors_for_module(indent, name, obj, path, errorcodes):
|
|
prefix = indent + name.strip('0123456789') + ": "
|
|
for elem in path.split('.'):
|
|
if not hasattr(obj, elem):
|
|
printfunc(prefix + _VT100Colors['yellow'] + "not found" + _VT100Colors['default'])
|
|
return
|
|
parent = obj
|
|
obj = getattr(obj, elem)
|
|
if obj != 0:
|
|
printfunc(indent + name + ": " + _VT100Colors['red'] + "Error(s):" + _VT100Colors['default'])
|
|
for bit in range(64):
|
|
if obj & (1 << bit) != 0:
|
|
printfunc(indent + " " + errorcodes.get((1 << bit), 'UNKNOWN ERROR: 0x{:08X}'.format(1 << bit)))
|
|
else:
|
|
printfunc(indent + name + ": " + _VT100Colors['green'] + "no error" + _VT100Colors['default'])
|
|
|
|
system_error_codes = {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("ODRIVE_ERROR_")}
|
|
dump_errors_for_module("", "system", odrv, 'error', system_error_codes)
|
|
|
|
for name, axis in axes:
|
|
printfunc(name)
|
|
|
|
# Flatten axis and submodules
|
|
# (name, obj, path, errorcode)
|
|
module_decode_map = [
|
|
('axis', axis, 'error', {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("AXIS_ERROR_")}),
|
|
('motor', axis, 'motor.error', {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("MOTOR_ERROR_")}),
|
|
('sensorless_estimator', axis, 'sensorless_estimator.error', {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("SENSORLESS_ESTIMATOR_ERROR")}),
|
|
('encoder', axis, 'encoder.error', {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("ENCODER_ERROR_")}),
|
|
('controller', axis, 'controller.error', {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith("CONTROLLER_ERROR_")}),
|
|
]
|
|
|
|
for name, obj, path, errorcodes in module_decode_map:
|
|
dump_errors_for_module(" ", name, obj, path, errorcodes)
|
|
|
|
if clear:
|
|
odrv.clear_errors()
|
|
|
|
def oscilloscope_dump(odrv, num_vals, filename='oscilloscope.csv'):
|
|
with open(filename, 'w') as f:
|
|
for x in range(num_vals):
|
|
f.write(str(odrv.oscilloscope.get_val(x)))
|
|
f.write('\n')
|
|
|
|
data_rate = 200
|
|
plot_rate = 10
|
|
num_samples = 500
|
|
def start_liveplotter(get_var_callback):
|
|
"""
|
|
Starts a liveplotter.
|
|
The variable that is plotted is retrieved from get_var_callback.
|
|
This function returns immediately and the liveplotter quits when
|
|
the user closes it.
|
|
"""
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
cancellation_token = Event()
|
|
|
|
global vals
|
|
vals = []
|
|
def fetch_data():
|
|
global vals
|
|
while not cancellation_token.is_set():
|
|
try:
|
|
data = get_var_callback()
|
|
except Exception as ex:
|
|
print(str(ex))
|
|
time.sleep(1)
|
|
continue
|
|
vals.append(data)
|
|
if len(vals) > num_samples:
|
|
vals = vals[-num_samples:]
|
|
time.sleep(1/data_rate)
|
|
|
|
# TODO: use animation for better UI performance, see:
|
|
# https://matplotlib.org/examples/animation/simple_anim.html
|
|
def plot_data():
|
|
global vals
|
|
|
|
plt.ion()
|
|
|
|
# Make sure the script terminates when the user closes the plotter
|
|
def closed(evt):
|
|
cancellation_token.set()
|
|
fig = plt.figure()
|
|
fig.canvas.mpl_connect('close_event', closed)
|
|
|
|
while not cancellation_token.is_set():
|
|
plt.clf()
|
|
plt.plot(vals)
|
|
plt.legend(list(range(len(vals))))
|
|
fig.canvas.draw()
|
|
fig.canvas.start_event_loop(1/plot_rate)
|
|
|
|
fetch_t = threading.Thread(target=fetch_data)
|
|
fetch_t.daemon = True
|
|
fetch_t.start()
|
|
|
|
plot_t = threading.Thread(target=plot_data)
|
|
plot_t.daemon = True
|
|
plot_t.start()
|
|
|
|
return cancellation_token;
|
|
#plot_data()
|
|
|
|
|
|
class BulkCapture:
|
|
'''
|
|
Asynchronously captures a bulk set of data when instance is created.
|
|
|
|
get_var_callback: a function that returns the data you want to collect (see the example below)
|
|
data_rate: Rate in hz
|
|
length: Length of time to capture in seconds
|
|
|
|
Example Usage:
|
|
capture = BulkCapture(lambda :[odrv0.axis0.encoder.pos_estimate, odrv0.axis0.controller.pos_setpoint])
|
|
# Do stuff while capturing (like sending position commands)
|
|
capture.event.wait() # When you're done doing stuff, wait for the capture to be completed.
|
|
print(capture.data) # Do stuff with the data
|
|
capture.plot_data() # Helper method to plot the data
|
|
'''
|
|
|
|
def __init__(self,
|
|
get_var_callback,
|
|
data_rate=500.0,
|
|
duration=2.0):
|
|
from threading import Event, Thread
|
|
import numpy as np
|
|
|
|
self.get_var_callback = get_var_callback
|
|
self.event = Event()
|
|
def loop():
|
|
vals = []
|
|
start_time = time.monotonic()
|
|
period = 1.0 / data_rate
|
|
while time.monotonic() - start_time < duration:
|
|
try:
|
|
data = get_var_callback()
|
|
except Exception as ex:
|
|
print(str(ex))
|
|
print("Waiting 1 second before next data point")
|
|
time.sleep(1)
|
|
continue
|
|
relative_time = time.monotonic() - start_time
|
|
vals.append([relative_time] + data)
|
|
time.sleep(period - (relative_time % period)) # this ensures consistently timed samples
|
|
self.data = np.array(vals) # A lock is not really necessary due to the event
|
|
print("Capture complete")
|
|
achieved_data_rate = len(self.data) / self.data[-1, 0]
|
|
if achieved_data_rate < (data_rate * 0.9):
|
|
print("Achieved average data rate: {}Hz".format(achieved_data_rate))
|
|
print("If this rate is significantly lower than what you specified, consider lowering it below the achieved value for more consistent sampling.")
|
|
self.event.set() # tell the main thread that the bulk capture is complete
|
|
Thread(target=loop, daemon=True).start()
|
|
|
|
def plot(self):
|
|
import matplotlib.pyplot as plt
|
|
import inspect
|
|
from textwrap import wrap
|
|
plt.plot(self.data[:,0], self.data[:,1:])
|
|
plt.xlabel("Time (seconds)")
|
|
title = (str(inspect.getsource(self.get_var_callback))
|
|
.strip("['\\n']")
|
|
.split(" = ")[1])
|
|
plt.title("\n".join(wrap(title, 60)))
|
|
plt.legend(range(self.data.shape[1]-1))
|
|
plt.show()
|
|
|
|
|
|
def step_and_plot( axis,
|
|
step_size=100.0,
|
|
settle_time=0.5,
|
|
data_rate=500.0,
|
|
ctrl_mode=CONTROL_MODE_POSITION_CONTROL):
|
|
|
|
if ctrl_mode is CONTROL_MODE_POSITION_CONTROL:
|
|
get_var_callback = lambda :[axis.encoder.pos_estimate, axis.controller.pos_setpoint]
|
|
initial_setpoint = axis.encoder.pos_estimate
|
|
def set_setpoint(setpoint):
|
|
axis.controller.pos_setpoint = setpoint
|
|
elif ctrl_mode is CONTROL_MODE_VELOCITY_CONTROL:
|
|
get_var_callback = lambda :[axis.encoder.vel_estimate, axis.controller.vel_setpoint]
|
|
initial_setpoint = 0
|
|
def set_setpoint(setpoint):
|
|
axis.controller.vel_setpoint = setpoint
|
|
else:
|
|
print("Invalid control mode")
|
|
return
|
|
|
|
initial_settle_time = 0.5
|
|
initial_control_mode = axis.controller.config.control_mode # Set it back afterwards
|
|
print(initial_control_mode)
|
|
axis.controller.config.control_mode = ctrl_mode
|
|
axis.requested_state = AXIS_STATE_CLOSED_LOOP_CONTROL
|
|
|
|
capture = BulkCapture(get_var_callback,
|
|
data_rate=data_rate,
|
|
duration=initial_settle_time + settle_time)
|
|
|
|
set_setpoint(initial_setpoint)
|
|
time.sleep(initial_settle_time)
|
|
set_setpoint(initial_setpoint + step_size) # relative/incremental movement
|
|
|
|
capture.event.wait() # wait for Bulk Capture to be complete
|
|
|
|
axis.requested_state = AXIS_STATE_IDLE
|
|
axis.controller.config.control_mode = initial_control_mode
|
|
capture.plot()
|
|
|
|
|
|
def print_drv_regs(name, motor):
|
|
"""
|
|
Dumps the current gate driver regisers for the specified motor
|
|
"""
|
|
fault = motor.gate_driver.drv_fault
|
|
status_reg_1 = motor.gate_driver.status_reg_1
|
|
status_reg_2 = motor.gate_driver.status_reg_2
|
|
ctrl_reg_1 = motor.gate_driver.ctrl_reg_1
|
|
ctrl_reg_2 = motor.gate_driver.ctrl_reg_2
|
|
print(name + ": " + str(fault))
|
|
print("DRV Fault Code: " + str(fault))
|
|
print("Status Reg 1: " + str(status_reg_1) + " (" + format(status_reg_1, '#010b') + ")")
|
|
print("Status Reg 2: " + str(status_reg_2) + " (" + format(status_reg_2, '#010b') + ")")
|
|
print("Control Reg 1: " + str(ctrl_reg_1) + " (" + format(ctrl_reg_1, '#013b') + ")")
|
|
print("Control Reg 2: " + str(ctrl_reg_2) + " (" + format(ctrl_reg_2, '#09b') + ")")
|
|
|
|
def show_oscilloscope(odrv):
|
|
size = 18000
|
|
values = []
|
|
for i in range(size):
|
|
values.append(odrv.oscilloscope.get_val(i))
|
|
|
|
import matplotlib.pyplot as plt
|
|
plt.plot(values)
|
|
plt.show()
|
|
|
|
def rate_test(device):
|
|
"""
|
|
Tests how many integers per second can be transmitted
|
|
"""
|
|
|
|
# import matplotlib.pyplot as plt
|
|
# plt.ion()
|
|
|
|
print("reading 10000 values...")
|
|
numFrames = 10000
|
|
vals = []
|
|
for _ in range(numFrames):
|
|
vals.append(device.n_evt_control_loop)
|
|
|
|
loopsPerFrame = (vals[-1] - vals[0])/numFrames
|
|
loopsPerSec = (168000000/(6*3500))
|
|
FramePerSec = loopsPerSec/loopsPerFrame
|
|
print("Frames per second: " + str(FramePerSec))
|
|
|
|
# plt.plot(vals)
|
|
# plt.show(block=True)
|
|
|
|
def usb_burn_in_test(get_var_callback, cancellation_token):
|
|
"""
|
|
Starts background threads that read a values form the USB device in a spin-loop
|
|
"""
|
|
|
|
def fetch_data():
|
|
global vals
|
|
i = 0
|
|
while not cancellation_token.is_set():
|
|
try:
|
|
get_var_callback()
|
|
i += 1
|
|
except Exception as ex:
|
|
print(str(ex))
|
|
time.sleep(1)
|
|
i = 0
|
|
continue
|
|
if i % 1000 == 0:
|
|
print("read {} values".format(i))
|
|
threading.Thread(target=fetch_data, daemon=True).start()
|
|
|
|
def yes_no_prompt(question, default=None):
|
|
if default is None:
|
|
question += " [y/n] "
|
|
elif default == True:
|
|
question += " [Y/n] "
|
|
elif default == False:
|
|
question += " [y/N] "
|
|
|
|
while True:
|
|
print(question, end='')
|
|
|
|
choice = input().lower()
|
|
if choice in {'yes', 'y'}:
|
|
return True
|
|
elif choice in {'no', 'n'}:
|
|
return False
|
|
elif choice == '' and default is not None:
|
|
return default
|
|
|
|
def dump_interrupts(odrv):
|
|
interrupts = [
|
|
(-12, "MemoryManagement_IRQn"),
|
|
(-11, "BusFault_IRQn"),
|
|
(-10, "UsageFault_IRQn"),
|
|
(-5, "SVCall_IRQn"),
|
|
(-4, "DebugMonitor_IRQn"),
|
|
(-2, "PendSV_IRQn"),
|
|
(-1, "SysTick_IRQn"),
|
|
(0, "WWDG_IRQn"),
|
|
(1, "PVD_IRQn"),
|
|
(2, "TAMP_STAMP_IRQn"),
|
|
(3, "RTC_WKUP_IRQn"),
|
|
(4, "FLASH_IRQn"),
|
|
(5, "RCC_IRQn"),
|
|
(6, "EXTI0_IRQn"),
|
|
(7, "EXTI1_IRQn"),
|
|
(8, "EXTI2_IRQn"),
|
|
(9, "EXTI3_IRQn"),
|
|
(10, "EXTI4_IRQn"),
|
|
(11, "DMA1_Stream0_IRQn"),
|
|
(12, "DMA1_Stream1_IRQn"),
|
|
(13, "DMA1_Stream2_IRQn"),
|
|
(14, "DMA1_Stream3_IRQn"),
|
|
(15, "DMA1_Stream4_IRQn"),
|
|
(16, "DMA1_Stream5_IRQn"),
|
|
(17, "DMA1_Stream6_IRQn"),
|
|
(18, "ADC_IRQn"),
|
|
(19, "CAN1_TX_IRQn"),
|
|
(20, "CAN1_RX0_IRQn"),
|
|
(21, "CAN1_RX1_IRQn"),
|
|
(22, "CAN1_SCE_IRQn"),
|
|
(23, "EXTI9_5_IRQn"),
|
|
(24, "TIM1_BRK_TIM9_IRQn"),
|
|
(25, "TIM1_UP_TIM10_IRQn"),
|
|
(26, "TIM1_TRG_COM_TIM11_IRQn"),
|
|
(27, "TIM1_CC_IRQn"),
|
|
(28, "TIM2_IRQn"),
|
|
(29, "TIM3_IRQn"),
|
|
(30, "TIM4_IRQn"),
|
|
(31, "I2C1_EV_IRQn"),
|
|
(32, "I2C1_ER_IRQn"),
|
|
(33, "I2C2_EV_IRQn"),
|
|
(34, "I2C2_ER_IRQn"),
|
|
(35, "SPI1_IRQn"),
|
|
(36, "SPI2_IRQn"),
|
|
(37, "USART1_IRQn"),
|
|
(38, "USART2_IRQn"),
|
|
(39, "USART3_IRQn"),
|
|
(40, "EXTI15_10_IRQn"),
|
|
(41, "RTC_Alarm_IRQn"),
|
|
(42, "OTG_FS_WKUP_IRQn"),
|
|
(43, "TIM8_BRK_TIM12_IRQn"),
|
|
(44, "TIM8_UP_TIM13_IRQn"),
|
|
(45, "TIM8_TRG_COM_TIM14_IRQn"),
|
|
(46, "TIM8_CC_IRQn"),
|
|
(47, "DMA1_Stream7_IRQn"),
|
|
(48, "FMC_IRQn"),
|
|
(49, "SDMMC1_IRQn"),
|
|
(50, "TIM5_IRQn"),
|
|
(51, "SPI3_IRQn"),
|
|
(52, "UART4_IRQn"),
|
|
(53, "UART5_IRQn"),
|
|
(54, "TIM6_DAC_IRQn"),
|
|
(55, "TIM7_IRQn"),
|
|
(56, "DMA2_Stream0_IRQn"),
|
|
(57, "DMA2_Stream1_IRQn"),
|
|
(58, "DMA2_Stream2_IRQn"),
|
|
(59, "DMA2_Stream3_IRQn"),
|
|
(60, "DMA2_Stream4_IRQn"),
|
|
(61, "ETH_IRQn"),
|
|
(62, "ETH_WKUP_IRQn"),
|
|
(63, "CAN2_TX_IRQn"),
|
|
(64, "CAN2_RX0_IRQn"),
|
|
(65, "CAN2_RX1_IRQn"),
|
|
(66, "CAN2_SCE_IRQn"),
|
|
(67, "OTG_FS_IRQn"),
|
|
(68, "DMA2_Stream5_IRQn"),
|
|
(69, "DMA2_Stream6_IRQn"),
|
|
(70, "DMA2_Stream7_IRQn"),
|
|
(71, "USART6_IRQn"),
|
|
(72, "I2C3_EV_IRQn"),
|
|
(73, "I2C3_ER_IRQn"),
|
|
(74, "OTG_HS_EP1_OUT_IRQn"),
|
|
(75, "OTG_HS_EP1_IN_IRQn"),
|
|
(76, "OTG_HS_WKUP_IRQn"),
|
|
(77, "OTG_HS_IRQn"),
|
|
# gap
|
|
(80, "RNG_IRQn"),
|
|
(81, "FPU_IRQn"),
|
|
(82, "UART7_IRQn"),
|
|
(83, "UART8_IRQn"),
|
|
(84, "SPI4_IRQn"),
|
|
(85, "SPI5_IRQn"),
|
|
# gap
|
|
(87, "SAI1_IRQn"),
|
|
# gap
|
|
(91, "SAI2_IRQn"),
|
|
(92, "QUADSPI_IRQn"),
|
|
(93, "LPTIM1_IRQn"),
|
|
# gap
|
|
(103, "SDMMC2_IRQn")
|
|
]
|
|
|
|
print("| # | Name | Prio | En | Count |")
|
|
print("|-----|-------------------------|------|----|---------|")
|
|
for irqn, irq_name in interrupts:
|
|
status = odrv.get_interrupt_status(irqn)
|
|
if (status != 0):
|
|
print("| {} | {} | {} | {} | {} |".format(
|
|
str(irqn).rjust(3),
|
|
irq_name.ljust(23),
|
|
str(status & 0xff).rjust(4),
|
|
" *" if (status & 0x80000000) else " ",
|
|
str((status >> 8) & 0x7fffff).rjust(7)))
|
|
|
|
def dump_threads(odrv):
|
|
prefixes = ["max_stack_usage_", "stack_size_", "prio_"]
|
|
keys = [k[len(prefix):] for k in dir(odrv.system_stats) for prefix in prefixes if k.startswith(prefix)]
|
|
good_keys = set([k for k in set(keys) if keys.count(k) == len(prefixes)])
|
|
if len(good_keys) > len(set(keys)):
|
|
print("Warning: incomplete thread information for threads {}".format(set(keys) - good_keys))
|
|
|
|
print("| Name | Stack Size [B] | Max Ever Stack Usage [B] | Prio |")
|
|
print("|---------|----------------|--------------------------|------|")
|
|
for k in sorted(good_keys):
|
|
sz = getattr(odrv.system_stats, "stack_size_" + k)
|
|
use = getattr(odrv.system_stats, "max_stack_usage_" + k)
|
|
print("| {} | {} | {} | {} |".format(
|
|
k.ljust(7),
|
|
str(sz).rjust(14),
|
|
"{} ({:.1f}%)".format(use, use / sz * 100).rjust(24),
|
|
str(getattr(odrv.system_stats, "prio_" + k)).rjust(4)
|
|
))
|
|
|
|
|
|
def dump_dma(odrv):
|
|
if odrv.hw_version_major == 3:
|
|
dma_functions = [[
|
|
# https://www.st.com/content/ccc/resource/technical/document/reference_manual/3d/6d/5a/66/b4/99/40/d4/DM00031020.pdf/files/DM00031020.pdf/jcr:content/translations/en.DM00031020.pdf Table 42
|
|
["SPI3_RX", "-", "SPI3_RX", "SPI2_RX", "SPI2_TX", "SPI3_TX", "-", "SPI3_TX"],
|
|
["I2C1_RX", "-", "TIM7_UP", "-", "TIM7_UP", "I2C1_RX", "I2C1_TX", "I2C1_TX"],
|
|
["TIM4_CH1", "-", "I2S3_EXT_RX", "TIM4_CH2", "I2S2_EXT_TX", "I2S3_EXT_TX", "TIM4_UP", "TIM4_CH3"],
|
|
["I2S3_EXT_RX", "TIM2_UP/TIM2_CH3", "I2C3_RX", "I2S2_EXT_RX", "I2C3_TX", "TIM2_CH1", "TIM2_CH2/TIM2_CH4", "TIM2_UP/TIM2_CH4"],
|
|
["UART5_RX", "USART3_RX", "UART4_RX", "USART3_TX", "UART4_TX", "USART2_RX", "USART2_TX", "UART5_TX"],
|
|
["UART8_TX", "UART7_TX", "TIM3_CH4/TIM3_UP", "UART7_RX", "TIM3_CH1/TIM3_TRIG", "TIM3_CH2", "UART8_RX", "TIM3_CH3"],
|
|
["TIM5_CH3/TIM5_UP", "TIM5_CH4/TIM5_TRIG", "TIM5_CH1", "TIM5_CH4/TIM5_TRIG", "TIM5_CH2", "-", "TIM5_UP", "-"],
|
|
["-", "TIM6_UP", "I2C2_RX", "I2C2_RX", "USART3_TX", "DAC1", "DAC2", "I2C2_TX"],
|
|
], [
|
|
# https://www.st.com/content/ccc/resource/technical/document/reference_manual/3d/6d/5a/66/b4/99/40/d4/DM00031020.pdf/files/DM00031020.pdf/jcr:content/translations/en.DM00031020.pdf Table 43
|
|
["ADC1", "SAI1_A", "TIM8_CH1/TIM8_CH2/TIM8_CH3", "SAI1_A", "ADC1", "SAI1_B", "TIM1_CH1/TIM1_CH2/TIM1_CH3", "-"],
|
|
["-", "DCMI", "ADC2", "ADC2", "SAI1_B", "SPI6_TX", "SPI6_RX", "DCMI"],
|
|
["ADC3", "ADC3", "-", "SPI5_RX", "SPI5_TX", "CRYP_OUT", "CRYP_IN", "HASH_IN"],
|
|
["SPI1_RX", "-", "SPI1_RX", "SPI1_TX", "-", "SPI1_TX", "-", "-"],
|
|
["SPI4_RX", "SPI4_TX", "USART1_RX", "SDIO", "-", "USART1_RX", "SDIO", "USART1_TX"],
|
|
["-", "USART6_RX", "USART6_RX", "SPI4_RX", "SPI4_TX", "-", "USART6_TX", "USART6_TX"],
|
|
["TIM1_TRIG", "TIM1_CH1", "TIM1_CH2", "TIM1_CH1", "TIM1_CH4/TIM1_TRIG/TIM1_COM", "TIM1_UP", "TIM1_CH3", "-"],
|
|
["-", "TIM8_UP", "TIM8_CH1", "TIM8_CH2", "TIM8_CH3", "SPI5_RX", "SPI5_TX", "TIM8_CH4/TIM8_TRIG/TIM8_COM"],
|
|
]]
|
|
elif odrv.hw_version_major == 4:
|
|
dma_functions = [[
|
|
# https://www.st.com/resource/en/reference_manual/dm00305990-stm32f72xxx-and-stm32f73xxx-advanced-armbased-32bit-mcus-stmicroelectronics.pdf Table 26
|
|
["SPI3_RX", "-", "SPI3_RX", "SPI2_RX", "SPI2_TX", "SPI3_TX", "-", "SPI3_TX"],
|
|
["I2C1_RX", "I2C3_RX", "TIM7_UP", "-", "TIM7_UP", "I2C1_RX", "I2C1_TX", "I2C1_TX"],
|
|
["TIM4_CH1", "-", "-", "TIM4_CH2", "-", "-", "TIM4_UP", "TIM4_CH3"],
|
|
["-", "TIM2_UP/TIM2_CH3", "I2C3_RX", "-", "I2C3_TX", "TIM2_CH1", "TIM2_CH2/TIM2_CH4", "TIM2_UP/TIM2_CH4"],
|
|
["UART5_RX", "USART3_RX", "UART4_RX", "USART3_TX", "UART4_TX", "USART2_RX", "USART2_TX", "UART5_TX"],
|
|
["UART8_TX", "UART7_TX", "TIM3_CH4/TIM3_UP", "UART7_RX", "TIM3_CH1/TIM3_TRIG", "TIM3_CH2", "UART8_RX", "TIM3_CH3"],
|
|
["TIM5_CH3/TIM5_UP", "TIM5_CH4/TIM5_TRIG", "TIM5_CH1", "TIM5_CH4/TIM5_TRIG", "TIM5_CH2", "-", "TIM5_UP", "-"],
|
|
["-", "TIM6_UP", "I2C2_RX", "I2C2_RX", "USART3_TX", "DAC1", "DAC2", "I2C2_TX"],
|
|
], [
|
|
# https://www.st.com/resource/en/reference_manual/dm00305990-stm32f72xxx-and-stm32f73xxx-advanced-armbased-32bit-mcus-stmicroelectronics.pdf Table 27
|
|
["ADC1", "SAI1_A", "TIM8_CH1/TIM8_CH2/TIM8_CH3", "SAI1_A", "ADC1", "SAI1_B", "TIM1_CH1/TIM1_CH2/TIM1_CH3", "SAI2_B"],
|
|
["-", "-", "ADC2", "ADC2", "SAI1_B", "-", "-", "-"],
|
|
["ADC3", "ADC3", "-", "SPI5_RX", "SPI5_TX", "AES_OUT", "AES_IN", "-"],
|
|
["SPI1_RX", "-", "SPI1_RX", "SPI1_TX", "SAI2_A", "SPI1_TX", "SAI2_B", "QUADSPI"],
|
|
["SPI4_RX", "SPI4_TX", "USART1_RX", "SDMMC1", "-", "USART1_RX", "SDMMC1", "USART1_TX"],
|
|
["-", "USART6_RX", "USART6_RX", "SPI4_RX", "SPI4_TX", "-", "USART6_TX", "USART6_TX"],
|
|
["TIM1_TRIG", "TIM1_CH1", "TIM1_CH2", "TIM1_CH1", "TIM1_CH4/TIM1_TRIG/TIM1_COM", "TIM1_UP", "TIM1_CH3", "-"],
|
|
["-", "TIM8_UP", "TIM8_CH1", "TIM8_CH2", "TIM8_CH3", "SPI5_RX", "SPI5_TX", "TIM8_CH4/TIM8_TRIG/TIM8_COM"],
|
|
None,
|
|
None,
|
|
None,
|
|
["SDMMC2", "-", "-", "-", "-", "SDMMC2", "-", "-"],
|
|
]]
|
|
|
|
print("| Name | Prio | Channel | Configured |")
|
|
print("|--------------|------|----------------------------------|------------|")
|
|
for stream_num in range(16):
|
|
status = odrv.get_dma_status(stream_num)
|
|
if (status != 0):
|
|
channel = (status >> 2) & 0x7
|
|
ch_name = dma_functions[stream_num >> 3][channel][stream_num & 0x7]
|
|
print("| DMA{}_Stream{} | {} | {} {} | {} |".format(
|
|
(stream_num >> 3) + 1,
|
|
(stream_num & 0x7),
|
|
(status & 0x3),
|
|
channel,
|
|
("(" + ch_name + ")").ljust(30),
|
|
"*" if (status & 0x80000000) else " "))
|
|
|
|
def dump_timing(odrv, n_samples=100, path='/tmp/timings.png'):
|
|
import matplotlib.pyplot as plt
|
|
import re
|
|
import numpy as np
|
|
|
|
timings = []
|
|
|
|
for attr in dir(odrv.task_times):
|
|
if not attr.startswith('_'):
|
|
timings.append((attr, getattr(odrv.task_times, attr), [], [])) # (name, obj, start_times, lengths)
|
|
for k in dir(odrv):
|
|
if re.match(r'axis[0-9]+', k):
|
|
for attr in dir(getattr(odrv, k).task_times):
|
|
if not attr.startswith('_'):
|
|
timings.append((k + '.' + attr, getattr(getattr(odrv, k).task_times, attr), [], [])) # (name, obj, start_times, lengths)
|
|
|
|
# Take a couple of samples
|
|
print("sampling...")
|
|
for i in range(n_samples):
|
|
odrv.task_timers_armed = True # Trigger sample and wait for it to finish
|
|
while odrv.task_timers_armed: pass
|
|
for name, obj, start_times, lengths in timings:
|
|
start_times.append(obj.start_time)
|
|
lengths.append(obj.length)
|
|
print("done")
|
|
|
|
# sort by start time
|
|
timings = sorted(timings, key = lambda x: np.mean(x[2]))
|
|
|
|
plt.rcParams['figure.figsize'] = 21, 9
|
|
plt.figure()
|
|
plt.grid(True)
|
|
plt.barh(
|
|
[-i for i in range(len(timings))], # y positions
|
|
[np.mean(lengths) for name, obj, start_times, lengths in timings], # lengths
|
|
left = [np.mean(start_times) for name, obj, start_times, lengths in timings], # starts
|
|
xerr = (
|
|
[np.std(lengths) for name, obj, start_times, lengths in timings], # error bars to the left side
|
|
[(min(obj.max_length, 20100) - np.mean(lengths)) for name, obj, start_times, lengths in timings], # error bars to the right side - TODO: remove artificial min()
|
|
),
|
|
tick_label = [name for name, obj, start_times, lengths in timings], # labels
|
|
)
|
|
plt.savefig(path, bbox_inches='tight')
|