# Source: ODrive/tools/odrive/utils.py
# (repository-viewer listing: 634 lines, 27 KiB, Python, executable file)

from __future__ import print_function
import sys
import time
import threading
import platform
import subprocess
import os
from fibre.utils import Event
import odrive.enums
from odrive.enums import *
# Optional console setup. On Windows the colour escape sequences used below
# are routed through colorama; win32console is imported only to verify that
# it is installed. A missing package is reported but is not fatal.
try:
    if platform.system() == 'Windows':
        import win32console
        import colorama
        colorama.init()
except ImportError:
    print("Could not init terminal features.")
    print("Refer to install instructions at http://docs.odriverobotics.com/#downloading-and-installing-tools")
    sys.stdout.flush()

# Python 2 compatibility: make input() return the raw string there as well.
if sys.version_info[0] < 3:
    input = raw_input
# Escape sequences used to colourise console output.
# 'default' is the sequence emitted after coloured text to reset the terminal.
_VT100Colors = dict(
    green='\x1b[92;1m',
    cyan='\x1b[96;1m',
    yellow='\x1b[93;1m',
    red='\x1b[91;1m',
    default='\x1b[0m',
)
async def get_serial_number_str(device):
    """
    Returns the serial number of the device as an upper-case hexadecimal
    string. The value is awaited from device._serial_number_property.read().
    If the device has no such property a placeholder string is returned.
    """
    if not hasattr(device, '_serial_number_property'):
        return "[unknown serial number]"
    serial_number = await device._serial_number_property.read()
    return format(serial_number, 'x').upper()
def get_serial_number_str_sync(device):
    """
    Blocking variant of get_serial_number_str(): reads
    device._serial_number_property synchronously and returns the value as an
    upper-case hexadecimal string, or a placeholder string if the device has
    no such property.
    """
    if not hasattr(device, '_serial_number_property'):
        return "[unknown serial number]"
    serial_number = device._serial_number_property.read()
    return format(serial_number, 'x').upper()
def calculate_thermistor_coeffs(degree, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom = False, plot = False):
    """
    Fits a polynomial that maps the normalized voltage of a thermistor
    voltage divider to temperature in degrees Celsius.

    The thermistor is modelled with the Beta parameter equation over 1000
    evenly spaced temperatures between Tmin and Tmax (both in Celsius).

    degree: degree of the fitted polynomial
    Rload: resistance of the fixed divider resistor
    R_25: thermistor resistance at 25 degrees Celsius
    Beta: Beta parameter of the thermistor
    thermistor_bottom: if True the normalized voltage is
        R_thermistor / (Rload + R_thermistor), otherwise
        Rload / (Rload + R_thermistor)
    plot: if True, print the coefficients and show the model and the fit
        with matplotlib

    Returns the fit as a numpy.poly1d object.
    """
    import numpy as np

    kelvin_offset = 273.15
    t25_kelvin = 25 + kelvin_offset
    temps_celsius = np.linspace(Tmin, Tmax, 1000)
    temps_kelvin = temps_celsius + kelvin_offset

    # https://en.wikipedia.org/wiki/Thermistor#B_or_%CE%B2_parameter_equation
    r_inf = R_25 * np.exp(-Beta/t25_kelvin)
    thermistor_r = r_inf * np.exp(Beta/temps_kelvin)

    # The numerator is whichever resistor the voltage is measured across.
    measured_r = thermistor_r if thermistor_bottom else Rload
    voltage = measured_r / (Rload + thermistor_r)

    fit = np.polyfit(voltage, temps_celsius, degree)
    polynomial = np.poly1d(fit)

    if plot:
        import matplotlib.pyplot as plt
        print(fit)
        plt.plot(voltage, temps_celsius, label='actual')
        plt.plot(voltage, polynomial(voltage), label='fit')
        plt.xlabel('normalized voltage')
        plt.ylabel('Temp [C]')
        plt.legend(loc=0)
        plt.show()

    return polynomial
class OperationAbortedException(Exception):
    """Exception type for aborted operations. Adds nothing to Exception."""
def set_motor_thermistor_coeffs(axis, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom = False):
    """
    Fits a 3rd degree thermistor polynomial with calculate_thermistor_coeffs()
    and writes its coefficients to axis.motor.motor_thermistor.config.

    The fit is a numpy.poly1d, which is indexed by power, so
    poly_coefficient_0 receives the x^3 coefficient and poly_coefficient_3
    receives the constant term.
    """
    coeffs = calculate_thermistor_coeffs(3, Rload, R_25, Beta, Tmin, Tmax, thermistor_bottom)
    thermistor_config = axis.motor.motor_thermistor.config
    thermistor_config.poly_coefficient_0 = float(coeffs[3])
    thermistor_config.poly_coefficient_1 = float(coeffs[2])
    thermistor_config.poly_coefficient_2 = float(coeffs[1])
    thermistor_config.poly_coefficient_3 = float(coeffs[0])
