GP-4209: GhidraTime-MSTTD integration. Type hints for (most) Python agents.

This commit is contained in:
Dan
2025-03-24 18:28:07 +00:00
parent deb49d5322
commit 21a1602579
93 changed files with 6453 additions and 4118 deletions
@@ -15,4 +15,5 @@ src/main/py/LICENSE||GHIDRA||||END|
src/main/py/MANIFEST.in||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidralldb/py.typed||GHIDRA||||END|
src/main/py/src/ghidralldb/schema.xml||GHIDRA||||END|
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ghidralldb"
version = "11.3"
version = "11.4"
authors = [
{ name="Ghidra Development Team" },
]
@@ -17,9 +17,12 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==11.3",
"ghidratrace==11.4",
]
[project.urls]
"Homepage" = "https://github.com/NationalSecurityAgency/ghidra"
"Bug Tracker" = "https://github.com/NationalSecurityAgency/ghidra/issues"
[tool.setuptools.package-data]
ghidralldb = ["py.typed"]
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, List, Optional, Tuple
from ghidratrace.client import Address, RegVal
import lldb
@@ -20,8 +21,9 @@ from . import util
# NOTE: This map is derived from the ldefs using a script
language_map = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
language_map: Dict[str, List[str]] = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'armv4': ['ARM:BE:32:v4', 'ARM:LE:32:v4'],
'armv4t': ['ARM:BE:32:v4t', 'ARM:LE:32:v4t'],
@@ -50,8 +52,10 @@ language_map = {
'thumbv7em': ['ARM:BE:32:Cortex', 'ARM:LE:32:Cortex'],
'armv8': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'armv8l': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'arm64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'arm64e': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'arm64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm64e': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm64_32': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'mips': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
'mipsr2': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
@@ -102,8 +106,11 @@ language_map = {
'hexagon': [],
'hexagonv4': [],
'hexagonv5': [],
'riscv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC', 'RISCV:LE:32:RV32I', 'RISCV:LE:32:RV32IC', 'RISCV:LE:32:RV32IMC', 'RISCV:LE:32:default'],
'riscv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC', 'RISCV:LE:64:RV64I', 'RISCV:LE:64:RV64IC', 'RISCV:LE:64:default'],
'riscv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC', 'RISCV:LE:32:RV32I',
'RISCV:LE:32:RV32IC', 'RISCV:LE:32:RV32IMC',
'RISCV:LE:32:default'],
'riscv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC', 'RISCV:LE:64:RV64I',
'RISCV:LE:64:RV64IC', 'RISCV:LE:64:default'],
'unknown-mach-32': ['DATA:LE:32:default', 'DATA:LE:32:default'],
'unknown-mach-64': ['DATA:LE:64:default', 'DATA:LE:64:default'],
'arc': [],
@@ -111,19 +118,20 @@ language_map = {
'wasm32': ['x86:LE:32:default'],
}
data64_compiler_map = {
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
x86_compiler_map = {
x86_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
'Cygwin': 'windows',
'linux' : 'gcc',
'linux': 'gcc',
'default': 'gcc',
'unknown': 'gcc',
None: 'gcc',
}
default_compiler_map = {
default_compiler_map: Dict[Optional[str], str] = {
'freebsd': 'gcc',
'linux': 'gcc',
'netbsd': 'gcc',
@@ -138,7 +146,7 @@ default_compiler_map = {
'unknown': 'default',
}
compiler_map = {
compiler_map: Dict[str, Dict[Optional[str], str]] = {
'DATA:BE:64:': data64_compiler_map,
'DATA:LE:64:': data64_compiler_map,
'x86:LE:32:': x86_compiler_map,
@@ -148,7 +156,7 @@ compiler_map = {
}
def find_host_triple():
def find_host_triple() -> str:
dbg = util.get_debugger()
for i in range(dbg.GetNumPlatforms()):
platform = dbg.GetPlatformAtIndex(i)
@@ -157,19 +165,19 @@ def find_host_triple():
return 'unrecognized'
def find_triple():
def find_triple() -> str:
triple = util.get_target().triple
if triple is not None:
return triple
return find_host_triple()
def get_arch():
def get_arch() -> str:
triple = find_triple()
return triple.split('-')[0]
def get_endian():
def get_endian() -> str:
parm = util.get_convenience_variable('endian')
if parm != 'auto':
return parm
@@ -183,7 +191,7 @@ def get_endian():
return 'unrecognized'
def get_osabi():
def get_osabi() -> str:
parm = util.get_convenience_variable('osabi')
if not parm in ['auto', 'default']:
return parm
@@ -195,7 +203,7 @@ def get_osabi():
return triple.split('-')[2]
def compute_ghidra_language():
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = util.get_convenience_variable('ghidra-language')
if lang != 'auto':
@@ -223,37 +231,33 @@ def compute_ghidra_language():
return 'DATA' + lebe + '64:default'
def compute_ghidra_compiler(lang):
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = util.get_convenience_variable('ghidra-compiler')
if comp != 'auto':
return comp
# Check if the selected lang has specific compiler recommendations
matched_lang = sorted(
(l for l in compiler_map if l in lang),
key=lambda l: compiler_map[l]
)
if len(matched_lang) == 0:
# NOTE: Unlike other agents, we put prefixes in map keys
matches = [l for l in compiler_map if lang.startswith(l)]
if len(matches) == 0:
print(f"{lang} not found in compiler map - using default compiler")
return 'default'
comp_map = compiler_map[matched_lang[0]]
comp_map = compiler_map[matches[0]]
if comp_map == data64_compiler_map:
print(f"Using the DATA64 compiler map")
osabi = get_osabi()
if osabi in comp_map:
return comp_map[osabi]
if lang.startswith("x86:"):
print(f"{osabi} not found in compiler map - using gcc")
return 'gcc'
if None in comp_map:
return comp_map[None]
def_comp = comp_map[None]
print(f"{osabi} not found in compiler map - using {def_comp} compiler")
return def_comp
print(f"{osabi} not found in compiler map - using default compiler")
return 'default'
def compute_ghidra_lcsp():
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
@@ -261,10 +265,10 @@ def compute_ghidra_lcsp():
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, proc: lldb.SBProcess, offset: int):
def map(self, proc: lldb.SBProcess, offset: int) -> Tuple[str, Address]:
space = self.defaultSpace
return self.defaultSpace, Address(space, offset)
@@ -277,10 +281,10 @@ class DefaultMemoryMapper(object):
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers = {}
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang):
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
@@ -288,31 +292,31 @@ def compute_memory_mapper(lang):
class DefaultRegisterMapper(object):
def __init__(self, byte_order):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
self.byte_order = byte_order
self.union_winners = {}
def map_name(self, proc, name):
def map_name(self, proc: lldb.SBProcess, name: str) -> str:
return name
def map_value(self, proc, name, value):
def map_value(self, proc: lldb.SBProcess, name: str, value: bytes) -> RegVal:
return RegVal(self.map_name(proc, name), value)
def map_name_back(self, proc, name):
def map_name_back(self, proc: lldb.SBProcess, name: str) -> str:
return name
def map_value_back(self, proc, name, value):
def map_value_back(self, proc: lldb.SBProcess, name: str,
value: bytes) -> RegVal:
return RegVal(self.map_name_back(proc, name), value)
class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
def __init__(self):
def __init__(self) -> None:
super().__init__('little')
def map_name(self, proc, name):
def map_name(self, proc: lldb.SBProcess, name: str) -> str:
if name is None:
return 'UNKNOWN'
if name == 'eflags':
@@ -322,26 +326,27 @@ class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
return 'ymm' + name[3:]
return super().map_name(proc, name)
def map_value(self, proc, name, value):
def map_value(self, proc: lldb.SBProcess, name: str, value: bytes) -> RegVal:
rv = super().map_value(proc, name, value)
if rv.name.startswith('ymm') and len(rv.value) > 32:
return RegVal(rv.name, rv.value[-32:])
return rv
def map_name_back(self, proc, name):
def map_name_back(self, proc: lldb.SBProcess, name: str) -> str:
if name == 'rflags':
return 'eflags'
return super().map_name_back(proc, name)
DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big')
DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little')
register_mappers = {
register_mappers: Dict[str, DefaultRegisterMapper] = {
'x86:LE:64:default': Intel_x86_64_RegisterMapper()
}
def compute_register_mapper(lang):
def compute_register_mapper(lang: str) -> DefaultRegisterMapper:
if not lang in register_mappers:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER
File diff suppressed because it is too large Load Diff
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from dataclasses import dataclass, field
import threading
import time
from typing import Any, Optional, Union
import lldb
@@ -24,34 +26,41 @@ from . import commands, util
ALL_EVENTS = 0xFFFF
@dataclass(frozen=False)
class HookState(object):
__slots__ = ('installed', 'mem_catchpoint')
installed = False
def __init__(self):
def __init__(self) -> None:
self.installed = False
self.mem_catchpoint = None
@dataclass(frozen=False)
class ProcessState(object):
__slots__ = ('first', 'regions', 'modules', 'threads',
'breaks', 'watches', 'visited')
first = True
# For things we can detect changes to between stops
regions = False
modules = False
threads = False
breaks = False
watches = False
# For frames and threads that have already been synced since last stop
visited: set[Any] = field(default_factory=set)
def __init__(self):
def __init__(self) -> None:
self.first = True
# For things we can detect changes to between stops
self.regions = False
self.modules = False
self.threads = False
self.breaks = False
self.watches = False
# For frames and threads that have already been synced since last stop
self.visited = set()
def record(self, description=None):
def record(self, description: Optional[str] = None) -> None:
first = self.first
self.first = False
trace = commands.STATE.require_trace()
if description is not None:
commands.STATE.trace.snapshot(description)
trace.snapshot(description)
if first:
commands.put_processes()
commands.put_environment()
@@ -121,7 +130,8 @@ class QuitSentinel(object):
QUIT = QuitSentinel()
def process_event(self, listener, event):
def process_event(self, listener: lldb.SBListener,
event: lldb.SBEvent) -> Union[QuitSentinel, bool]:
try:
desc = util.get_description(event)
# print(f"Event: {desc}")
@@ -130,7 +140,7 @@ def process_event(self, listener, event):
# LLDB may crash on event.GetBroadcasterClass, otherwise
# All the checks below, e.g. SBTarget.EventIsTargetEvent, call this
print(f"Ignoring {desc} because target is invalid")
return
return False
event_process = util.get_process()
if event_process.IsValid() and event_process.GetProcessID() not in PROC_STATE:
PROC_STATE[event_process.GetProcessID()] = ProcessState()
@@ -260,13 +270,14 @@ def process_event(self, listener, event):
return True
except BaseException as e:
print(e)
return False
class EventThread(threading.Thread):
func = process_event
event = lldb.SBEvent()
def run(self):
def run(self) -> None:
# Let's only try at most 4 times to retrieve any kind of event.
# After that, the thread exits.
listener = lldb.SBListener('eventlistener')
@@ -365,40 +376,40 @@ class EventThread(threading.Thread):
"""
def on_new_process(event):
def on_new_process(event: lldb.SBEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(event.process.num)):
with trace.client.batch():
with trace.open_tx(f"New Process {event.process.num}"):
commands.put_processes() # TODO: Could put just the one....
def on_process_selected():
def on_process_selected() -> None:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Process {} selected".format(proc.GetProcessID())):
with trace.client.batch():
with trace.open_tx(f"Process {proc.GetProcessID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.activate()
def on_process_deleted(event):
def on_process_deleted(event: lldb.SBEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
if event.process.num in PROC_STATE:
del PROC_STATE[event.process.num]
with commands.STATE.client.batch():
with trace.open_tx("Process {} deleted".format(event.process.num)):
with trace.client.batch():
with trace.open_tx(f"Process {event.process.num} deleted"):
commands.put_processes() # TODO: Could just delete the one....
def on_new_thread(event):
def on_new_thread(event: lldb.SBEvent) -> None:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
@@ -406,224 +417,237 @@ def on_new_thread(event):
# TODO: Syscall clone/exit to detect thread destruction?
def on_thread_selected():
def on_thread_selected() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
t = util.selected_thread()
with commands.STATE.client.batch():
with trace.open_tx("Thread {}.{} selected".format(proc.GetProcessID(), t.GetThreadID())):
with trace.client.batch():
with trace.open_tx(f"Thread {proc.GetProcessID()}.{t.GetThreadID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.put_threads()
commands.activate()
return True
def on_frame_selected():
def on_frame_selected() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
f = util.selected_frame()
t = f.GetThread()
with commands.STATE.client.batch():
with trace.open_tx("Frame {}.{}.{} selected".format(proc.GetProcessID(), t.GetThreadID(), f.GetFrameID())):
with trace.client.batch():
with trace.open_tx(f"Frame {proc.GetProcessID()}.{t.GetThreadID()}.{f.GetFrameID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.put_threads()
commands.put_frames()
commands.activate()
return True
def on_syscall_memory():
def on_syscall_memory() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
PROC_STATE[proc.GetProcessID()].regions = True
return True
def on_memory_changed(event):
def on_memory_changed(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Memory *0x{:08x} changed".format(event.address)):
return False
with trace.client.batch():
with trace.open_tx(f"Memory *0x{event.address:08x} changed"):
commands.put_bytes(event.address, event.address + event.length,
pages=False, is_mi=False, result=None)
pages=False, result=None)
return True
def on_register_changed(event):
# print("Register changed: {}".format(dir(event)))
def on_register_changed(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
# I'd rather have a descriptor!
# TODO: How do I get the descriptor from the number?
# For now, just record the lot
with commands.STATE.client.batch():
with trace.open_tx("Register {} changed".format(event.regnum)):
return False
with trace.client.batch():
with trace.open_tx(f"Register {event.regnum} changed"):
banks = event.frame.GetRegisters()
commands.putreg(
event.frame, banks.GetFirstValueByName(commands.DEFAULT_REGISTER_BANK))
return True
def on_cont(event):
def on_cont(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
state = PROC_STATE[proc.GetProcessID()]
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Continued"):
state.record_continued()
return True
def on_stop(event):
def on_stop(event: lldb.SBEvent) -> bool:
proc = lldb.SBProcess.GetProcessFromEvent(
event) if event is not None else util.get_process()
if proc.GetProcessID() not in PROC_STATE:
print("not in state")
return
return False
trace = commands.STATE.trace
if trace is None:
print("no trace")
return
return False
state = PROC_STATE[proc.GetProcessID()]
state.visited.clear()
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Stopped"):
state.record("Stopped")
commands.put_event_thread()
commands.put_threads()
commands.put_frames()
commands.activate()
return True
def on_exited(event):
def on_exited(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
state = PROC_STATE[proc.GetProcessID()]
state.visited.clear()
exit_code = proc.GetExitStatus()
description = "Exited with code {}".format(exit_code)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx(description):
state.record(description)
state.record_exited(exit_code)
commands.put_event_thread()
commands.activate()
return False
def modules_changed():
def modules_changed() -> bool:
# Assumption: affects the current process
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
PROC_STATE[proc.GetProcessID()].modules = True
return True
def on_new_objfile(event):
def on_new_objfile(event: lldb.SBEvent) -> bool:
modules_changed()
return True
def on_free_objfile(event):
def on_free_objfile(event: lldb.SBEvent) -> bool:
modules_changed()
return True
def on_breakpoint_created(b):
def on_breakpoint_created(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} created".format(b.GetID())):
commands.put_single_breakpoint(b, proc)
return True
def on_breakpoint_modified(b):
def on_breakpoint_modified(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} modified".format(b.GetID())):
commands.put_single_breakpoint(b, proc)
return True
def on_breakpoint_deleted(b):
def on_breakpoint_deleted(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
bpt_path = commands.PROC_BREAK_PATTERN.format(
procnum=proc.GetProcessID(), breaknum=b.GetID())
bpt_obj = trace.proxy_object_path(bpt_path)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Breakpoint {} deleted".format(b.GetID())):
bpt_obj.remove(tree=True)
return True
def on_watchpoint_created(b):
def on_watchpoint_created(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} created".format(b.GetID())):
commands.put_single_watchpoint(b, proc)
return True
def on_watchpoint_modified(b):
def on_watchpoint_modified(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Watchpoint {} modified".format(b.GetID())):
commands.put_single_watchpoint(b, proc)
return True
def on_watchpoint_deleted(b):
def on_watchpoint_deleted(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
wpt_path = commands.PROC_WATCH_PATTERN.format(
procnum=proc.GetProcessID(), watchnum=b.GetID())
wpt_obj = trace.proxy_object_path(wpt_path)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Watchpoint {} deleted".format(b.GetID())):
wpt_obj.remove(tree=True)
return True
def install_hooks():
def install_hooks() -> None:
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
@@ -632,18 +656,18 @@ def install_hooks():
event_thread.start()
def remove_hooks():
def remove_hooks() -> None:
if not HOOK_STATE.installed:
return
HOOK_STATE.installed = False
def enable_current_process():
def enable_current_process() -> None:
proc = util.get_process()
PROC_STATE[proc.GetProcessID()] = ProcessState()
def disable_current_process():
def disable_current_process() -> None:
proc = util.get_process()
if proc.GetProcessID() in PROC_STATE:
# Silently ignore already disabled
File diff suppressed because it is too large Load Diff
@@ -14,17 +14,24 @@
# limitations under the License.
##
from collections import namedtuple
from dataclasses import dataclass
import os
import re
import sys
from typing import Any, Dict, List, Optional, Union
import lldb
LldbVersion = namedtuple('LldbVersion', ['display', 'full', 'major', 'minor'])
@dataclass(frozen=True)
class LldbVersion:
display: str
full: str
major: int
minor: int
def _compute_lldb_ver():
def _compute_lldb_ver() -> LldbVersion:
blurb = lldb.debugger.GetVersionString()
top = blurb.split('\n')[0]
if ' version ' in top:
@@ -40,12 +47,15 @@ LLDB_VERSION = _compute_lldb_ver()
GNU_DEBUGDATA_PREFIX = ".gnu_debugdata for "
class Module(namedtuple('BaseModule', ['name', 'base', 'max', 'sections'])):
pass
@dataclass
class Section:
name: str
start: int
end: int
offset: int
attrs: List[str]
class Section(namedtuple('BaseSection', ['name', 'start', 'end', 'offset', 'attrs'])):
def better(self, other):
def better(self, other: 'Section') -> 'Section':
start = self.start if self.start != 0 else other.start
end = self.end if self.end != 0 else other.end
offset = self.offset if self.offset != 0 else other.offset
@@ -54,18 +64,17 @@ class Section(namedtuple('BaseSection', ['name', 'start', 'end', 'offset', 'attr
return Section(self.name, start, end, offset, list(attrs))
@dataclass(frozen=True)
class Module:
name: str
base: int
max: int
sections: Dict[str, Section]
# AFAICT, Objfile does not give info about load addresses :(
class ModuleInfoReader(object):
def name_from_line(self, line):
mat = self.objfile_pattern.fullmatch(line)
if mat is None:
return None
n = mat['name']
if n.startswith(GNU_DEBUGDATA_PREFIX):
return None
return None if mat is None else mat['name']
def section_from_sbsection(self, s):
def section_from_sbsection(self, s: lldb.SBSection) -> Section:
start = s.GetLoadAddress(get_target())
if start >= sys.maxsize*2:
start = 0
@@ -75,7 +84,7 @@ class ModuleInfoReader(object):
attrs = s.GetPermissions()
return Section(name, start, end, offset, attrs)
def finish_module(self, name, sections):
def finish_module(self, name: str, sections: Dict[str, Section]) -> Module:
alloc = {k: s for k, s in sections.items()}
if len(alloc) == 0:
return Module(name, 0, 0, alloc)
@@ -91,10 +100,10 @@ class ModuleInfoReader(object):
max_addr = max(s.end for s in alloc.values())
return Module(name, base_addr, max_addr, alloc)
def get_modules(self):
def get_modules(self) -> Dict[str, Module]:
modules = {}
name = None
sections = {}
sections: Dict[str, Section] = {}
for i in range(0, get_target().GetNumModules()):
module = get_target().GetModuleAtIndex(i)
fspec = module.GetFileSpec()
@@ -108,19 +117,24 @@ class ModuleInfoReader(object):
return modules
def _choose_module_info_reader():
def _choose_module_info_reader() -> ModuleInfoReader:
return ModuleInfoReader()
MODULE_INFO_READER = _choose_module_info_reader()
class Region(namedtuple('BaseRegion', ['start', 'end', 'offset', 'perms', 'objfile'])):
pass
@dataclass
class Region:
start: int
end: int
offset: int
perms: Optional[str]
objfile: str
class RegionInfoReader(object):
def region_from_sbmemreg(self, info):
def region_from_sbmemreg(self, info: lldb.SBMemoryRegionInfo) -> Region:
start = info.GetRegionBase()
end = info.GetRegionEnd()
offset = info.GetRegionBase()
@@ -136,7 +150,7 @@ class RegionInfoReader(object):
objfile = info.GetName()
return Region(start, end, offset, perms, objfile)
def get_regions(self):
def get_regions(self) -> List[Region]:
regions = []
reglist = get_process().GetMemoryRegions()
for i in range(0, reglist.GetSize()):
@@ -148,7 +162,7 @@ class RegionInfoReader(object):
regions.append(r)
return regions
def full_mem(self):
def full_mem(self) -> Region:
# TODO: This may not work for Harvard architectures
try:
sizeptr = int(parse_and_eval('sizeof(void*)')) * 8
@@ -157,7 +171,7 @@ class RegionInfoReader(object):
return Region(0, 1 << 64, 0, None, 'full memory')
def _choose_region_info_reader():
def _choose_region_info_reader() -> RegionInfoReader:
return RegionInfoReader()
@@ -169,69 +183,70 @@ BREAK_PATTERN = re.compile('')
BREAK_LOC_PATTERN = re.compile('')
class BreakpointLocation(namedtuple('BaseBreakpointLocation', ['address', 'enabled', 'thread_groups'])):
pass
class BreakpointLocationInfoReader(object):
def get_locations(self, breakpoint):
def get_locations(self, breakpoint: lldb.SBBreakpoint) -> List[
lldb.SBBreakpointLocation]:
return breakpoint.locations
def _choose_breakpoint_location_info_reader():
def _choose_breakpoint_location_info_reader() -> BreakpointLocationInfoReader:
return BreakpointLocationInfoReader()
BREAKPOINT_LOCATION_INFO_READER = _choose_breakpoint_location_info_reader()
def get_debugger():
def get_debugger() -> lldb.SBDebugger:
return lldb.SBDebugger.FindDebuggerWithID(1)
def get_target():
def get_target() -> lldb.SBTarget:
return get_debugger().GetTargetAtIndex(0)
def get_process():
def get_process() -> lldb.SBProcess:
return get_target().GetProcess()
def selected_thread():
def selected_thread() -> lldb.SBThread:
return get_process().GetSelectedThread()
def selected_frame():
def selected_frame() -> lldb.SBFrame:
return selected_thread().GetSelectedFrame()
def parse_and_eval(expr, signed=False):
def parse_and_eval(expr: str, signed: bool = False) -> int:
if signed is True:
return get_eval(expr).GetValueAsSigned()
return get_eval(expr).GetValueAsUnsigned()
def get_eval(expr):
def get_eval(expr: str) -> lldb.SBValue:
eval = get_target().EvaluateExpression(expr)
if eval.GetError().Fail():
raise ValueError(eval.GetError().GetCString())
return eval
def get_description(object, level=None):
def get_description(object: Union[
lldb.SBThread, lldb.SBBreakpoint, lldb.SBWatchpoint, lldb.SBEvent],
level: Optional[int] = None) -> str:
stream = lldb.SBStream()
if level is None:
object.GetDescription(stream)
else:
elif isinstance(object, lldb.SBWatchpoint):
object.GetDescription(stream, level)
else:
raise ValueError(f"Object {object} does not support description level")
return escape_ansi(stream.GetData())
conv_map = {}
conv_map: Dict[str, str] = {}
def get_convenience_variable(id):
#val = get_target().GetEnvironment().Get(id)
def get_convenience_variable(id: str) -> str:
# val = get_target().GetEnvironment().Get(id)
if id not in conv_map:
return "auto"
val = conv_map[id]
@@ -240,21 +255,21 @@ def get_convenience_variable(id):
return val
def set_convenience_variable(id, value):
#env = get_target().GetEnvironment()
def set_convenience_variable(id: str, value: str) -> None:
# env = get_target().GetEnvironment()
# return env.Set(id, value, True)
conv_map[id] = value
def escape_ansi(line):
def escape_ansi(line: str) -> str:
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
return ansi_escape.sub('', line)
def debracket(init):
val = init
def debracket(init: Optional[str]) -> str:
if init is None:
return ""
val = init
val = val.replace("[", "(")
val = val.replace("]", ")")
return val