diff --git a/tools/gdb/thread.py b/tools/gdb/thread.py index 4e8ba981f34..e1d7f653fb4 100644 --- a/tools/gdb/thread.py +++ b/tools/gdb/thread.py @@ -548,13 +548,60 @@ class Ps(gdb.Command): self.parse_and_show_info(tcb) +class DeadLock(gdb.Command): + def __init__(self): + super(DeadLock, self).__init__("deadlock", gdb.COMMAND_USER) + + def has_deadlock(self, pid): + """Check if the thread has a deadlock""" + tcb = utils.get_tcb(pid) + if not tcb or not tcb["waitobj"]: + return False + + sem = tcb["waitobj"].cast(utils.lookup_type("sem_t").pointer()) + if not sem["flags"] & SEM_TYPE_MUTEX: + return False + + # It's waiting on a mutex + mutex = tcb["waitobj"].cast(utils.lookup_type("mutex_t").pointer()) + holder = mutex["holder"] + if holder in self.holders: + return True + + self.holders.append(holder) + return self.has_deadlock(holder) + + def invoke(self, args, from_tty): + detected = [] + for tcb in utils.get_tcbs(): + self.holders = [] # Holders for this tcb + pid = tcb["pid"] + if pid in detected or not self.has_deadlock(tcb["pid"]): + continue + + # Deadlock detected + detected.append(pid) + detected.extend(self.holders) + gdb.write(f'Thread {pid} "{utils.get_task_name(tcb)}" has deadlocked!\n') + + gdb.write(f" holders: {pid}->") + gdb.write("->".join(str(pid) for pid in self.holders)) + gdb.write("\n") + + if not detected: + gdb.write("No deadlock detected.") + + gdb.write("\n") + + def register_commands(): + DeadLock() SetRegs() Ps() - Nxinfothreads() - Nxthread() Nxcontinue() + Nxinfothreads() Nxstep() + Nxthread() # Use custom command for thread if current target does not support it. # The recognized threads count is less than or equal to the number of CPUs in this case. diff --git a/tools/gdb/utils.py b/tools/gdb/utils.py index ea3aa481759..619f2d4a7d7 100644 --- a/tools/gdb/utils.py +++ b/tools/gdb/utils.py @@ -24,7 +24,6 @@ import re from typing import List import gdb - from macros import fetch_macro_info, try_expand g_symbol_cache = {} @@ -543,3 +542,22 @@ def get_tcbs(): npidhash = gdb.parse_and_eval("g_npidhash") return [pidhash[i] for i in range(0, npidhash) if pidhash[i]] + + +def get_tcb(pid): + """get tcb from pid""" + g_pidhash = gdb.parse_and_eval("g_pidhash") + g_npidhash = gdb.parse_and_eval("g_npidhash") + tcb = g_pidhash[pid & (g_npidhash - 1)] + if not tcb or pid != tcb["pid"]: + return None + + return tcb + + +def get_task_name(tcb): + try: + name = tcb["name"].cast(gdb.lookup_type("char").pointer()) + return name.string() + except gdb.error: + return ""