def dump_errors(odrv, clear=False, printfunc = print):
    """
    Prints the error state of the device and of each of its axes.

    The device-level `error` attribute is reported first. Then, for every
    attribute of odrv whose name starts with 'axis' (in sorted order), the
    error attributes of the axis, motor, sensorless estimator, encoder and
    controller are reported. Set bits are translated to names using the
    constants in odrive.enums.

    odrv: the device object to inspect
    clear: if True, odrv.clear_errors() is called after dumping
    printfunc: function used to emit each output line
    """
    axes = sorted((name, getattr(odrv, name)) for name in dir(odrv) if name.startswith('axis'))

    def names_by_code(prefix):
        # Maps error value -> constant name for all enum constants with this prefix
        return {v: k for k, v in odrive.enums.__dict__ .items() if k.startswith(prefix)}

    def dump_errors_for_module(indent, name, obj, path, errorcodes):
        # Walk the dotted path starting at obj; bail out if a component is missing
        for elem in path.split('.'):
            if not hasattr(obj, elem):
                prefix = indent + name.strip('0123456789') + ": "
                printfunc(prefix + _VT100Colors['yellow'] + "not found" + _VT100Colors['default'])
                return
            obj = getattr(obj, elem)

        if obj != 0:
            printfunc(indent + name + ": " + _VT100Colors['red'] + "Error(s):" + _VT100Colors['default'])
            for bit in range(64):
                mask = 1 << bit
                if obj & mask != 0:
                    printfunc(indent + " " + errorcodes.get(mask, 'UNKNOWN ERROR: 0x{:08X}'.format(mask)))
        else:
            printfunc(indent + name + ": " + _VT100Colors['green'] + "no error" + _VT100Colors['default'])

    dump_errors_for_module("", "system", odrv, 'error', names_by_code("ODRIVE_ERROR_"))

    # (name, path relative to the axis, errorcodes), identical for every axis
    axis_modules = [
        ('axis', 'error', names_by_code("AXIS_ERROR_")),
        ('motor', 'motor.error', names_by_code("MOTOR_ERROR_")),
        ('sensorless_estimator', 'sensorless_estimator.error', names_by_code("SENSORLESS_ESTIMATOR_ERROR")),
        ('encoder', 'encoder.error', names_by_code("ENCODER_ERROR_")),
        ('controller', 'controller.error', names_by_code("CONTROLLER_ERROR_")),
    ]

    for axis_name, axis in axes:
        printfunc(axis_name)
        for module_name, path, errorcodes in axis_modules:
            dump_errors_for_module(" ", module_name, axis, path, errorcodes)

    if clear:
        odrv.clear_errors()
def oscilloscope_dump(odrv, num_vals, filename='oscilloscope.csv'):
    """
    Reads the first num_vals values via odrv.oscilloscope.get_val(index) and
    writes them to filename, one value per line. An existing file is
    overwritten.
    """
    with open(filename, 'w') as f:
        f.writelines(
            str(odrv.oscilloscope.get_val(index)) + '\n'
            for index in range(num_vals)
        )
# Tuning constants read by start_liveplotter():
# the fetch loop sleeps 1/data_rate seconds between reads
data_rate = 200
# the plot loop runs the figure's event loop for 1/plot_rate seconds per redraw
plot_rate = 10
# only the most recent num_samples readings are kept for plotting
num_samples = 500
def start_liveplotter(get_var_callback):
    """
    Starts a liveplotter.
    Each sample to plot is obtained by calling get_var_callback().
    Two daemon threads are started (one fetching samples, one drawing) and
    the function returns immediately. The returned cancellation token is set
    when the user closes the plot window, which stops both threads.
    The samples are kept in the module-level list `vals`.
    """
    import matplotlib.pyplot as plt

    cancellation_token = Event()

    global vals
    vals = []

    def fetch_data():
        global vals
        while not cancellation_token.is_set():
            try:
                sample = get_var_callback()
            except Exception as ex:
                # Report the failure and retry after a pause
                print(str(ex))
                time.sleep(1)
                continue
            vals.append(sample)
            # Keep only the newest num_samples entries
            if len(vals) > num_samples:
                vals = vals[-num_samples:]
            time.sleep(1/data_rate)

    # TODO: use animation for better UI performance, see:
    # https://matplotlib.org/examples/animation/simple_anim.html
    def plot_data():
        global vals

        plt.ion()

        # Make sure the script terminates when the user closes the plotter
        def on_close(evt):
            cancellation_token.set()

        figure = plt.figure()
        figure.canvas.mpl_connect('close_event', on_close)

        while not cancellation_token.is_set():
            plt.clf()
            plt.plot(vals)
            plt.legend(list(range(len(vals))))
            figure.canvas.draw()
            figure.canvas.start_event_loop(1/plot_rate)

    # Start the fetcher first, then the plotter
    for worker in (fetch_data, plot_data):
        thread = threading.Thread(target=worker)
        thread.daemon = True
        thread.start()

    return cancellation_token
