mirror of
https://github.com/PX4/PX4-Autopilot.git
synced 2026-05-28 10:46:33 +08:00
More fixes for Python 3 compatibility (#13008)
* More fixes for Python 3 compatibility * Workaround if the six module is not pip installed * Lose the semicolons
This commit is contained in:
committed by
Julian Oes
parent
2a848c365c
commit
6dc55f97d4
@@ -211,7 +211,7 @@ class NX_task(object):
|
||||
def waiting_for(self):
|
||||
"""return a description of what the task is waiting for, if it is waiting"""
|
||||
if self._state_is('TSTATE_WAIT_SEM'):
|
||||
try:
|
||||
try:
|
||||
waitsem = self._tcb['waitsem'].dereference()
|
||||
waitsem_holder = waitsem['holder']
|
||||
holder = NX_task.for_tcb(waitsem_holder['htcb'])
|
||||
@@ -234,8 +234,8 @@ class NX_task(object):
|
||||
@property
|
||||
def is_runnable(self):
|
||||
"""tests whether the task is runnable"""
|
||||
if (self._state_is('TSTATE_TASK_PENDING') or
|
||||
self._state_is('TSTATE_TASK_READYTORUN') or
|
||||
if (self._state_is('TSTATE_TASK_PENDING') or
|
||||
self._state_is('TSTATE_TASK_READYTORUN') or
|
||||
self._state_is('TSTATE_TASK_RUNNING')):
|
||||
return True
|
||||
return False
|
||||
@@ -271,7 +271,7 @@ class NX_task(object):
|
||||
|
||||
def __str__(self):
|
||||
return "{}:{}".format(self.pid, self.name)
|
||||
|
||||
|
||||
def showoff(self):
|
||||
print("-------")
|
||||
print(self.pid,end = ", ")
|
||||
@@ -282,7 +282,7 @@ class NX_task(object):
|
||||
print(self._tcb['adj_stack_size'],end = ", ")
|
||||
print(self.file_descriptors)
|
||||
print(self.registers)
|
||||
|
||||
|
||||
def __format__(self, format_spec):
|
||||
return format_spec.format(
|
||||
pid = self.pid,
|
||||
@@ -294,7 +294,7 @@ class NX_task(object):
|
||||
file_descriptors = self.file_descriptors,
|
||||
registers = self.registers
|
||||
)
|
||||
|
||||
|
||||
class NX_show_task (gdb.Command):
|
||||
"""(NuttX) prints information about a task"""
|
||||
|
||||
@@ -391,7 +391,7 @@ class NX_show_interrupted_thread (gdb.Command):
|
||||
|
||||
def invoke(self, args, from_tty):
|
||||
regs = gdb.lookup_global_symbol('current_regs').value()
|
||||
if regs is 0:
|
||||
if regs == 0:
|
||||
raise gdb.GdbError('not in interrupt context')
|
||||
else:
|
||||
registers = NX_register_set.with_xcpt_regs(regs)
|
||||
@@ -409,7 +409,7 @@ class NX_check_tcb(gdb.Command):
|
||||
""" check the tcb of a task from a address """
|
||||
def __init__(self):
|
||||
super(NX_check_tcb,self).__init__('show tcb', gdb.COMMAND_USER)
|
||||
|
||||
|
||||
def invoke(self,args,sth):
|
||||
tasks = NX_task.tasks()
|
||||
print("tcb int: ",int(args))
|
||||
@@ -425,29 +425,29 @@ NX_check_tcb()
|
||||
class NX_tcb(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
def is_in(self,arg,list):
|
||||
for i in list:
|
||||
if arg == i:
|
||||
return True;
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def find_tcb_list(self,dq_entry_t):
|
||||
tcb_list = []
|
||||
tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
|
||||
first_tcb = tcb_ptr.dereference()
|
||||
tcb_list.append(first_tcb);
|
||||
tcb_list.append(first_tcb)
|
||||
next_tcb = first_tcb['flink'].dereference()
|
||||
while not self.is_in(parse_int(next_tcb['pid']),[parse_int(t['pid']) for t in tcb_list]):
|
||||
tcb_list.append(next_tcb);
|
||||
old_tcb = next_tcb;
|
||||
tcb_list.append(next_tcb)
|
||||
old_tcb = next_tcb
|
||||
next_tcb = old_tcb['flink'].dereference()
|
||||
|
||||
|
||||
return [t for t in tcb_list if parse_int(t['pid'])<2000]
|
||||
|
||||
|
||||
def getTCB(self):
|
||||
list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
|
||||
tcb_list = [];
|
||||
tcb_list = []
|
||||
for l in list_of_listsnames:
|
||||
li = gdb.lookup_global_symbol(l)
|
||||
print(li)
|
||||
@@ -456,49 +456,49 @@ class NX_tcb(object):
|
||||
|
||||
class NX_check_stack_order(gdb.Command):
|
||||
""" Check the Stack order corresponding to the tasks """
|
||||
|
||||
|
||||
def __init__(self):
|
||||
super(NX_check_stack_order,self).__init__('show check_stack', gdb.COMMAND_USER)
|
||||
|
||||
|
||||
def is_in(self,arg,list):
|
||||
for i in list:
|
||||
if arg == i:
|
||||
return True;
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def find_tcb_list(self,dq_entry_t):
|
||||
tcb_list = []
|
||||
tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
|
||||
first_tcb = tcb_ptr.dereference()
|
||||
tcb_list.append(first_tcb);
|
||||
tcb_list.append(first_tcb)
|
||||
next_tcb = first_tcb['flink'].dereference()
|
||||
while not self.is_in(parse_int(next_tcb['pid']),[parse_int(t['pid']) for t in tcb_list]):
|
||||
tcb_list.append(next_tcb);
|
||||
old_tcb = next_tcb;
|
||||
tcb_list.append(next_tcb)
|
||||
old_tcb = next_tcb
|
||||
next_tcb = old_tcb['flink'].dereference()
|
||||
|
||||
|
||||
return [t for t in tcb_list if parse_int(t['pid'])<2000]
|
||||
|
||||
|
||||
def getTCB(self):
|
||||
list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
|
||||
tcb_list = [];
|
||||
tcb_list = []
|
||||
for l in list_of_listsnames:
|
||||
li = gdb.lookup_global_symbol(l)
|
||||
cursor = li.value()['head']
|
||||
tcb_list = tcb_list + self.find_tcb_list(cursor)
|
||||
return tcb_list
|
||||
|
||||
|
||||
def getSPfromTask(self,tcb):
|
||||
regmap = NX_register_set.v7em_regmap
|
||||
a =tcb['xcp']['regs']
|
||||
return parse_int(a[regmap['SP']])
|
||||
|
||||
|
||||
def find_closest(self,list,val):
|
||||
tmp_list = [abs(i-val) for i in list]
|
||||
tmp_min = min(tmp_list)
|
||||
idx = tmp_list.index(tmp_min)
|
||||
return idx,list[idx]
|
||||
|
||||
|
||||
def find_next_stack(self,address,_dict_in):
|
||||
add_list = []
|
||||
name_list = []
|
||||
@@ -510,28 +510,28 @@ class NX_check_stack_order(gdb.Command):
|
||||
name_list.append(self.check_name(key)+"_SP")
|
||||
else:
|
||||
name_list.append(self.check_name(key))
|
||||
|
||||
|
||||
idx,new_address = self.find_closest(add_list,address)
|
||||
return new_address,name_list[idx]
|
||||
|
||||
|
||||
def check_name(self,name):
|
||||
if isinstance(name,(list)):
|
||||
name = name[0];
|
||||
name = name[0]
|
||||
idx = name.find("\\")
|
||||
newname = name[:idx]
|
||||
|
||||
|
||||
return newname
|
||||
|
||||
|
||||
def invoke(self,args,sth):
|
||||
tcb = self.getTCB();
|
||||
stackadresses={};
|
||||
tcb = self.getTCB()
|
||||
stackadresses={}
|
||||
for t in tcb:
|
||||
p = [];
|
||||
p = []
|
||||
#print(t.name,t._tcb['stack_alloc_ptr'])
|
||||
p.append(parse_int(t['stack_alloc_ptr']))
|
||||
p.append(parse_int(t['adj_stack_ptr']))
|
||||
p.append(self.getSPfromTask(t))
|
||||
stackadresses[str(t['name'])] = p;
|
||||
stackadresses[str(t['name'])] = p
|
||||
address = int("0x30000000",0)
|
||||
print("stack address : process")
|
||||
for i in range(len(stackadresses)*3):
|
||||
@@ -539,12 +539,12 @@ class NX_check_stack_order(gdb.Command):
|
||||
print(hex(address),": ",name)
|
||||
|
||||
NX_check_stack_order()
|
||||
|
||||
|
||||
class NX_run_debug_util(gdb.Command):
|
||||
""" show the registers of a task corresponding to a tcb address"""
|
||||
def __init__(self):
|
||||
super(NX_run_debug_util,self).__init__('show regs', gdb.COMMAND_USER)
|
||||
|
||||
|
||||
def printRegisters(self,task):
|
||||
regmap = NX_register_set.v7em_regmap
|
||||
a =task._tcb['xcp']['regs']
|
||||
@@ -553,14 +553,14 @@ class NX_run_debug_util(gdb.Command):
|
||||
hex_addr= hex(int(a[regmap[reg]]))
|
||||
eval_string = 'info line *'+str(hex_addr)
|
||||
print(reg,": ",hex_addr,)
|
||||
|
||||
|
||||
def getPCfromTask(self,task):
|
||||
regmap = NX_register_set.v7em_regmap
|
||||
a =task._tcb['xcp']['regs']
|
||||
return hex(int(a[regmap['PC']]))
|
||||
|
||||
|
||||
def invoke(self,args,sth):
|
||||
tasks = NX_task.tasks()
|
||||
tasks = NX_task.tasks()
|
||||
if args == '':
|
||||
for t in tasks:
|
||||
self.printRegisters(t)
|
||||
@@ -568,56 +568,56 @@ class NX_run_debug_util(gdb.Command):
|
||||
print("this is the location in code where the current threads $pc is:")
|
||||
gdb.execute(eval_str)
|
||||
else:
|
||||
tcb_nr = int(args);
|
||||
tcb_nr = int(args)
|
||||
print("tcb_nr = ",tcb_nr)
|
||||
t = tasks[tcb_nr]
|
||||
self.printRegisters(t)
|
||||
eval_str = "list *"+str(self.getPCfromTask(t))
|
||||
print("this is the location in code where the current threads $pc is:")
|
||||
gdb.execute(eval_str)
|
||||
|
||||
|
||||
NX_run_debug_util()
|
||||
|
||||
|
||||
|
||||
class NX_search_tcb(gdb.Command):
|
||||
""" shot PID's of all running tasks """
|
||||
|
||||
|
||||
def __init__(self):
|
||||
super(NX_search_tcb,self).__init__('show alltcb', gdb.COMMAND_USER)
|
||||
|
||||
|
||||
def is_in(self,arg,list):
|
||||
for i in list:
|
||||
if arg == i:
|
||||
return True;
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def find_tcb_list(self,dq_entry_t):
|
||||
tcb_list = []
|
||||
tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
|
||||
first_tcb = tcb_ptr.dereference()
|
||||
tcb_list.append(first_tcb);
|
||||
tcb_list.append(first_tcb)
|
||||
next_tcb = first_tcb['flink'].dereference()
|
||||
while not self.is_in(parse_int(next_tcb['pid']),[parse_int(t['pid']) for t in tcb_list]):
|
||||
tcb_list.append(next_tcb);
|
||||
old_tcb = next_tcb;
|
||||
tcb_list.append(next_tcb)
|
||||
old_tcb = next_tcb
|
||||
next_tcb = old_tcb['flink'].dereference()
|
||||
|
||||
|
||||
return [t for t in tcb_list if parse_int(t['pid'])<2000]
|
||||
|
||||
|
||||
def invoke(self,args,sth):
|
||||
list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
|
||||
tasks = [];
|
||||
tasks = []
|
||||
for l in list_of_listsnames:
|
||||
li = gdb.lookup_global_symbol(l)
|
||||
cursor = li.value()['head']
|
||||
tasks = tasks + self.find_tcb_list(cursor)
|
||||
|
||||
|
||||
# filter for tasks that are listed twice
|
||||
tasks_filt = {}
|
||||
for t in tasks:
|
||||
pid = parse_int(t['pid']);
|
||||
pid = parse_int(t['pid'])
|
||||
if not pid in tasks_filt.keys():
|
||||
tasks_filt[pid] = t['name'];
|
||||
tasks_filt[pid] = t['name']
|
||||
print('{num_t} Tasks found:'.format(num_t = len(tasks_filt)))
|
||||
for pid in tasks_filt.keys():
|
||||
print("PID: ",pid," ",tasks_filt[pid])
|
||||
@@ -626,36 +626,36 @@ NX_search_tcb()
|
||||
|
||||
|
||||
class NX_my_bt(gdb.Command):
|
||||
""" 'fake' backtrace: backtrace the stack of a process and check every suspicious address for the list
|
||||
""" 'fake' backtrace: backtrace the stack of a process and check every suspicious address for the list
|
||||
arg: tcb_address$
|
||||
(can easily be found by typing 'showtask').
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
super(NX_my_bt,self).__init__('show mybt', gdb.COMMAND_USER)
|
||||
|
||||
|
||||
def readmem(self,addr):
|
||||
'''
|
||||
read memory at addr and return nr
|
||||
'''
|
||||
'''
|
||||
str_to_eval = "x/x "+hex(addr)
|
||||
resp = gdb.execute(str_to_eval,to_string = True)
|
||||
idx = resp.find('\t')
|
||||
return int(resp[idx:],16)
|
||||
|
||||
|
||||
def is_in_bounds(self,val):
|
||||
lower_bound = int("08004000",16)
|
||||
upper_bound = int("080ae0c0",16);
|
||||
upper_bound = int("080ae0c0",16)
|
||||
#print(lower_bound," ",val," ",upper_bound)
|
||||
if val>lower_bound and val<upper_bound:
|
||||
return True;
|
||||
return True
|
||||
else:
|
||||
return False;
|
||||
return False
|
||||
def get_tcb_from_address(self,addr):
|
||||
addr_value = gdb.Value(addr)
|
||||
tcb_ptr = addr_value.cast(gdb.lookup_type('struct tcb_s').pointer())
|
||||
return tcb_ptr.dereference()
|
||||
|
||||
|
||||
def resolve_file_line_func(self,addr,stack_percentage):
|
||||
gdb.write(str(round(stack_percentage,2))+":")
|
||||
str_to_eval = "info line *"+hex(addr)
|
||||
@@ -670,7 +670,7 @@ class NX_my_bt(gdb.Command):
|
||||
if str(func) == "None":
|
||||
func = block.superblock.function
|
||||
return words[3].strip('"'), line, func
|
||||
|
||||
|
||||
def invoke(self,args,sth):
|
||||
try:
|
||||
addr_dec = parse_int(args) # Trying to interpret the input as TCB address
|
||||
@@ -687,9 +687,9 @@ class NX_my_bt(gdb.Command):
|
||||
curr_sp = parse_int(_tcb['xcp']['regs'][0]) #curr stack pointer
|
||||
other_sp = parse_int(_tcb['xcp']['regs'][8]) # other stack pointer
|
||||
stacksize = parse_int(_tcb['adj_stack_size']) # other stack pointer
|
||||
|
||||
|
||||
print("tasks current SP = ",hex(curr_sp),"stack max ptr is at ",hex(up_stack))
|
||||
|
||||
|
||||
item = 0
|
||||
for sp in range(other_sp if curr_sp == up_stack else curr_sp, up_stack, 4):
|
||||
mem = self.readmem(sp)
|
||||
@@ -700,5 +700,5 @@ class NX_my_bt(gdb.Command):
|
||||
filename,line,func = self.resolve_file_line_func(mem, stack_percentage)
|
||||
print('#%-2d ' % item, '0x%08x in ' % mem, func, ' at ', filename, ':', line, sep='')
|
||||
item += 1
|
||||
|
||||
|
||||
NX_my_bt()
|
||||
|
||||
@@ -25,14 +25,11 @@ from __future__ import print_function
|
||||
|
||||
import sys
|
||||
|
||||
from six.moves import input
|
||||
|
||||
from kconfiglib import Symbol, Choice, BOOL, TRISTATE, HEX, standard_kconfig
|
||||
|
||||
|
||||
# Python 2/3 compatibility hack
|
||||
if sys.version_info[0] < 3:
|
||||
input = raw_input
|
||||
|
||||
|
||||
# Note: Used as the entry point in setup.py
|
||||
def _main():
|
||||
# Earlier symbols in Kconfig files might depend on later symbols and become
|
||||
|
||||
Reference in New Issue
Block a user