class BulkCapture:
    '''
    Asynchronously captures a bulk set of data when instance is created.

    get_var_callback: a function that returns the data you want to collect (see the example below)
    data_rate: Rate in hz
    duration: Length of time to capture in seconds

    Attributes:
    event: threading.Event that is set once the capture thread has finished
    data: numpy array with one row per sample; column 0 is the time in
          seconds since the start of the capture, the remaining columns are
          the values returned by get_var_callback. Empty if nothing was
          captured.

    Example Usage:

    capture = BulkCapture(lambda :[odrv0.axis0.encoder.pos_estimate, odrv0.axis0.controller.pos_setpoint])
    # Do stuff while capturing (like sending position commands)
    capture.event.wait() # When you're done doing stuff, wait for the capture to be completed.
    print(capture.data) # Do stuff with the data
    capture.plot() # Helper method to plot the data
    '''

    def __init__(self,
                 get_var_callback,
                 data_rate=500.0,
                 duration=2.0):
        from threading import Event, Thread
        import numpy as np

        if data_rate <= 0:
            raise ValueError("data_rate must be positive")

        self.get_var_callback = get_var_callback
        self.event = Event()
        # Defined up front so that `data` exists even if nothing gets captured
        self.data = np.empty((0, 0))

        def loop():
            vals = []
            start_time = time.monotonic()
            period = 1.0 / data_rate
            try:
                while time.monotonic() - start_time < duration:
                    try:
                        data = get_var_callback()
                    except Exception as ex:
                        print(str(ex))
                        print("Waiting 1 second before next data point")
                        time.sleep(1)
                        continue
                    relative_time = time.monotonic() - start_time
                    # list() so that callbacks returning tuples work as well
                    vals.append([relative_time] + list(data))
                    time.sleep(period - (relative_time % period)) # this ensures consistently timed samples

                if vals:
                    self.data = np.array(vals) # A lock is not really necessary due to the event
                print("Capture complete")
                if not vals:
                    print("No samples were captured")
                else:
                    elapsed = self.data[-1, 0]
                    # elapsed can be 0 when the clock did not advance before the only sample
                    if elapsed > 0:
                        achieved_data_rate = len(self.data) / elapsed
                        if achieved_data_rate < (data_rate * 0.9):
                            print("Achieved average data rate: {}Hz".format(achieved_data_rate))
                            print("If this rate is significantly lower than what you specified, consider lowering it below the achieved value for more consistent sampling.")
            finally:
                # Always release waiters, even if the capture failed, so that
                # capture.event.wait() cannot block forever.
                self.event.set() # tell the main thread that the bulk capture is complete

        Thread(target=loop, daemon=True).start()

    def plot(self):
        """
        Plots the captured values over time with matplotlib and blocks in
        plt.show(). The plot title is taken from the source code of the
        callback when it is available and has the form `name = <expression>`.
        """
        if self.data.ndim != 2 or self.data.shape[0] == 0:
            print("No data captured, nothing to plot")
            return

        import matplotlib.pyplot as plt
        import inspect
        from textwrap import wrap

        plt.plot(self.data[:,0], self.data[:,1:])
        plt.xlabel("Time (seconds)")
        try:
            title = (str(inspect.getsource(self.get_var_callback))
                     .strip("['\\n']")
                     .split(" = ")[1])
        except (OSError, TypeError, IndexError):
            # Source not retrievable (e.g. builtin or interactively defined
            # callback) or not an assignment: plot without a title.
            title = None
        if title:
            plt.title("\n".join(wrap(title, 60)))
        plt.legend(range(self.data.shape[1]-1))
        plt.show()
def step_and_plot(  axis,
                    step_size=100.0,
                    settle_time=0.5,
                    data_rate=500.0,
                    ctrl_mode=CONTROL_MODE_POSITION_CONTROL):
    """
    Applies a setpoint step to the axis in closed loop control, records the
    response with BulkCapture and plots it.

    axis: the axis to run the test on
    step_size: size of the step, added to the initial setpoint
    settle_time: time in seconds to keep recording after the step
    data_rate: sample rate of the capture in hz
    ctrl_mode: CONTROL_MODE_POSITION_CONTROL or CONTROL_MODE_VELOCITY_CONTROL.
        For any other value a message is printed and nothing is done.

    The axis is put back into AXIS_STATE_IDLE and its previous control mode
    is restored afterwards, also if the test is interrupted by an exception.
    """
    # Compare by value: these are integer constants, identity is not guaranteed.
    # The callbacks are deliberately written as `name = lambda ...` because
    # BulkCapture.plot() derives the plot title from that source line.
    if ctrl_mode == CONTROL_MODE_POSITION_CONTROL:
        get_var_callback = lambda :[axis.encoder.pos_estimate, axis.controller.pos_setpoint]
        initial_setpoint = axis.encoder.pos_estimate
        def set_setpoint(setpoint):
            axis.controller.pos_setpoint = setpoint
    elif ctrl_mode == CONTROL_MODE_VELOCITY_CONTROL:
        get_var_callback = lambda :[axis.encoder.vel_estimate, axis.controller.vel_setpoint]
        initial_setpoint = 0
        def set_setpoint(setpoint):
            axis.controller.vel_setpoint = setpoint
    else:
        print("Invalid control mode")
        return

    initial_settle_time = 0.5
    initial_control_mode = axis.controller.config.control_mode # Set it back afterwards
    print(initial_control_mode)
    axis.controller.config.control_mode = ctrl_mode
    try:
        axis.requested_state = AXIS_STATE_CLOSED_LOOP_CONTROL

        capture = BulkCapture(get_var_callback,
                              data_rate=data_rate,
                              duration=initial_settle_time + settle_time)

        set_setpoint(initial_setpoint)
        time.sleep(initial_settle_time)
        set_setpoint(initial_setpoint + step_size) # relative/incremental movement

        capture.event.wait() # wait for Bulk Capture to be complete
    finally:
        # Never leave the motor energized or the configuration modified,
        # even when the test is aborted (e.g. by Ctrl+C).
        axis.requested_state = AXIS_STATE_IDLE
        axis.controller.config.control_mode = initial_control_mode

    capture.plot()
def print_drv_regs(name, motor):
    """
    Dumps the current gate driver registers for the specified motor.
    The fault code is printed first, then both status registers and both
    control registers, each as a decimal and as a zero-padded binary number.
    """
    gate_driver = motor.gate_driver
    fault = gate_driver.drv_fault
    status_reg_1 = gate_driver.status_reg_1
    status_reg_2 = gate_driver.status_reg_2
    ctrl_reg_1 = gate_driver.ctrl_reg_1
    ctrl_reg_2 = gate_driver.ctrl_reg_2

    print("{}: {!s}".format(name, fault))
    print("DRV Fault Code: {!s}".format(fault))
    # (label, value, binary format spec); the widths include the '0b' prefix
    register_rows = (
        ("Status Reg 1: ", status_reg_1, '#010b'),
        ("Status Reg 2: ", status_reg_2, '#010b'),
        ("Control Reg 1: ", ctrl_reg_1, '#013b'),
        ("Control Reg 2: ", ctrl_reg_2, '#09b'),
    )
    for label, value, binary_spec in register_rows:
        print(label + str(value) + " (" + format(value, binary_spec) + ")")
def show_oscilloscope(odrv):
    """
    Reads 18000 values via odrv.oscilloscope.get_val(index) and shows them
    in a matplotlib plot. Blocks in plt.show().
    """
    size = 18000
    values = [odrv.oscilloscope.get_val(index) for index in range(size)]

    import matplotlib.pyplot as plt
    plt.plot(values)
    plt.show()
def rate_test(device):
    """
    Tests how many integers per second can be transmitted.

    Reads device.n_evt_control_loop 10000 times and prints the number of
    reads ("frames") per second, derived from how far that counter advanced
    between the first and the last read.
    """
    print("reading 10000 values...")
    frame_count = 10000
    counter_values = [device.n_evt_control_loop for _ in range(frame_count)]

    loops_per_frame = (counter_values[-1] - counter_values[0])/frame_count
    # NOTE(review): presumably the control loop rate of the device
    # (168 MHz / (6 * 3500) = 8 kHz) - confirm against the firmware
    loops_per_sec = (168000000/(6*3500))
    frames_per_sec = loops_per_sec/loops_per_frame
    print("Frames per second: " + str(frames_per_sec))
def usb_burn_in_test(get_var_callback, cancellation_token):
    """
    Starts a daemon thread that calls get_var_callback() in a spin-loop
    until cancellation_token is set. A progress message is printed after
    every 1000 consecutive successful calls. If a call raises, the error is
    printed, the count is reset and the loop resumes after one second.
    """
    def spin():
        successful_reads = 0
        while not cancellation_token.is_set():
            try:
                get_var_callback()
            except Exception as ex:
                print(str(ex))
                time.sleep(1)
                successful_reads = 0
            else:
                successful_reads += 1
                if successful_reads % 1000 == 0:
                    print("read {} values".format(successful_reads))

    worker = threading.Thread(target=spin, daemon=True)
    worker.start()
def yes_no_prompt(question, default=None):
    """
    Asks the user a yes/no question on the console and returns the answer
    as a bool.

    'y'/'yes' and 'n'/'no' are accepted in any letter case. If default is
    not None, an empty answer selects the default. The question is repeated
    until a valid answer is given.
    """
    # Show the available answers, with the default (if any) capitalized
    if default is None:
        hint = " [y/n] "
    elif default == True:
        hint = " [Y/n] "
    elif default == False:
        hint = " [y/N] "
    else:
        hint = ""
    question = question + hint

    while True:
        print(question, end='')
        answer = input().lower()
        if answer in ('yes', 'y'):
            return True
        if answer in ('no', 'n'):
            return False
        if answer == '' and default is not None:
            return default
def dump_interrupts(odrv):
    """
    Prints a table of the interrupts for which
    odrv.get_interrupt_status(irqn) returns a non-zero status word.

    The status word is decoded as follows: bits 0-7 are printed as the
    priority, bit 31 marks the interrupt as enabled and the 23 bits starting
    at bit 8 are printed as the count.
    """
    # (interrupt number, name)
    interrupts = (
        (-12, "MemoryManagement_IRQn"),
        (-11, "BusFault_IRQn"),
        (-10, "UsageFault_IRQn"),
        (-5, "SVCall_IRQn"),
        (-4, "DebugMonitor_IRQn"),
        (-2, "PendSV_IRQn"),
        (-1, "SysTick_IRQn"),
        (0, "WWDG_IRQn"),
        (1, "PVD_IRQn"),
        (2, "TAMP_STAMP_IRQn"),
        (3, "RTC_WKUP_IRQn"),
        (4, "FLASH_IRQn"),
        (5, "RCC_IRQn"),
        (6, "EXTI0_IRQn"),
        (7, "EXTI1_IRQn"),
        (8, "EXTI2_IRQn"),
        (9, "EXTI3_IRQn"),
        (10, "EXTI4_IRQn"),
        (11, "DMA1_Stream0_IRQn"),
        (12, "DMA1_Stream1_IRQn"),
        (13, "DMA1_Stream2_IRQn"),
        (14, "DMA1_Stream3_IRQn"),
        (15, "DMA1_Stream4_IRQn"),
        (16, "DMA1_Stream5_IRQn"),
        (17, "DMA1_Stream6_IRQn"),
        (18, "ADC_IRQn"),
        (19, "CAN1_TX_IRQn"),
        (20, "CAN1_RX0_IRQn"),
        (21, "CAN1_RX1_IRQn"),
        (22, "CAN1_SCE_IRQn"),
        (23, "EXTI9_5_IRQn"),
        (24, "TIM1_BRK_TIM9_IRQn"),
        (25, "TIM1_UP_TIM10_IRQn"),
        (26, "TIM1_TRG_COM_TIM11_IRQn"),
        (27, "TIM1_CC_IRQn"),
        (28, "TIM2_IRQn"),
        (29, "TIM3_IRQn"),
        (30, "TIM4_IRQn"),
        (31, "I2C1_EV_IRQn"),
        (32, "I2C1_ER_IRQn"),
        (33, "I2C2_EV_IRQn"),
        (34, "I2C2_ER_IRQn"),
        (35, "SPI1_IRQn"),
        (36, "SPI2_IRQn"),
        (37, "USART1_IRQn"),
        (38, "USART2_IRQn"),
        (39, "USART3_IRQn"),
        (40, "EXTI15_10_IRQn"),
        (41, "RTC_Alarm_IRQn"),
        (42, "OTG_FS_WKUP_IRQn"),
        (43, "TIM8_BRK_TIM12_IRQn"),
        (44, "TIM8_UP_TIM13_IRQn"),
        (45, "TIM8_TRG_COM_TIM14_IRQn"),
        (46, "TIM8_CC_IRQn"),
        (47, "DMA1_Stream7_IRQn"),
        (48, "FMC_IRQn"),
        (49, "SDMMC1_IRQn"),
        (50, "TIM5_IRQn"),
        (51, "SPI3_IRQn"),
        (52, "UART4_IRQn"),
        (53, "UART5_IRQn"),
        (54, "TIM6_DAC_IRQn"),
        (55, "TIM7_IRQn"),
        (56, "DMA2_Stream0_IRQn"),
        (57, "DMA2_Stream1_IRQn"),
        (58, "DMA2_Stream2_IRQn"),
        (59, "DMA2_Stream3_IRQn"),
        (60, "DMA2_Stream4_IRQn"),
        (61, "ETH_IRQn"),
        (62, "ETH_WKUP_IRQn"),
        (63, "CAN2_TX_IRQn"),
        (64, "CAN2_RX0_IRQn"),
        (65, "CAN2_RX1_IRQn"),
        (66, "CAN2_SCE_IRQn"),
        (67, "OTG_FS_IRQn"),
        (68, "DMA2_Stream5_IRQn"),
        (69, "DMA2_Stream6_IRQn"),
        (70, "DMA2_Stream7_IRQn"),
        (71, "USART6_IRQn"),
        (72, "I2C3_EV_IRQn"),
        (73, "I2C3_ER_IRQn"),
        (74, "OTG_HS_EP1_OUT_IRQn"),
        (75, "OTG_HS_EP1_IN_IRQn"),
        (76, "OTG_HS_WKUP_IRQn"),
        (77, "OTG_HS_IRQn"),
        # gap
        (80, "RNG_IRQn"),
        (81, "FPU_IRQn"),
        (82, "UART7_IRQn"),
        (83, "UART8_IRQn"),
        (84, "SPI4_IRQn"),
        (85, "SPI5_IRQn"),
        # gap
        (87, "SAI1_IRQn"),
        # gap
        (91, "SAI2_IRQn"),
        (92, "QUADSPI_IRQn"),
        (93, "LPTIM1_IRQn"),
        # gap
        (103, "SDMMC2_IRQn"),
    )

    print("| # | Name | Prio | En | Count |")
    print("|-----|-------------------------|------|----|---------|")
    row_format = "| {} | {} | {} | {} | {} |"
    for irqn, irq_name in interrupts:
        status = odrv.get_interrupt_status(irqn)
        if status == 0:
            # Only interrupts with a non-zero status word are listed
            continue
        priority = status & 0xff
        enabled_mark = " *" if (status & 0x80000000) else " "
        count = (status >> 8) & 0x7fffff
        print(row_format.format(
            str(irqn).rjust(3),
            irq_name.ljust(23),
            str(priority).rjust(4),
            enabled_mark,
            str(count).rjust(7)))
def dump_threads(odrv):
    """
    Prints a table with the stack size, the maximum stack usage and the
    priority of each thread reported in odrv.system_stats.

    A thread is identified by the common suffix of the attributes
    max_stack_usage_<name>, stack_size_<name> and prio_<name>. Threads for
    which not all three attributes exist are left out of the table and are
    named in a warning instead.
    """
    prefixes = ["max_stack_usage_", "stack_size_", "prio_"]
    stats = odrv.system_stats

    # thread name -> set of prefixes for which an attribute exists
    found_prefixes = {}
    for attr in dir(stats):
        for prefix in prefixes:
            if attr.startswith(prefix):
                found_prefixes.setdefault(attr[len(prefix):], set()).add(prefix)

    good_keys = {k for k, found in found_prefixes.items() if len(found) == len(prefixes)}
    incomplete_keys = set(found_prefixes) - good_keys
    # good_keys is a subset of all keys, so the warning has to trigger
    # whenever something was left out.
    if incomplete_keys:
        print("Warning: incomplete thread information for threads {}".format(incomplete_keys))

    print("| Name | Stack Size [B] | Max Ever Stack Usage [B] | Prio |")
    print("|---------|----------------|--------------------------|------|")
    for k in sorted(good_keys):
        sz = getattr(stats, "stack_size_" + k)
        use = getattr(stats, "max_stack_usage_" + k)
        # Avoid a ZeroDivisionError for a thread that reports a stack size of 0
        percent = use / sz * 100 if sz else float('nan')
        print("| {} | {} | {} | {} |".format(
            k.ljust(7),
            str(sz).rjust(14),
            "{} ({:.1f}%)".format(use, percent).rjust(24),
            str(getattr(stats, "prio_" + k)).rjust(4)
        ))
def _dma_request_mapping(hw_version_major):
    """
    Returns the DMA request mapping for the given hardware major version as
    [DMA1 table, DMA2 table], or None if the version is not known. Each
    table is indexed as table[channel][stream].
    """
    if hw_version_major == 3:
        return [[
            # https://www.st.com/content/ccc/resource/technical/document/reference_manual/3d/6d/5a/66/b4/99/40/d4/DM00031020.pdf/files/DM00031020.pdf/jcr:content/translations/en.DM00031020.pdf Table 42
            ["SPI3_RX", "-", "SPI3_RX", "SPI2_RX", "SPI2_TX", "SPI3_TX", "-", "SPI3_TX"],
            ["I2C1_RX", "-", "TIM7_UP", "-", "TIM7_UP", "I2C1_RX", "I2C1_TX", "I2C1_TX"],
            ["TIM4_CH1", "-", "I2S3_EXT_RX", "TIM4_CH2", "I2S2_EXT_TX", "I2S3_EXT_TX", "TIM4_UP", "TIM4_CH3"],
            ["I2S3_EXT_RX", "TIM2_UP/TIM2_CH3", "I2C3_RX", "I2S2_EXT_RX", "I2C3_TX", "TIM2_CH1", "TIM2_CH2/TIM2_CH4", "TIM2_UP/TIM2_CH4"],
            ["UART5_RX", "USART3_RX", "UART4_RX", "USART3_TX", "UART4_TX", "USART2_RX", "USART2_TX", "UART5_TX"],
            ["UART8_TX", "UART7_TX", "TIM3_CH4/TIM3_UP", "UART7_RX", "TIM3_CH1/TIM3_TRIG", "TIM3_CH2", "UART8_RX", "TIM3_CH3"],
            ["TIM5_CH3/TIM5_UP", "TIM5_CH4/TIM5_TRIG", "TIM5_CH1", "TIM5_CH4/TIM5_TRIG", "TIM5_CH2", "-", "TIM5_UP", "-"],
            ["-", "TIM6_UP", "I2C2_RX", "I2C2_RX", "USART3_TX", "DAC1", "DAC2", "I2C2_TX"],
        ], [
            # https://www.st.com/content/ccc/resource/technical/document/reference_manual/3d/6d/5a/66/b4/99/40/d4/DM00031020.pdf/files/DM00031020.pdf/jcr:content/translations/en.DM00031020.pdf Table 43
            ["ADC1", "SAI1_A", "TIM8_CH1/TIM8_CH2/TIM8_CH3", "SAI1_A", "ADC1", "SAI1_B", "TIM1_CH1/TIM1_CH2/TIM1_CH3", "-"],
            ["-", "DCMI", "ADC2", "ADC2", "SAI1_B", "SPI6_TX", "SPI6_RX", "DCMI"],
            ["ADC3", "ADC3", "-", "SPI5_RX", "SPI5_TX", "CRYP_OUT", "CRYP_IN", "HASH_IN"],
            ["SPI1_RX", "-", "SPI1_RX", "SPI1_TX", "-", "SPI1_TX", "-", "-"],
            ["SPI4_RX", "SPI4_TX", "USART1_RX", "SDIO", "-", "USART1_RX", "SDIO", "USART1_TX"],
            ["-", "USART6_RX", "USART6_RX", "SPI4_RX", "SPI4_TX", "-", "USART6_TX", "USART6_TX"],
            ["TIM1_TRIG", "TIM1_CH1", "TIM1_CH2", "TIM1_CH1", "TIM1_CH4/TIM1_TRIG/TIM1_COM", "TIM1_UP", "TIM1_CH3", "-"],
            ["-", "TIM8_UP", "TIM8_CH1", "TIM8_CH2", "TIM8_CH3", "SPI5_RX", "SPI5_TX", "TIM8_CH4/TIM8_TRIG/TIM8_COM"],
        ]]
    if hw_version_major == 4:
        return [[
            # https://www.st.com/resource/en/reference_manual/dm00305990-stm32f72xxx-and-stm32f73xxx-advanced-armbased-32bit-mcus-stmicroelectronics.pdf Table 26
            ["SPI3_RX", "-", "SPI3_RX", "SPI2_RX", "SPI2_TX", "SPI3_TX", "-", "SPI3_TX"],
            ["I2C1_RX", "I2C3_RX", "TIM7_UP", "-", "TIM7_UP", "I2C1_RX", "I2C1_TX", "I2C1_TX"],
            ["TIM4_CH1", "-", "-", "TIM4_CH2", "-", "-", "TIM4_UP", "TIM4_CH3"],
            ["-", "TIM2_UP/TIM2_CH3", "I2C3_RX", "-", "I2C3_TX", "TIM2_CH1", "TIM2_CH2/TIM2_CH4", "TIM2_UP/TIM2_CH4"],
            ["UART5_RX", "USART3_RX", "UART4_RX", "USART3_TX", "UART4_TX", "USART2_RX", "USART2_TX", "UART5_TX"],
            ["UART8_TX", "UART7_TX", "TIM3_CH4/TIM3_UP", "UART7_RX", "TIM3_CH1/TIM3_TRIG", "TIM3_CH2", "UART8_RX", "TIM3_CH3"],
            ["TIM5_CH3/TIM5_UP", "TIM5_CH4/TIM5_TRIG", "TIM5_CH1", "TIM5_CH4/TIM5_TRIG", "TIM5_CH2", "-", "TIM5_UP", "-"],
            ["-", "TIM6_UP", "I2C2_RX", "I2C2_RX", "USART3_TX", "DAC1", "DAC2", "I2C2_TX"],
        ], [
            # https://www.st.com/resource/en/reference_manual/dm00305990-stm32f72xxx-and-stm32f73xxx-advanced-armbased-32bit-mcus-stmicroelectronics.pdf Table 27
            ["ADC1", "SAI1_A", "TIM8_CH1/TIM8_CH2/TIM8_CH3", "SAI1_A", "ADC1", "SAI1_B", "TIM1_CH1/TIM1_CH2/TIM1_CH3", "SAI2_B"],
            ["-", "-", "ADC2", "ADC2", "SAI1_B", "-", "-", "-"],
            ["ADC3", "ADC3", "-", "SPI5_RX", "SPI5_TX", "AES_OUT", "AES_IN", "-"],
            ["SPI1_RX", "-", "SPI1_RX", "SPI1_TX", "SAI2_A", "SPI1_TX", "SAI2_B", "QUADSPI"],
            ["SPI4_RX", "SPI4_TX", "USART1_RX", "SDMMC1", "-", "USART1_RX", "SDMMC1", "USART1_TX"],
            ["-", "USART6_RX", "USART6_RX", "SPI4_RX", "SPI4_TX", "-", "USART6_TX", "USART6_TX"],
            ["TIM1_TRIG", "TIM1_CH1", "TIM1_CH2", "TIM1_CH1", "TIM1_CH4/TIM1_TRIG/TIM1_COM", "TIM1_UP", "TIM1_CH3", "-"],
            ["-", "TIM8_UP", "TIM8_CH1", "TIM8_CH2", "TIM8_CH3", "SPI5_RX", "SPI5_TX", "TIM8_CH4/TIM8_TRIG/TIM8_COM"],
            None,
            None,
            None,
            ["SDMMC2", "-", "-", "-", "-", "SDMMC2", "-", "-"],
        ]]
    return None

def dump_dma(odrv):
    """
    Prints a table of the DMA streams for which
    odrv.get_dma_status(stream_num) returns a non-zero status word.

    Stream numbers 0-7 are shown as DMA1 and 8-15 as DMA2. The status word
    is decoded as follows: bits 0-1 are printed as the priority, bits 2-4
    are the selected channel and bit 31 marks the stream as configured. The
    channel is translated to a peripheral name according to
    odrv.hw_version_major. For hardware versions without a known mapping a
    warning is printed and "?" is shown instead of the name.
    """
    dma_functions = _dma_request_mapping(odrv.hw_version_major)
    if dma_functions is None:
        print("Warning: no DMA request mapping known for hardware version {}".format(odrv.hw_version_major))

    print("| Name | Prio | Channel | Configured |")
    print("|--------------|------|----------------------------------|------------|")
    for stream_num in range(16):
        status = odrv.get_dma_status(stream_num)
        if status == 0:
            continue
        controller = stream_num >> 3
        stream = stream_num & 0x7
        channel = (status >> 2) & 0x7
        if dma_functions is None:
            ch_name = "?"
        else:
            ch_name = dma_functions[controller][channel][stream]
        print("| DMA{}_Stream{} | {} | {} {} | {} |".format(
            controller + 1,
            stream,
            (status & 0x3),
            channel,
            ("(" + ch_name + ")").ljust(30),
            "*" if (status & 0x80000000) else " "))
def dump_timing(odrv, n_samples=100, path='/tmp/timings.png'):
    """
    Samples the task timers of the device and saves a bar chart of the
    result to `path`.

    The timers are the public attributes of odrv.task_times and of the
    task_times object of every attribute of odrv whose name matches
    'axis[0-9]+'. For each of n_samples samples, odrv.task_timers_armed is
    set to True and polled until it reads false again; then start_time and
    length of every timer are recorded.

    The chart shows one horizontal bar per timer, sorted by mean start time,
    starting at the mean start time and as long as the mean length. The left
    error bar is the standard deviation of the length, the right one reaches
    up to the timer's max_length (capped at 20100).
    """
    import matplotlib.pyplot as plt
    import re
    import numpy as np

    timings = [] # entries are (name, obj, start_times, lengths)

    for attr in dir(odrv.task_times):
        if attr.startswith('_'):
            continue
        timings.append((attr, getattr(odrv.task_times, attr), [], []))

    for k in dir(odrv):
        if not re.match(r'axis[0-9]+', k):
            continue
        for attr in dir(getattr(odrv, k).task_times):
            if attr.startswith('_'):
                continue
            timings.append((k + '.' + attr, getattr(getattr(odrv, k).task_times, attr), [], []))

    # Take a couple of samples
    print("sampling...")
    for _ in range(n_samples):
        odrv.task_timers_armed = True # Trigger sample and wait for it to finish
        while odrv.task_timers_armed:
            pass
        for _name, timer, start_times, lengths in timings:
            start_times.append(timer.start_time)
            lengths.append(timer.length)
    print("done")

    # sort by start time
    timings = sorted(timings, key = lambda entry: np.mean(entry[2]))

    y_positions = [-i for i in range(len(timings))]
    mean_lengths = [np.mean(lengths) for name, obj, start_times, lengths in timings]
    mean_starts = [np.mean(start_times) for name, obj, start_times, lengths in timings]
    left_errors = [np.std(lengths) for name, obj, start_times, lengths in timings]
    # TODO: remove artificial min()
    right_errors = [(min(obj.max_length, 20100) - np.mean(lengths)) for name, obj, start_times, lengths in timings]
    labels = [name for name, obj, start_times, lengths in timings]

    plt.rcParams['figure.figsize'] = 21, 9
    plt.figure()
    plt.grid(True)
    plt.barh(
        y_positions,
        mean_lengths,
        left = mean_starts,
        xerr = (left_errors, right_errors),
        tick_label = labels,
    )
    plt.savefig(path, bbox_inches='tight')