Merge remote-tracking branch 'origin/GP-4209_Dan_ttdIntegration--SQUASHED'

This commit is contained in:
Ryan Kurtz
2025-03-24 15:14:13 -04:00
93 changed files with 6453 additions and 4118 deletions
@@ -17,5 +17,6 @@ src/main/py/MANIFEST.in||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidradbg/dbgmodel/DbgModel.idl||GHIDRA||||END|
src/main/py/src/ghidradbg/py.typed||GHIDRA||||END|
src/main/py/src/ghidradbg/schema.xml||GHIDRA||||END|
src/main/py/src/ghidradbg/schema_exdi.xml||GHIDRA||||END|
@@ -53,10 +53,12 @@ def main():
print("dbgeng requires a target trace - please try again.")
cmd.ghidra_trace_disconnect()
return
cmd.ghidra_trace_open(target, start_trace=False)
# TODO: HACK
# Also, the wait() must precede sync_enable() or else PROC_STATE will
# contain the wrong PID, and later events will get snuffed
try:
dbg.wait()
except KeyboardInterrupt as ki:
@@ -64,8 +66,9 @@ def main():
cmd.ghidra_trace_start(target)
cmd.ghidra_trace_sync_enable()
on_state_changed(DbgEng.DEBUG_CES_EXECUTION_STATUS, DbgEng.DEBUG_STATUS_BREAK)
on_state_changed(DbgEng.DEBUG_CES_EXECUTION_STATUS,
DbgEng.DEBUG_STATUS_BREAK)
cmd.repl()
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ghidradbg"
version = "11.3"
version = "11.4"
authors = [
{ name="Ghidra Development Team" },
]
@@ -17,7 +17,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==11.3",
"ghidratrace==11.4",
"pybag>=2.2.12"
]
@@ -26,7 +26,7 @@ dependencies = [
"Bug Tracker" = "https://github.com/NationalSecurityAgency/ghidra/issues"
[tool.setuptools.package-data]
ghidradbg = ["*.tlb"]
ghidradbg = ["*.tlb", "py.typed"]
[tool.setuptools]
include-package-data = true
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# NOTE: libraries must precede EVERYTHING, esp pybag and DbgMod
@@ -13,13 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, List, Optional, Tuple
from ghidratrace.client import Address, RegVal
from pybag import pydbg
from . import util
language_map = {
language_map: Dict[str, List[str]] = {
'AARCH64': ['AARCH64:LE:64:AppleSilicon'],
'ARM': ['ARM:LE:32:v8'],
'Itanium': [],
@@ -31,25 +33,25 @@ language_map = {
'SH4': ['SuperH4:LE:32:default'],
}
data64_compiler_map = {
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
x86_compiler_map = {
x86_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
'Cygwin': 'windows',
'default': 'windows',
}
default_compiler_map = {
default_compiler_map: Dict[Optional[str], str] = {
'windows': 'default',
}
windows_compiler_map = {
windows_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
}
compiler_map = {
compiler_map : Dict[str, Dict[Optional[str], str]]= {
'DATA:BE:64:default': data64_compiler_map,
'DATA:LE:64:default': data64_compiler_map,
'x86:LE:32:default': x86_compiler_map,
@@ -62,11 +64,11 @@ compiler_map = {
}
def get_arch():
def get_arch() -> str:
try:
type = util.dbg.get_actual_processor_type()
except Exception:
print("Error getting actual processor type.")
except Exception as e:
print(f"Error getting actual processor type: {e}")
return "Unknown"
if type is None:
return "x86_64"
@@ -76,25 +78,25 @@ def get_arch():
return "AARCH64"
if type == 0x014c:
return "x86"
if type == 0x0160: # R3000 BE
if type == 0x0160: # R3000 BE
return "MIPS-BE"
if type == 0x0162: # R3000 LE
if type == 0x0162: # R3000 LE
return "MIPS"
if type == 0x0166: # R4000 LE
if type == 0x0166: # R4000 LE
return "MIPS"
if type == 0x0168: # R10000 LE
if type == 0x0168: # R10000 LE
return "MIPS"
if type == 0x0169: # WCE v2 LE
if type == 0x0169: # WCE v2 LE
return "MIPS"
if type == 0x0266: # MIPS 16
if type == 0x0266: # MIPS 16
return "MIPS"
if type == 0x0366: # MIPS FPU
if type == 0x0366: # MIPS FPU
return "MIPS"
if type == 0x0466: # MIPS FPU16
if type == 0x0466: # MIPS FPU16
return "MIPS"
if type == 0x0184: # Alpha AXP
if type == 0x0184: # Alpha AXP
return "Alpha"
if type == 0x0284: # Aplha 64
if type == 0x0284: # Aplha 64
return "Alpha"
if type >= 0x01a2 and type < 0x01a6:
return "SH"
@@ -102,17 +104,17 @@ def get_arch():
return "SH4"
if type == 0x01a6:
return "SH5"
if type == 0x01c0: # ARM LE
if type == 0x01c0: # ARM LE
return "ARM"
if type == 0x01c2: # ARM Thumb/Thumb-2 LE
if type == 0x01c2: # ARM Thumb/Thumb-2 LE
return "ARM"
if type == 0x01c4: # ARM Thumb-2 LE
if type == 0x01c4: # ARM Thumb-2 LE
return "ARM"
if type == 0x01d3: # AM33
if type == 0x01d3: # AM33
return "ARM"
if type == 0x01f0 or type == 0x1f1: # PPC
if type == 0x01f0 or type == 0x1f1: # PPC
return "PPC"
if type == 0x0200:
if type == 0x0200:
return "Itanium"
if type == 0x0520:
return "Infineon"
@@ -120,23 +122,23 @@ def get_arch():
return "CEF"
if type == 0x0EBC:
return "EFI"
if type == 0x8664: # AMD64 (K8)
if type == 0x8664: # AMD64 (K8)
return "x86_64"
if type == 0x9041: # M32R
if type == 0x9041: # M32R
return "M32R"
if type == 0xC0EE:
return "CEE"
return "Unknown"
def get_endian():
def get_endian() -> str:
parm = util.get_convenience_variable('endian')
if parm != 'auto':
return parm
return 'little'
def get_osabi():
def get_osabi() -> str:
parm = util.get_convenience_variable('osabi')
if not parm in ['auto', 'default']:
return parm
@@ -150,7 +152,7 @@ def get_osabi():
return "windows"
def compute_ghidra_language():
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = util.get_convenience_variable('ghidra-language')
if lang != 'auto':
@@ -175,7 +177,7 @@ def compute_ghidra_language():
return 'DATA' + lebe + '64:default'
def compute_ghidra_compiler(lang):
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = util.get_convenience_variable('ghidra-compiler')
if comp != 'auto':
@@ -197,7 +199,7 @@ def compute_ghidra_compiler(lang):
return 'default'
def compute_ghidra_lcsp():
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
@@ -205,10 +207,10 @@ def compute_ghidra_lcsp():
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, proc: int, offset: int):
def map(self, proc: int, offset: int) -> Tuple[str, Address]:
space = self.defaultSpace
return self.defaultSpace, Address(space, offset)
@@ -220,10 +222,10 @@ class DefaultMemoryMapper(object):
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers = {}
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang):
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
@@ -231,16 +233,15 @@ def compute_memory_mapper(lang):
class DefaultRegisterMapper(object):
def __init__(self, byte_order):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
self.byte_order = byte_order
self.union_winners = {}
def map_name(self, proc, name):
def map_name(self, proc: int, name: str):
return name
def map_value(self, proc, name, value):
def map_value(self, proc: int, name: str, value: int):
try:
# TODO: this seems half-baked
av = value.to_bytes(8, "big")
@@ -249,10 +250,10 @@ class DefaultRegisterMapper(object):
.format(name, value, type(value)))
return RegVal(self.map_name(proc, name), av)
def map_name_back(self, proc, name):
def map_name_back(self, proc: int, name: str) -> str:
return name
def map_value_back(self, proc, name, value):
def map_value_back(self, proc: int, name: str, value: bytes):
return RegVal(self.map_name_back(proc, name), value)
File diff suppressed because it is too large Load Diff
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
import os
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
@@ -44,7 +44,7 @@ class ModelIterator(object):
next = (self._index, mo.ModelObject(object))
self._index += 1
return next
index = mo.ModelObject(indexer)
ival = index.GetIntrinsicValue()
if ival is None:
@@ -37,9 +37,8 @@ class ModelMethod(object):
metadata = POINTER(DbgMod.IKeyStore)()
try:
self._method.Call(byref(object), argcount, byref(arguments),
byref(result), byref(metadata))
byref(result), byref(metadata))
except COMError as ce:
return None
return mo.ModelObject(result)
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
from enum import Enum
@@ -221,10 +221,10 @@ class ModelObject(object):
return None
self.dconcept = StringDisplayableConcept(dconcept)
return self.dconcept.ToDisplayString(self)
# This does NOT work - returns a null pointer for value. Why?
# One possibility: casting is not a valid way to obtain an IModelMethod
#
#
# def ToDisplayString0(self):
# map = self.GetAttributes()
# method = map["ToDisplayString"]
@@ -338,11 +338,10 @@ class ModelObject(object):
next = map[element]
else:
next = next.GetKeyValue(element)
#if next is None:
# if next is None:
# print(f"{element} not found")
return next
def GetValue(self):
value = self.GetIntrinsicValue()
if value is None:
@@ -350,4 +349,3 @@ class ModelObject(object):
if value.vt == 0xd:
return None
return value.value
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ctypes import *
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ghidradbg import arch, commands, util
from ghidratrace import sch
from ghidratrace.client import Client, Address, AddressRange, TraceObject
from ghidratrace.client import Client, Address, AddressRange, Trace, TraceObject
PAGE_SIZE = 4096
from ghidradbg import arch, commands, util
SESSION_PATH = 'Sessions[0]'
PROCESSES_PATH = SESSION_PATH + '.ExdiProcesses'
@@ -42,106 +42,97 @@ SECTIONS_ADD_PATTERN = '.Sections'
SECTION_KEY_PATTERN = '[{secname}]'
SECTION_ADD_PATTERN = SECTIONS_ADD_PATTERN + SECTION_KEY_PATTERN
@util.dbg.eng_thread
def ghidra_trace_put_processes_exdi():
"""
Put the list of processes into the trace's processes list.
"""
def ghidra_trace_put_processes_exdi() -> None:
"""Put the list of processes into the trace's processes list."""
radix = util.get_convenience_variable('output-radix')
commands.STATE.require_tx()
with commands.STATE.client.batch() as b:
put_processes_exdi(commands.STATE, radix)
trace, tx = commands.STATE.require_tx()
with trace.client.batch() as b:
put_processes_exdi(trace, radix)
@util.dbg.eng_thread
def ghidra_trace_put_regions_exdi():
"""
Read the memory map, if applicable, and write to the trace's Regions
"""
def ghidra_trace_put_regions_exdi() -> None:
"""Read the memory map, if applicable, and write to the trace's Regions."""
commands.STATE.require_tx()
with commands.STATE.client.batch() as b:
put_regions_exdi(commands.STATE)
trace, tx = commands.STATE.require_tx()
with trace.client.batch() as b:
put_regions_exdi(trace)
@util.dbg.eng_thread
def ghidra_trace_put_kmodules_exdi():
"""
Gather object files, if applicable, and write to the trace's Modules
"""
def ghidra_trace_put_kmodules_exdi() -> None:
"""Gather object files, if applicable, and write to the trace's Modules."""
commands.STATE.require_tx()
with commands.STATE.client.batch() as b:
put_kmodules_exdi(commands.STATE)
trace, tx = commands.STATE.require_tx()
with trace.client.batch() as b:
put_kmodules_exdi(trace)
@util.dbg.eng_thread
def ghidra_trace_put_threads_exdi(pid):
"""
Put the current process's threads into the Ghidra trace
"""
def ghidra_trace_put_threads_exdi(pid: int) -> None:
"""Put the current process's threads into the Ghidra trace."""
radix = util.get_convenience_variable('output-radix')
commands.STATE.require_tx()
with commands.STATE.client.batch() as b:
put_threads_exdi(commands.STATE, pid, radix)
trace, tx = commands.STATE.require_tx()
with trace.client.batch() as b:
put_threads_exdi(trace, pid, radix)
@util.dbg.eng_thread
def ghidra_trace_put_all_exdi():
"""
Put everything currently selected into the Ghidra trace
"""
def ghidra_trace_put_all_exdi() -> None:
"""Put everything currently selected into the Ghidra trace."""
radix = util.get_convenience_variable('output-radix')
commands.STATE.require_tx()
with commands.STATE.client.batch() as b:
trace, tx = commands.STATE.require_tx()
with trace.client.batch() as b:
if util.dbg.use_generics == False:
put_processes_exdi(commands.STATE, radix)
put_regions_exdi(commands.STATE)
put_kmodules_exdi(commands.STATE)
put_processes_exdi(trace, radix)
put_regions_exdi(trace)
put_kmodules_exdi(trace)
@util.dbg.eng_thread
def put_processes_exdi(state, radix):
def put_processes_exdi(trace: Trace, radix: int) -> None:
radix = util.get_convenience_variable('output-radix')
keys = []
result = util.dbg._base.cmd("!process 0 0")
lines = list(x for x in result.splitlines() if "DeepFreeze" not in x)
count = int((len(lines)-2)/5)
for i in range(0,count):
l1 = lines[i*5+1].strip().split() # PROCESS
l2 = lines[i*5+2].strip().split() # SessionId, Cid, Peb: ParentId
l3 = lines[i*5+3].strip().split() # DirBase, ObjectTable, HandleCount
l4 = lines[i*5+4].strip().split() # Image
for i in range(0, count):
l1 = lines[i*5+1].strip().split() # PROCESS
l2 = lines[i*5+2].strip().split() # SessionId, Cid, Peb: ParentId
l3 = lines[i*5+3].strip().split() # DirBase, ObjectTable, HandleCount
l4 = lines[i*5+4].strip().split() # Image
id = int(l2[3], 16)
name = l4[1]
ppath = PROCESS_PATTERN.format(pid=id)
procobj = state.trace.create_object(ppath)
procobj = trace.create_object(ppath)
keys.append(PROCESS_KEY_PATTERN.format(pid=id))
pidstr = ('0x{:x}' if radix ==
pidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(id)
procobj.set_value('PID', id)
procobj.set_value('Name', name)
procobj.set_value('_display', '[{}] {}'.format(pidstr, name))
(base, addr) = commands.map_address(int(l1[1],16))
procobj.set_value('EPROCESS', addr, schema="ADDRESS")
(base, addr) = commands.map_address(int(l2[5],16))
procobj.set_value('PEB', addr, schema="ADDRESS")
(base, addr) = commands.map_address(int(l3[1],16))
procobj.set_value('DirBase', addr, schema="ADDRESS")
(base, addr) = commands.map_address(int(l3[3],16))
procobj.set_value('ObjectTable', addr, schema="ADDRESS")
#procobj.set_value('ObjectTable', l3[3])
tcobj = state.trace.create_object(ppath+".Threads")
(base, addr) = commands.map_address(int(l1[1], 16))
procobj.set_value('EPROCESS', addr, schema=sch.ADDRESS)
(base, addr) = commands.map_address(int(l2[5], 16))
procobj.set_value('PEB', addr, schema=sch.ADDRESS)
(base, addr) = commands.map_address(int(l3[1], 16))
procobj.set_value('DirBase', addr, schema=sch.ADDRESS)
(base, addr) = commands.map_address(int(l3[3], 16))
procobj.set_value('ObjectTable', addr, schema=sch.ADDRESS)
# procobj.set_value('ObjectTable', l3[3])
tcobj = trace.create_object(ppath+".Threads")
procobj.insert()
tcobj.insert()
state.trace.proxy_object_path(PROCESSES_PATH).retain_values(keys)
trace.proxy_object_path(PROCESSES_PATH).retain_values(keys)
@util.dbg.eng_thread
def put_regions_exdi(state):
def put_regions_exdi(trace: Trace) -> None:
radix = util.get_convenience_variable('output-radix')
keys = []
result = util.dbg._base.cmd("!address")
@@ -153,33 +144,33 @@ def put_regions_exdi(state):
continue
if init == False:
continue
fields = l.strip().replace('`','').split() # PROCESS
fields = l.strip().replace('`', '').split() # PROCESS
if len(fields) < 4:
continue
start = fields[0]
#finish = fields[1]
# finish = fields[1]
length = fields[2]
type = fields[3]
(sbase, saddr) = commands.map_address(int(start,16))
#(fbase, faddr) = commands.map_address(int(finish,16))
rng = saddr.extend(int(length,16))
(sbase, saddr) = commands.map_address(int(start, 16))
# (fbase, faddr) = commands.map_address(int(finish,16))
rng = saddr.extend(int(length, 16))
rpath = REGION_PATTERN.format(start=start)
keys.append(REGION_KEY_PATTERN.format(start=start))
regobj = state.trace.create_object(rpath)
regobj.set_value('Range', rng, schema="RANGE")
regobj = trace.create_object(rpath)
regobj.set_value('Range', rng, schema=sch.RANGE)
regobj.set_value('Size', length)
regobj.set_value('Type', type)
regobj.set_value('_readable', True)
regobj.set_value('_writable', True)
regobj.set_value('_executable', True)
regobj.set_value('_display', '[{}] {}'.format(
start, type))
start, type))
regobj.insert()
state.trace.proxy_object_path(MEMORY_PATH).retain_values(keys)
trace.proxy_object_path(MEMORY_PATH).retain_values(keys)
@util.dbg.eng_thread
def put_kmodules_exdi(state):
def put_kmodules_exdi(trace: Trace) -> None:
radix = util.get_convenience_variable('output-radix')
keys = []
result = util.dbg._base.cmd("lm")
@@ -190,32 +181,33 @@ def put_kmodules_exdi(state):
continue
if "Unloaded" in l:
continue
fields = l.strip().replace('`','').split()
fields = l.strip().replace('`', '').split()
if len(fields) < 3:
continue
start = fields[0]
finish = fields[1]
name = fields[2]
sname = name.replace('.sys','').replace('.dll','')
(sbase, saddr) = commands.map_address(int(start,16))
(fbase, faddr) = commands.map_address(int(finish,16))
sz = faddr.offset - saddr.offset
sname = name.replace('.sys', '').replace('.dll', '')
(sbase, saddr) = commands.map_address(int(start, 16))
(fbase, faddr) = commands.map_address(int(finish, 16))
sz = faddr.offset - saddr.offset
rng = saddr.extend(sz)
mpath = KMODULE_PATTERN.format(modpath=sname)
keys.append(KMODULE_KEY_PATTERN.format(modpath=sname))
modobj = commands.STATE.trace.create_object(mpath)
modobj = trace.create_object(mpath)
modobj.set_value('Name', name)
modobj.set_value('Base', saddr, schema="ADDRESS")
modobj.set_value('Range', rng, schema="RANGE")
modobj.set_value('Base', saddr, schema=sch.ADDRESS)
modobj.set_value('Range', rng, schema=sch.RANGE)
modobj.set_value('Size', hex(sz))
modobj.insert()
state.trace.proxy_object_path(KMODULES_PATH).retain_values(keys)
trace.proxy_object_path(KMODULES_PATH).retain_values(keys)
@util.dbg.eng_thread
def put_threads_exdi(state, pid, radix):
def put_threads_exdi(trace: Trace, pid: int, radix: int) -> None:
radix = util.get_convenience_variable('output-radix')
pidstr = ('0x{:x}' if radix == 16 else '0{:o}' if radix == 8 else '{}').format(pid)
pidstr = ('0x{:x}' if radix == 16 else '0{:o}' if radix ==
8 else '{}').format(pid)
keys = []
result = util.dbg._base.cmd("!process "+hex(pid)+" 4")
lines = result.split("\n")
@@ -223,15 +215,15 @@ def put_threads_exdi(state, pid, radix):
l = l.strip()
if "THREAD" not in l:
continue
fields = l.split()
cid = fields[3] # pid.tid (decimal)
tid = int(cid.split('.')[1],16)
fields = l.split()
cid = fields[3] # pid.tid (decimal)
tid = int(cid.split('.')[1], 16)
tidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(tid)
tpath = THREAD_PATTERN.format(pid=pid, tnum=tid)
tobj = commands.STATE.trace.create_object(tpath)
tobj = trace.create_object(tpath)
keys.append(THREAD_KEY_PATTERN.format(tnum=tidstr))
tobj = state.trace.create_object(tpath)
tobj = trace.create_object(tpath)
tobj.set_value('PID', pidstr)
tobj.set_value('TID', tidstr)
tobj.set_value('_display', '[{}]'.format(tidstr))
@@ -240,5 +232,5 @@ def put_threads_exdi(state, pid, radix):
tobj.set_value('Win32Thread', fields[7])
tobj.set_value('State', fields[8])
tobj.insert()
commands.STATE.trace.proxy_object_path(
THREADS_PATTERN.format(pid=pidstr)).retain_values(keys)
trace.proxy_object_path(THREADS_PATTERN.format(
pid=pidstr)).retain_values(keys)
@@ -16,15 +16,17 @@
import re
from ghidratrace import sch
from ghidratrace.client import MethodRegistry, ParamDesc, Address, AddressRange
from ghidratrace.client import (MethodRegistry, ParamDesc, Address,
AddressRange, TraceObject)
from ghidradbg import util, commands, methods
from ghidradbg.methods import REGISTRY, SESSIONS_PATTERN, SESSION_PATTERN, extre
from . import exdi_commands
XPROCESSES_PATTERN = extre(SESSION_PATTERN, '\.ExdiProcesses')
XPROCESS_PATTERN = extre(XPROCESSES_PATTERN, '\[(?P<procnum>\\d*)\]')
XTHREADS_PATTERN = extre(XPROCESS_PATTERN, '\.Threads')
XPROCESSES_PATTERN = extre(SESSION_PATTERN, '\\.ExdiProcesses')
XPROCESS_PATTERN = extre(XPROCESSES_PATTERN, '\\[(?P<procnum>\\d*)\\]')
XTHREADS_PATTERN = extre(XPROCESS_PATTERN, '\\.Threads')
def find_pid_by_pattern(pattern, object, err_msg):
mat = pattern.fullmatch(object.path)
@@ -38,16 +40,23 @@ def find_pid_by_obj(object):
return find_pid_by_pattern(XTHREADS_PATTERN, object, "an ExdiThreadsContainer")
class ExdiProcessContainer(TraceObject):
pass
class ExdiThreadContainer(TraceObject):
pass
@REGISTRY.method(action='refresh', display="Refresh Target Processes")
def refresh_exdi_processes(node: sch.Schema('ExdiProcessContainer')):
def refresh_exdi_processes(node: ExdiProcessContainer) -> None:
"""Refresh the list of processes in the target kernel."""
with commands.open_tracked_tx('Refresh Processes'):
exdi_commands.ghidra_trace_put_processes_exdi()
@REGISTRY.method(action='refresh', display="Refresh Process Threads")
def refresh_exdi_threads(node: sch.Schema('ExdiThreadContainer')):
def refresh_exdi_threads(node: ExdiThreadContainer) -> None:
"""Refresh the list of threads in the process."""
pid = find_pid_by_obj(node)
with commands.open_tracked_tx('Refresh Threads'):
File diff suppressed because it is too large Load Diff
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
import ctypes
import os
File diff suppressed because it is too large Load Diff
@@ -1,5 +1,6 @@
<context>
<schema name="DbgRoot" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="EventScope" />
<attribute name="Sessions" schema="SessionContainer" required="yes" fixed="yes" />
<attribute name="Settings" schema="ANY" />
<attribute name="State" schema="State" />
@@ -16,7 +17,6 @@
</schema>
<schema name="Session" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="EventScope" />
<interface name="FocusScope" />
<interface name="Aggregate" />
<element schema="VOID" />
@@ -1,5 +1,6 @@
<context>
<schema name="DbgRoot" canonical="yes" elementResync="NEVER" attributeResync="NEVER">
<interface name="EventScope" />
<attribute name="Sessions" schema="SessionContainer" required="yes" fixed="yes" />
<attribute name="Settings" schema="ANY" />
<attribute name="State" schema="ANY" />
@@ -16,7 +17,6 @@
</schema>
<schema name="Session" elementResync="NEVER" attributeResync="NEVER">
<interface name="Activatable" />
<interface name="EventScope" />
<interface name="FocusScope" />
<interface name="Aggregate" />
<element schema="VOID" />
File diff suppressed because it is too large Load Diff
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ghidradrgn"
version = "11.3"
version = "11.4"
authors = [
{ name="Ghidra Development Team" },
]
@@ -17,7 +17,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==11.3",
"ghidratrace==11.4",
]
[project.urls]
File diff suppressed because it is too large Load Diff
@@ -19,12 +19,14 @@ from io import StringIO
import re
import sys
import time
from typing import Annotated, Any, Dict, Optional
import drgn
import drgn.cli
from ghidratrace import sch
from ghidratrace.client import MethodRegistry, ParamDesc, Address, AddressRange
from ghidratrace.client import (
MethodRegistry, ParamDesc, Address, AddressRange, TraceObject)
from . import util, commands, hooks
@@ -133,12 +135,12 @@ def find_frame_by_level(level):
except Exception as e:
print(e)
return
for i,f in enumerate(frames):
for i, f in enumerate(frames):
if i == level:
if i != util.selected_frame():
util.select_frame(i)
return i,f
return i, f
def find_frame_by_pattern(pattern, object, err_msg):
@@ -185,11 +187,59 @@ def find_module_by_obj(object):
return find_module_by_pattern(MODULE_PATTERN, object, "a Module")
shared_globals = dict()
shared_globals: Dict[str, Any] = dict()
@REGISTRY.method
def execute(cmd: str, to_string: bool=False):
class Environment(TraceObject):
pass
class LocalsContainer(TraceObject):
pass
class Memory(TraceObject):
pass
class ModuleContainer(TraceObject):
pass
class Process(TraceObject):
pass
class ProcessContainer(TraceObject):
pass
class Stack(TraceObject):
pass
class RegisterValueContainer(TraceObject):
pass
class StackFrame(TraceObject):
pass
class SymbolContainer(TraceObject):
pass
class Thread(TraceObject):
pass
class ThreadContainer(TraceObject):
pass
@REGISTRY.method()
def execute(cmd: str, to_string: bool = False) -> Optional[str]:
"""Execute a Python3 command or script."""
if to_string:
data = StringIO()
@@ -198,49 +248,48 @@ def execute(cmd: str, to_string: bool=False):
return data.getvalue()
else:
exec(cmd, shared_globals)
return None
@REGISTRY.method(action='refresh', display='Refresh Processes')
def refresh_processes(node: sch.Schema('ProcessContainer')):
def refresh_processes(node: ProcessContainer) -> None:
"""Refresh the list of processes."""
with commands.open_tracked_tx('Refresh Processes'):
commands.ghidra_trace_put_processes()
@REGISTRY.method(action='refresh', display='Refresh Environment')
def refresh_environment(node: sch.Schema('Environment')):
def refresh_environment(node: Environment) -> None:
"""Refresh the environment descriptors (arch, os, endian)."""
with commands.open_tracked_tx('Refresh Environment'):
commands.ghidra_trace_put_environment()
@REGISTRY.method(action='refresh', display='Refresh Threads')
def refresh_threads(node: sch.Schema('ThreadContainer')):
def refresh_threads(node: ThreadContainer) -> None:
"""Refresh the list of threads in the process."""
with commands.open_tracked_tx('Refresh Threads'):
commands.ghidra_trace_put_threads()
# @REGISTRY.method(action='refresh', display='Refresh Symbols')
# def refresh_symbols(node: sch.Schema('SymbolContainer')):
# """Refresh the list of symbols in the process."""
# with commands.open_tracked_tx('Refresh Symbols'):
# commands.ghidra_trace_put_symbols()
# def refresh_symbols(node: SymbolContainer) -> None:
# """Refresh the list of symbols in the process."""
# with commands.open_tracked_tx('Refresh Symbols'):
# commands.ghidra_trace_put_symbols()
@REGISTRY.method(action='show_symbol', display='Retrieve Symbols')
def retrieve_symbols(
session: sch.Schema('SymbolContainer'),
pattern: ParamDesc(str, display='Pattern')):
"""
Load the symbol set matching the pattern.
"""
conainer: SymbolContainer,
pattern: Annotated[str, ParamDesc(display='Pattern')]) -> None:
"""Load the symbol set matching the pattern."""
with commands.open_tracked_tx('Retrieve Symbols'):
commands.put_symbols(pattern)
@REGISTRY.method(action='refresh', display='Refresh Stack')
def refresh_stack(node: sch.Schema('Stack')):
def refresh_stack(node: Stack) -> None:
"""Refresh the backtrace for the thread."""
tnum = find_thread_by_stack_obj(node)
with commands.open_tracked_tx('Refresh Stack'):
@@ -248,55 +297,53 @@ def refresh_stack(node: sch.Schema('Stack')):
@REGISTRY.method(action='refresh', display='Refresh Registers')
def refresh_registers(node: sch.Schema('RegisterValueContainer')):
"""Refresh the register values for the selected frame"""
def refresh_registers(node: RegisterValueContainer) -> None:
"""Refresh the register values for the selected frame."""
level = find_frame_by_regs_obj(node)
with commands.open_tracked_tx('Refresh Registers'):
commands.ghidra_trace_putreg()
@REGISTRY.method(action='refresh', display='Refresh Locals')
def refresh_locals(node: sch.Schema('LocalsContainer')):
"""Refresh the local values for the selected frame"""
def refresh_locals(node: LocalsContainer) -> None:
"""Refresh the local values for the selected frame."""
level = find_frame_by_locals_obj(node)
with commands.open_tracked_tx('Refresh Registers'):
commands.ghidra_trace_put_locals()
if hasattr(drgn, 'RelocatableModule'):
@REGISTRY.method(action='refresh', display='Refresh Memory')
def refresh_mappings(node: sch.Schema('Memory')):
"""Refresh the list of memory regions for the process."""
with commands.open_tracked_tx('Refresh Memory Regions'):
commands.ghidra_trace_put_regions()
@REGISTRY.method(action='refresh', display='Refresh Memory',
condition=hasattr(drgn, 'RelocatableModule'))
def refresh_mappings(node: Memory) -> None:
"""Refresh the list of memory regions for the process."""
with commands.open_tracked_tx('Refresh Memory Regions'):
commands.ghidra_trace_put_regions()
if hasattr(drgn, 'RelocatableModule'):
@REGISTRY.method(action='refresh', display='Refresh Modules')
def refresh_modules(node: sch.Schema('ModuleContainer')):
"""
Refresh the modules list for the process.
"""
with commands.open_tracked_tx('Refresh Modules'):
commands.ghidra_trace_put_modules()
@REGISTRY.method(action='refresh', display='Refresh Modules',
condition=hasattr(drgn, 'RelocatableModule'))
def refresh_modules(node: ModuleContainer) -> None:
"""Refresh the modules list for the process."""
with commands.open_tracked_tx('Refresh Modules'):
commands.ghidra_trace_put_modules()
@REGISTRY.method(action='activate')
def activate_process(process: sch.Schema('Process')):
def activate_process(process: Process) -> None:
"""Switch to the process."""
find_proc_by_obj(process)
@REGISTRY.method(action='activate')
def activate_thread(thread: sch.Schema('Thread')):
def activate_thread(thread: Thread) -> None:
"""Switch to the thread."""
find_thread_by_obj(thread)
@REGISTRY.method(action='activate')
def activate_frame(frame: sch.Schema('StackFrame')):
def activate_frame(frame: StackFrame) -> None:
"""Select the frame."""
i,f = find_frame_by_obj(frame)
i, f = find_frame_by_obj(frame)
util.select_frame(i)
with commands.open_tracked_tx('Refresh Stack'):
commands.ghidra_trace_put_frames()
@@ -304,12 +351,12 @@ def activate_frame(frame: sch.Schema('StackFrame')):
commands.ghidra_trace_putreg()
@REGISTRY.method
def read_mem(process: sch.Schema('Process'), range: AddressRange):
@REGISTRY.method()
def read_mem(process: Process, range: AddressRange) -> None:
"""Read memory."""
# print("READ_MEM: process={}, range={}".format(process, range))
nproc = find_proc_by_obj(process)
offset_start = process.trace.memory_mapper.map_back(
offset_start = process.trace.extra.require_mm().map_back(
nproc, Address(range.space, range.min))
with commands.open_tracked_tx('Read Memory'):
result = commands.put_bytes(
@@ -320,9 +367,8 @@ def read_mem(process: sch.Schema('Process'), range: AddressRange):
@REGISTRY.method(action='attach', display='Attach by pid')
def attach_pid(
processes: sch.Schema('ProcessContainer'),
pid: ParamDesc(str, display='PID')):
def attach_pid(processes: ProcessContainer,
pid: Annotated[str, ParamDesc(display='PID')]) -> None:
"""Attach the process to the given target."""
prog = drgn.Program()
prog.set_pid(int(pid))
@@ -333,7 +379,7 @@ def attach_pid(
prog.load_debug_info(None, **default_symbols)
except drgn.MissingDebugInfoError as e:
print(e)
#commands.ghidra_trace_start(pid)
# commands.ghidra_trace_start(pid)
commands.PROGRAMS[pid] = prog
commands.prog = prog
with commands.open_tracked_tx('Refresh Processes'):
@@ -341,9 +387,8 @@ def attach_pid(
@REGISTRY.method(action='attach', display='Attach core dump')
def attach_core(
processes: sch.Schema('ProcessContainer'),
core: ParamDesc(str, display='Core dump')):
def attach_core(processes: ProcessContainer,
core: Annotated[str, ParamDesc(display='Core dump')]) -> None:
"""Attach the process to the given target."""
prog = drgn.Program()
prog.set_core_dump(core)
@@ -352,7 +397,7 @@ def attach_core(
prog.load_debug_info(None, **default_symbols)
except drgn.MissingDebugInfoError as e:
print(e)
util.selected_pid += 1
commands.PROGRAMS[util.selected_pid] = prog
commands.prog = prog
@@ -361,7 +406,8 @@ def attach_core(
@REGISTRY.method(action='step_into')
def step_into(thread: sch.Schema('Thread'), n: ParamDesc(int, display='N')=1):
def step_into(thread: Thread,
n: Annotated[int, ParamDesc(display='N')] = 1) -> None:
"""Step one instruction exactly."""
find_thread_by_obj(thread)
time.sleep(1)
@@ -369,22 +415,20 @@ def step_into(thread: sch.Schema('Thread'), n: ParamDesc(int, display='N')=1):
# @REGISTRY.method
# def kill(process: sch.Schema('Process')):
# def kill(process: Process) -> None:
# """Kill execution of the process."""
# commands.ghidra_trace_kill()
# @REGISTRY.method(action='resume')
# def go(process: sch.Schema('Process')):
# def go(process: Process) -> None:
# """Continue execution of the process."""
# util.dbg.run_async(lambda: dbg().go())
# @REGISTRY.method
# def interrupt(process: sch.Schema('Process')):
# def interrupt(process: Process) -> None:
# """Interrupt the execution of the debugged program."""
# # SetInterrupt is reentrant, so bypass the thread checks
# util.dbg._protected_base._control.SetInterrupt(
# DbgEng.DEBUG_INTERRUPT_ACTIVE)
@@ -18,5 +18,6 @@ src/main/py/LICENSE||GHIDRA||||END|
src/main/py/MANIFEST.in||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidragdb/py.typed||GHIDRA||||END|
src/main/py/src/ghidragdb/schema.xml||GHIDRA||||END|
src/main/py/tests/EMPTY||GHIDRA||||END|
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ghidragdb"
version = "11.3"
version = "11.4"
authors = [
{ name="Ghidra Development Team" },
]
@@ -17,9 +17,12 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==11.3",
"ghidratrace==11.4",
]
[project.urls]
"Homepage" = "https://github.com/NationalSecurityAgency/ghidra"
"Bug Tracker" = "https://github.com/NationalSecurityAgency/ghidra/issues"
[tool.setuptools.package-data]
ghidragdb = ["py.typed"]
@@ -13,17 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from ghidratrace.client import Address, RegVal
import gdb
# NOTE: This map is derived from the ldefs using a script
# i386 is hand-patched
language_map = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'aarch64:ilp32': ['AARCH64:BE:32:ilp32', 'AARCH64:LE:32:ilp32', 'AARCH64:LE:64:AppleSilicon'],
language_map: Dict[str, List[str]] = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'aarch64:ilp32': ['AARCH64:BE:32:ilp32', 'AARCH64:LE:32:ilp32',
'AARCH64:LE:64:AppleSilicon'],
'arm': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'arm_any': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'arm_any': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8',
'ARM:LE:32:v8T'],
'armv2': ['ARM:BE:32:v4', 'ARM:LE:32:v4'],
'armv2a': ['ARM:BE:32:v4', 'ARM:LE:32:v4'],
'armv3': ['ARM:BE:32:v4', 'ARM:LE:32:v4'],
@@ -55,7 +59,8 @@ language_map = {
'i386:x86-64': ['x86:LE:64:default'],
'i386:x86-64:intel': ['x86:LE:64:default'],
'i8086': ['x86:LE:16:Protected Mode', 'x86:LE:16:Real Mode'],
'iwmmxt': ['ARM:BE:32:v7', 'ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v7', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'iwmmxt': ['ARM:BE:32:v7', 'ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v7',
'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'm68hc12': ['HC-12:BE:16:default'],
'm68k': ['68000:BE:32:default'],
'm68k:68020': ['68000:BE:32:MC68020'],
@@ -63,41 +68,51 @@ language_map = {
'm9s12x': ['HCS-12:BE:24:default', 'HCS-12X:BE:24:default'],
'mips:3000': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
'mips:4000': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
'mips:5000': ['MIPS:BE:64:64-32addr', 'MIPS:BE:64:default', 'MIPS:LE:64:64-32addr', 'MIPS:LE:64:default'],
'mips:5000': ['MIPS:BE:64:64-32addr', 'MIPS:BE:64:default',
'MIPS:LE:64:64-32addr', 'MIPS:LE:64:default'],
'mips:micromips': ['MIPS:BE:32:micro'],
'msp:430X': ['TI_MSP430:LE:16:default'],
'powerpc:403': ['PowerPC:BE:32:4xx', 'PowerPC:LE:32:4xx'],
'powerpc:MPC8XX': ['PowerPC:BE:32:MPC8270', 'PowerPC:BE:32:QUICC', 'PowerPC:LE:32:QUICC'],
'powerpc:MPC8XX': ['PowerPC:BE:32:MPC8270', 'PowerPC:BE:32:QUICC',
'PowerPC:LE:32:QUICC'],
'powerpc:common': ['PowerPC:BE:32:default', 'PowerPC:LE:32:default'],
'powerpc:common64': ['PowerPC:BE:64:64-32addr', 'PowerPC:BE:64:default', 'PowerPC:LE:64:64-32addr', 'PowerPC:LE:64:default'],
'powerpc:common64': ['PowerPC:BE:64:64-32addr', 'PowerPC:BE:64:default',
'PowerPC:LE:64:64-32addr', 'PowerPC:LE:64:default'],
'powerpc:e500': ['PowerPC:BE:32:e500', 'PowerPC:LE:32:e500'],
'powerpc:e500mc': ['PowerPC:BE:64:A2ALT', 'PowerPC:LE:64:A2ALT'],
'powerpc:e500mc64': ['PowerPC:BE:64:A2-32addr', 'PowerPC:BE:64:A2ALT-32addr', 'PowerPC:LE:64:A2-32addr', 'PowerPC:LE:64:A2ALT-32addr'],
'riscv:rv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC', 'RISCV:LE:32:RV32I', 'RISCV:LE:32:RV32IC', 'RISCV:LE:32:RV32IMC', 'RISCV:LE:32:default'],
'riscv:rv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC', 'RISCV:LE:64:RV64I', 'RISCV:LE:64:RV64IC', 'RISCV:LE:64:default'],
'powerpc:e500mc64': ['PowerPC:BE:64:A2-32addr',
'PowerPC:BE:64:A2ALT-32addr',
'PowerPC:LE:64:A2-32addr',
'PowerPC:LE:64:A2ALT-32addr'],
'riscv:rv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC',
'RISCV:LE:32:RV32I', 'RISCV:LE:32:RV32IC',
'RISCV:LE:32:RV32IMC', 'RISCV:LE:32:default'],
'riscv:rv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC',
'RISCV:LE:64:RV64I', 'RISCV:LE:64:RV64IC',
'RISCV:LE:64:default'],
'sh4': ['SuperH4:BE:32:default', 'SuperH4:LE:32:default'],
'sparc:v9b': ['sparc:BE:32:default', 'sparc:BE:64:default'],
'xscale': ['ARM:BE:32:v6', 'ARM:LE:32:v6'],
'z80': ['z80:LE:16:default', 'z8401x:LE:16:default']
}
data64_compiler_map = {
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
x86_compiler_map = {
x86_compiler_map: Dict[Optional[str], str] = {
'GNU/Linux': 'gcc',
'Windows': 'windows',
# This may seem wrong, but Ghidra cspecs really describe the ABI
'Cygwin': 'windows',
}
riscv_compiler_map = {
riscv_compiler_map: Dict[Optional[str], str] = {
'GNU/Linux': 'gcc',
'Cygwin': 'gcc',
}
compiler_map = {
compiler_map: Dict[str, Dict[Optional[str], str]] = {
'DATA:BE:64:default': data64_compiler_map,
'DATA:LE:64:default': data64_compiler_map,
'x86:LE:32:default': x86_compiler_map,
@@ -107,14 +122,14 @@ compiler_map = {
}
def get_arch():
def get_arch() -> str:
return gdb.selected_inferior().architecture().name()
def get_endian():
def get_endian() -> str:
parm = gdb.parameter('endian')
if not parm in ['', 'auto', 'default']:
return parm
return str(parm)
# Once again, we have to hack using the human-readable 'show'
show = gdb.execute('show endian', to_string=True)
if 'little' in show:
@@ -124,10 +139,10 @@ def get_endian():
return 'unrecognized'
def get_osabi():
def get_osabi() -> str:
parm = gdb.parameter('osabi')
if not parm in ['', 'auto', 'default']:
return parm
return str(parm)
# We have to hack around the fact the GDB won't give us the current OS ABI
# via the API if it is "auto" or "default". Using "show", we can get it, but
# we have to parse output meant for a human. The current value will be on
@@ -138,11 +153,11 @@ def get_osabi():
return line.split('"')[-2]
def compute_ghidra_language():
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = gdb.parameter('ghidra-language')
if not lang in ['', 'auto', 'default']:
return lang
return str(lang)
# Get the list of possible languages for the arch. We'll need to sift
# through them by endian and probably prefer default/simpler variants. The
@@ -163,11 +178,11 @@ def compute_ghidra_language():
return 'DATA' + lebe + '64:default'
def compute_ghidra_compiler(lang):
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = gdb.parameter('ghidra-compiler')
if not comp in ['', 'auto', 'default']:
return comp
return str(comp)
# Check if the selected lang has specific compiler recommendations
if not lang in compiler_map:
@@ -185,7 +200,7 @@ def compute_ghidra_compiler(lang):
return 'default'
def compute_ghidra_lcsp():
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
@@ -193,10 +208,10 @@ def compute_ghidra_lcsp():
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, inf: gdb.Inferior, offset: int):
def map(self, inf: gdb.Inferior, offset: int) -> Tuple[str, Address]:
if inf.num == 1:
space = self.defaultSpace
else:
@@ -213,10 +228,10 @@ class DefaultMemoryMapper(object):
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers = {}
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang):
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
@@ -224,16 +239,16 @@ def compute_memory_mapper(lang):
class DefaultRegisterMapper(object):
def __init__(self, byte_order):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
raise ValueError(f"Invalid byte_order: {byte_order}")
self.byte_order = byte_order
self.union_winners = {}
def map_name(self, inf, name):
def map_name(self, inf: gdb.Inferior, name: str):
return name
def convert_value(self, value, type=None):
def convert_value(self, value: gdb.Value,
type: Optional[gdb.Type] = None) -> bytes:
if type is None:
type = value.dynamic_type.strip_typedefs()
l = type.sizeof
@@ -241,39 +256,43 @@ class DefaultRegisterMapper(object):
# NOTE: Might like to pre-lookup 'unsigned char', but it depends on the
# architecture *at the time of lookup*.
cv = value.cast(gdb.lookup_type('unsigned char').array(l - 1))
rng = range(l)
if self.byte_order == 'little':
rng = reversed(rng)
return bytes(cv[i] for i in rng)
rng: Sequence[int] = range(l)
it = reversed(rng) if self.byte_order == 'little' else rng
result = bytes(cv[i] for i in it)
return result
def map_value(self, inf, name, value):
def map_value(self, inf: gdb.Inferior, name: str,
value: gdb.Value) -> RegVal:
try:
av = self.convert_value(value)
except gdb.error as e:
raise gdb.GdbError("Cannot convert {}'s value: '{}', type: '{}'"
.format(name, value, value.type))
raise gdb.GdbError(
f"Cannot convert {name}'s value: '{value}', type: '{value.type}'")
return RegVal(self.map_name(inf, name), av)
def convert_value_back(self, value, size=None):
def convert_value_back(self, value: bytes,
size: Optional[int] = None) -> bytes:
if size is not None:
value = value[-size:].rjust(size, b'\0')
if self.byte_order == 'little':
value = bytes(reversed(value))
return value
def map_name_back(self, inf, name):
def map_name_back(self, inf: gdb.Inferior, name: str) -> str:
return name
def map_value_back(self, inf, name, value):
return RegVal(self.map_name_back(inf, name), self.convert_value_back(value))
def map_value_back(self, inf: gdb.Inferior, name: str,
value: bytes) -> RegVal:
return RegVal(
self.map_name_back(inf, name), self.convert_value_back(value))
class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
def __init__(self):
def __init__(self) -> None:
super().__init__('little')
def map_name(self, inf, name):
def map_name(self, inf: gdb.Inferior, name: str) -> str:
if name == 'eflags':
return 'rflags'
if name.startswith('zmm'):
@@ -281,13 +300,14 @@ class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
return 'ymm' + name[3:]
return super().map_name(inf, name)
def map_value(self, inf, name, value):
def map_value(self, inf: gdb.Inferior, name: str,
value: gdb.Value) -> RegVal:
rv = super().map_value(inf, name, value)
if rv.name.startswith('ymm') and len(rv.value) > 32:
return RegVal(rv.name, rv.value[-32:])
return rv
def map_name_back(self, inf, name):
def map_name_back(self, inf: gdb.Inferior, name: str) -> str:
if name == 'rflags':
return 'eflags'
return name
@@ -296,12 +316,12 @@ class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big')
DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little')
register_mappers = {
register_mappers: Dict[str, DefaultRegisterMapper] = {
'x86:LE:64:default': Intel_x86_64_RegisterMapper()
}
def compute_register_mapper(lang):
def compute_register_mapper(lang: str) -> DefaultRegisterMapper:
if not lang in register_mappers:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER
File diff suppressed because it is too large Load Diff
@@ -13,68 +13,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from dataclasses import dataclass, field
import functools
import time
import traceback
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, cast
import gdb
from ghidratrace.client import Batch
from . import commands, util
class GhidraHookPrefix(gdb.Command):
"""Commands for exporting data to a Ghidra trace"""
"""Commands for exporting data to a Ghidra trace."""
def __init__(self):
def __init__(self) -> None:
super().__init__('hooks-ghidra', gdb.COMMAND_NONE, prefix=True)
GhidraHookPrefix()
@dataclass(frozen=False)
class HookState(object):
__slots__ = ('installed', 'batch', 'skip_continue', 'in_break_w_cont')
installed = False
batch: Optional[Batch] = None
skip_continue = False
in_break_w_cont = False
def __init__(self):
self.installed = False
self.batch = None
self.skip_continue = False
self.in_break_w_cont = False
def ensure_batch(self):
def ensure_batch(self) -> None:
if self.batch is None:
self.batch = commands.STATE.client.start_batch()
self.batch = commands.STATE.require_client().start_batch()
def end_batch(self):
def end_batch(self) -> None:
if self.batch is None:
return
self.batch = None
commands.STATE.client.end_batch()
commands.STATE.require_client().end_batch()
def check_skip_continue(self):
def check_skip_continue(self) -> bool:
skip = self.skip_continue
self.skip_continue = False
return skip
@dataclass(frozen=False)
class InferiorState(object):
__slots__ = ('first', 'regions', 'modules', 'threads', 'breaks', 'visited')
first = True
# For things we can detect changes to between stops
regions: List[util.Region] = field(default_factory=list)
modules = False
threads = False
breaks = False
# For frames and threads that have already been synced since last stop
visited: set[Any] = field(default_factory=set)
def __init__(self):
self.first = True
# For things we can detect changes to between stops
self.regions = []
self.modules = False
self.threads = False
self.breaks = False
# For frames and threads that have already been synced since last stop
self.visited = set()
def record(self, description=None):
def record(self, description: Optional[str] = None) -> None:
first = self.first
self.first = False
trace = commands.STATE.require_trace()
if description is not None:
commands.STATE.trace.snapshot(description)
trace.snapshot(description)
if first:
commands.put_inferiors()
commands.put_environment()
@@ -106,7 +107,8 @@ class InferiorState(object):
print(f"Couldn't record page with SP: {e}")
self.visited.add(hashable_frame)
# NB: These commands (put_modules/put_regions) will fail if the process is running
regions_changed, regions = util.REGION_INFO_READER.have_changed(self.regions)
regions_changed, regions = util.REGION_INFO_READER.have_changed(
self.regions)
if regions_changed:
self.regions = commands.put_regions(regions)
if first or self.modules:
@@ -116,31 +118,29 @@ class InferiorState(object):
commands.put_breakpoints()
self.breaks = False
def record_continued(self):
def record_continued(self) -> None:
commands.put_inferiors()
commands.put_threads()
def record_exited(self, exit_code):
def record_exited(self, exit_code: int) -> None:
inf = gdb.selected_inferior()
ipath = commands.INFERIOR_PATTERN.format(infnum=inf.num)
infobj = commands.STATE.trace.proxy_object_path(ipath)
infobj = commands.STATE.require_trace().proxy_object_path(ipath)
infobj.set_value('Exit Code', exit_code)
infobj.set_value('State', 'TERMINATED')
@dataclass(frozen=False)
class BrkState(object):
__slots__ = ('break_loc_counts',)
break_loc_counts: Dict[gdb.Breakpoint, int] = field(default_factory=dict)
def __init__(self):
self.break_loc_counts = {}
def update_brkloc_count(self, b, count):
def update_brkloc_count(self, b: gdb.Breakpoint, count: int) -> None:
self.break_loc_counts[b] = count
def get_brkloc_count(self, b):
def get_brkloc_count(self, b: gdb.Breakpoint) -> int:
return self.break_loc_counts.get(b, 0)
def del_brkloc_count(self, b):
def del_brkloc_count(self, b: gdb.Breakpoint) -> int:
if b not in self.break_loc_counts:
return 0 # TODO: Print a warning?
count = self.break_loc_counts[b]
@@ -150,40 +150,41 @@ class BrkState(object):
HOOK_STATE = HookState()
BRK_STATE = BrkState()
INF_STATES = {}
INF_STATES: Dict[int, InferiorState] = {}
def log_errors(func):
'''
Wrap a function in a try-except that prints and reraises the
exception.
C = TypeVar('C', bound=Callable)
def log_errors(func: C) -> C:
"""Wrap a function in a try-except that prints and reraises the exception.
This is needed because pybag and/or the COM wrappers do not print
exceptions that occur during event callbacks.
'''
"""
@functools.wraps(func)
def _func(*args, **kwargs):
def _func(*args, **kwargs) -> Any:
try:
return func(*args, **kwargs)
except:
traceback.print_exc()
raise
return _func
return cast(C, _func)
@log_errors
def on_new_inferior(event):
def on_new_inferior(event: gdb.NewInferiorEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
HOOK_STATE.ensure_batch()
with trace.open_tx("New Inferior {}".format(event.inferior.num)):
with trace.open_tx(f"New Inferior {event.inferior.num}"):
commands.put_inferiors() # TODO: Could put just the one....
def on_inferior_selected():
def on_inferior_selected() -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -191,25 +192,25 @@ def on_inferior_selected():
if trace is None:
return
HOOK_STATE.ensure_batch()
with trace.open_tx("Inferior {} selected".format(inf.num)):
with trace.open_tx(f"Inferior {inf.num} selected"):
INF_STATES[inf.num].record()
commands.activate()
@log_errors
def on_inferior_deleted(event):
def on_inferior_deleted(event: gdb.InferiorDeletedEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
if event.inferior.num in INF_STATES:
del INF_STATES[event.inferior.num]
HOOK_STATE.ensure_batch()
with trace.open_tx("Inferior {} deleted".format(event.inferior.num)):
with trace.open_tx(f"Inferior {event.inferior.num} deleted"):
commands.put_inferiors() # TODO: Could just delete the one....
@log_errors
def on_new_thread(event):
def on_new_thread(event: gdb.ThreadEvent) -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -217,7 +218,7 @@ def on_new_thread(event):
# TODO: Syscall clone/exit to detect thread destruction?
def on_thread_selected():
def on_thread_selected(event: Optional[gdb.ThreadEvent]) -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -226,12 +227,12 @@ def on_thread_selected():
return
t = gdb.selected_thread()
HOOK_STATE.ensure_batch()
with trace.open_tx("Thread {}.{} selected".format(inf.num, t.num)):
with trace.open_tx(f"Thread {inf.num}.{t.num} selected"):
INF_STATES[inf.num].record()
commands.activate()
def on_frame_selected():
def on_frame_selected() -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -243,13 +244,13 @@ def on_frame_selected():
if f is None:
return
HOOK_STATE.ensure_batch()
with trace.open_tx("Frame {}.{}.{} selected".format(inf.num, t.num, util.get_level(f))):
with trace.open_tx(f"Frame {inf.num}.{t.num}.{util.get_level(f)} selected"):
INF_STATES[inf.num].record()
commands.activate()
@log_errors
def on_memory_changed(event):
def on_memory_changed(event: gdb.MemoryChangedEvent) -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -257,13 +258,15 @@ def on_memory_changed(event):
if trace is None:
return
HOOK_STATE.ensure_batch()
with trace.open_tx("Memory *0x{:08x} changed".format(event.address)):
commands.put_bytes(event.address, event.address + event.length,
address = int(event.address)
length = int(event.length)
with trace.open_tx(f"Memory *0x{address:08x} changed"):
commands.put_bytes(address, address + length,
pages=False, is_mi=False, from_tty=False)
@log_errors
def on_register_changed(event):
def on_register_changed(event: gdb.RegisterChangedEvent) -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -274,7 +277,7 @@ def on_register_changed(event):
# TODO: How do I get the descriptor from the number?
# For now, just record the lot
HOOK_STATE.ensure_batch()
with trace.open_tx("Register {} changed".format(event.regnum)):
with trace.open_tx(f"Register {event.regnum} changed"):
commands.putreg(event.frame, util.get_register_descs(
event.frame.architecture()))
@@ -300,8 +303,9 @@ def on_cont(event):
state.record_continued()
def check_for_continue(event):
if hasattr(event, 'breakpoints'):
def check_for_continue(event: Optional[gdb.StopEvent]) -> bool:
# Attribute check because of version differences
if isinstance(event, gdb.StopEvent) and hasattr(event, 'breakpoints'):
if HOOK_STATE.in_break_w_cont:
return True
for brk in event.breakpoints:
@@ -315,7 +319,7 @@ def check_for_continue(event):
@log_errors
def on_stop(event):
def on_stop(event: Optional[gdb.StopEvent]) -> None:
if check_for_continue(event):
HOOK_STATE.skip_continue = True
return
@@ -336,7 +340,7 @@ def on_stop(event):
@log_errors
def on_exited(event):
def on_exited(event: gdb.ExitedEvent) -> None:
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
return
@@ -358,13 +362,13 @@ def on_exited(event):
HOOK_STATE.end_batch()
def notify_others_breaks(inf):
def notify_others_breaks(inf: gdb.Inferior) -> None:
for num, state in INF_STATES.items():
if num != inf.num:
state.breaks = True
def modules_changed():
def modules_changed() -> None:
# Assumption: affects the current inferior
inf = gdb.selected_inferior()
if inf.num not in INF_STATES:
@@ -373,22 +377,22 @@ def modules_changed():
@log_errors
def on_clear_objfiles(event):
def on_clear_objfiles(event: gdb.ClearObjFilesEvent) -> None:
modules_changed()
@log_errors
def on_new_objfile(event):
def on_new_objfile(event: gdb.NewObjFileEvent) -> None:
modules_changed()
@log_errors
def on_free_objfile(event):
def on_free_objfile(event: gdb.FreeObjFileEvent) -> None:
modules_changed()
@log_errors
def on_breakpoint_created(b):
def on_breakpoint_created(b: gdb.Breakpoint) -> None:
inf = gdb.selected_inferior()
notify_others_breaks(inf)
if inf.num not in INF_STATES:
@@ -398,7 +402,7 @@ def on_breakpoint_created(b):
return
ibpath = commands.INF_BREAKS_PATTERN.format(infnum=inf.num)
HOOK_STATE.ensure_batch()
with trace.open_tx("Breakpoint {} created".format(b.number)):
with trace.open_tx(f"Breakpoint {b.number} created"):
ibobj = trace.create_object(ibpath)
# Do not use retain_values or it'll remove other locs
commands.put_single_breakpoint(b, ibobj, inf, [])
@@ -406,7 +410,7 @@ def on_breakpoint_created(b):
@log_errors
def on_breakpoint_modified(b):
def on_breakpoint_modified(b: gdb.Breakpoint) -> None:
inf = gdb.selected_inferior()
notify_others_breaks(inf)
if inf.num not in INF_STATES:
@@ -429,7 +433,7 @@ def on_breakpoint_modified(b):
@log_errors
def on_breakpoint_deleted(b):
def on_breakpoint_deleted(b: gdb.Breakpoint) -> None:
inf = gdb.selected_inferior()
notify_others_breaks(inf)
if inf.num not in INF_STATES:
@@ -451,17 +455,28 @@ def on_breakpoint_deleted(b):
@log_errors
def on_before_prompt():
def on_before_prompt(n: None) -> object:
HOOK_STATE.end_batch()
return None
def cmd_hook(name):
@dataclass(frozen=True)
class HookFunc(object):
wrapped: Callable[[], None]
hook: Type[gdb.Command]
unhook: Callable[[], None]
def _cmd_hook(func):
def __call__(self) -> None:
self.wrapped()
def cmd_hook(name: str):
def _cmd_hook(func: Callable[[], None]) -> HookFunc:
class _ActiveCommand(gdb.Command):
def __init__(self):
def __init__(self) -> None:
# It seems we can't hook commands using the Python API....
super().__init__(f"hooks-ghidra def-{name}", gdb.COMMAND_USER)
gdb.execute(f"""
@@ -470,50 +485,48 @@ def cmd_hook(name):
end
""")
def invoke(self, argument, from_tty):
def invoke(self, argument: str, from_tty: bool) -> None:
self.dont_repeat()
func()
def _unhook_command():
def _unhook_command() -> None:
gdb.execute(f"""
define {name}
end
""")
func.hook = _ActiveCommand
func.unhook = _unhook_command
return func
return HookFunc(func, _ActiveCommand, _unhook_command)
return _cmd_hook
@cmd_hook('hookpost-inferior')
def hook_inferior():
def hook_inferior() -> None:
on_inferior_selected()
@cmd_hook('hookpost-thread')
def hook_thread():
on_thread_selected()
def hook_thread() -> None:
on_thread_selected(None)
@cmd_hook('hookpost-frame')
def hook_frame():
def hook_frame() -> None:
on_frame_selected()
@cmd_hook('hookpost-up')
def hook_frame_up():
def hook_frame_up() -> None:
on_frame_selected()
@cmd_hook('hookpost-down')
def hook_frame_down():
def hook_frame_down() -> None:
on_frame_selected()
# TODO: Checks and workarounds for events missing in gdb 9
def install_hooks():
def install_hooks() -> None:
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
@@ -548,7 +561,7 @@ def install_hooks():
gdb.events.before_prompt.connect(on_before_prompt)
def remove_hooks():
def remove_hooks() -> None:
if not HOOK_STATE.installed:
return
HOOK_STATE.installed = False
@@ -582,12 +595,12 @@ def remove_hooks():
gdb.events.before_prompt.disconnect(on_before_prompt)
def enable_current_inferior():
def enable_current_inferior() -> None:
inf = gdb.selected_inferior()
INF_STATES[inf.num] = InferiorState()
def disable_current_inferior():
def disable_current_inferior() -> None:
inf = gdb.selected_inferior()
if inf.num in INF_STATES:
# Silently ignore already disabled
File diff suppressed because it is too large Load Diff
@@ -1,17 +1,17 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
import gdb
@@ -26,9 +26,11 @@ class GhidraLanguageParameter(gdb.Parameter):
LanguageID.
"""
def __init__(self):
def __init__(self) -> None:
super().__init__('ghidra-language', gdb.COMMAND_DATA, gdb.PARAM_STRING)
self.value = 'auto'
GhidraLanguageParameter()
@@ -39,8 +41,9 @@ class GhidraCompilerParameter(gdb.Parameter):
that valid compiler spec ids depend on the language id.
"""
def __init__(self):
def __init__(self) -> None:
super().__init__('ghidra-compiler', gdb.COMMAND_DATA, gdb.PARAM_STRING)
self.value = 'auto'
GhidraCompilerParameter()
GhidraCompilerParameter()
@@ -13,17 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from abc import abstractmethod
from collections import namedtuple
import bisect
from dataclasses import dataclass
import re
from typing import Callable, Dict, List, Optional, Set, Tuple, Union
import gdb
GdbVersion = namedtuple('GdbVersion', ['full', 'major', 'minor'])
@dataclass(frozen=True)
class GdbVersion:
full: str
major: int
minor: int
def _compute_gdb_ver():
def _compute_gdb_ver() -> GdbVersion:
blurb = gdb.execute('show version', to_string=True)
top = blurb.split('\n')[0]
full = top.split(' ')[-1]
@@ -57,19 +64,49 @@ OBJFILE_SECTION_PATTERN_V9 = re.compile("\\s*" +
GNU_DEBUGDATA_PREFIX = ".gnu_debugdata for "
class Module(namedtuple('BaseModule', ['name', 'base', 'max', 'sections'])):
pass
@dataclass(frozen=True)
class Region:
start: int
end: int
offset: int
perms: Optional[str]
objfile: str
@dataclass(frozen=True)
class Section:
name: str
start: int
end: int
offset: int
attrs: List[str]
def better(self, other: 'Section') -> 'Section':
start = self.start if self.start != 0 else other.start
end = self.end if self.end != 0 else other.end
offset = self.offset if self.offset != 0 else other.offset
attrs = dict.fromkeys(self.attrs)
attrs.update(dict.fromkeys(other.attrs))
return Section(self.name, start, end, offset, list(attrs))
@dataclass(frozen=True)
class Module:
name: str
base: int
max: int
sections: Dict[str, Section]
class Index:
def __init__(self, regions):
self.regions = {}
self.bases = []
def __init__(self, regions: List[Region]) -> None:
self.regions: Dict[int, Region] = {}
self.bases: List[int] = []
for r in regions:
self.regions[r.start] = r
self.bases.append(r.start)
def compute_base(self, address):
def compute_base(self, address: int) -> int:
index = bisect.bisect_right(self.bases, address) - 1
if index == -1:
return address
@@ -84,34 +121,28 @@ class Index:
return region.start
class Section(namedtuple('BaseSection', ['name', 'start', 'end', 'offset', 'attrs'])):
def better(self, other):
start = self.start if self.start != 0 else other.start
end = self.end if self.end != 0 else other.end
offset = self.offset if self.offset != 0 else other.offset
attrs = dict.fromkeys(self.attrs)
attrs.update(dict.fromkeys(other.attrs))
return Section(self.name, start, end, offset, list(attrs))
def try_hexint(val, name):
def try_hexint(val: str, name: str) -> int:
try:
return int(val, 16)
except ValueError:
gdb.write("Invalid {}: {}".format(name, val), stream=gdb.STDERR)
gdb.write(f"Invalid {name}: {val}\n", stream=gdb.STDERR)
return 0
# AFAICT, Objfile does not give info about load addresses :(
class ModuleInfoReader(object):
def name_from_line(self, line):
cmd: str
objfile_pattern: re.Pattern
section_pattern: re.Pattern
def name_from_line(self, line: str) -> Optional[str]:
mat = self.objfile_pattern.fullmatch(line)
if mat is None:
return None
n = mat['name']
return None if mat is None else mat['name']
def section_from_line(self, line, max_addr):
def section_from_line(self, line: str, max_addr: int) -> Optional[Section]:
mat = self.section_pattern.fullmatch(line)
if mat is None:
return None
@@ -122,7 +153,8 @@ class ModuleInfoReader(object):
attrs = [a for a in mat['attrs'].split(' ') if a != '']
return Section(name, start, end, offset, attrs)
def finish_module(self, name, sections, index):
def finish_module(self, name: str, sections: Dict[str, Section],
index: Index) -> Module:
alloc = {k: s for k, s in sections.items() if 'ALLOC' in s.attrs}
if len(alloc) == 0:
return Module(name, 0, 0, alloc)
@@ -130,13 +162,13 @@ class ModuleInfoReader(object):
max_addr = max(s.end for s in alloc.values())
return Module(name, base_addr, max_addr, alloc)
def get_modules(self):
def get_modules(self) -> Dict[str, Module]:
modules = {}
index = Index(REGION_INFO_READER.get_regions())
out = gdb.execute(self.cmd, to_string=True)
max_addr = compute_max_addr()
name = None
sections = None
sections: Dict[str, Section] = {}
for line in out.split('\n'):
n = self.name_from_line(line)
if n is not None:
@@ -176,7 +208,7 @@ class ModuleInfoReaderV11(ModuleInfoReader):
section_pattern = OBJFILE_SECTION_PATTERN_V9
def _choose_module_info_reader():
def _choose_module_info_reader() -> ModuleInfoReader:
if GDB_VERSION.major == 8:
return ModuleInfoReaderV8()
elif GDB_VERSION.major == 9:
@@ -207,15 +239,11 @@ REGION_PATTERN = re.compile("\\s*" +
"(?P<objfile>.*)")
class Region(namedtuple('BaseRegion', ['start', 'end', 'offset', 'perms', 'objfile'])):
pass
class RegionInfoReader(object):
cmd = REGIONS_CMD
region_pattern = REGION_PATTERN
def region_from_line(self, line, max_addr):
def region_from_line(self, line: str, max_addr: int) -> Optional[Region]:
mat = self.region_pattern.fullmatch(line)
if mat is None:
return None
@@ -226,8 +254,8 @@ class RegionInfoReader(object):
objfile = mat['objfile']
return Region(start, end, offset, perms, objfile)
def get_regions(self):
regions = []
def get_regions(self) -> List[Region]:
regions: List[Region] = []
try:
out = gdb.execute(self.cmd, to_string=True)
max_addr = compute_max_addr()
@@ -240,12 +268,12 @@ class RegionInfoReader(object):
regions.append(r)
return regions
def full_mem(self):
def full_mem(self) -> Region:
# TODO: This may not work for Harvard architectures
max_addr = compute_max_addr()
return Region(0, max_addr+1, 0, None, 'full memory')
def have_changed(self, regions):
def have_changed(self, regions: List[Region]) -> Tuple[bool, Optional[List[Region]]]:
if len(regions) == 1 and regions[0].objfile == 'full memory':
return False, None
new_regions = self.get_regions()
@@ -257,7 +285,7 @@ class RegionInfoReader(object):
return mat['perms']
def _choose_region_info_reader():
def _choose_region_info_reader() -> RegionInfoReader:
if 8 <= GDB_VERSION.major:
return RegionInfoReader()
else:
@@ -273,18 +301,23 @@ BREAK_PATTERN = re.compile('')
BREAK_LOC_PATTERN = re.compile('')
class BreakpointLocation(namedtuple('BaseBreakpointLocation', ['address', 'enabled', 'thread_groups'])):
pass
@dataclass(frozen=True)
class BreakpointLocation:
address: int
enabled: bool
thread_groups: List[int]
class BreakpointLocationInfoReaderV8(object):
def breakpoint_from_line(self, line):
class BreakpointLocationInfoReader(object):
@abstractmethod
def get_locations(self, breakpoint: gdb.Breakpoint) -> List[Union[
BreakpointLocation, gdb.BreakpointLocation]]:
pass
def location_from_line(self, line):
pass
def get_locations(self, breakpoint):
class BreakpointLocationInfoReaderV8(BreakpointLocationInfoReader):
def get_locations(self, breakpoint: gdb.Breakpoint) -> List[Union[
BreakpointLocation, gdb.BreakpointLocation]]:
inf = gdb.selected_inferior()
thread_groups = [inf.num]
if breakpoint.location is not None and breakpoint.location.startswith("*0x"):
@@ -295,20 +328,16 @@ class BreakpointLocationInfoReaderV8(object):
return []
class BreakpointLocationInfoReaderV9(object):
def breakpoint_from_line(self, line):
pass
def location_from_line(self, line):
pass
def get_locations(self, breakpoint):
class BreakpointLocationInfoReaderV9(BreakpointLocationInfoReader):
def get_locations(self, breakpoint: gdb.Breakpoint) -> List[Union[
BreakpointLocation, gdb.BreakpointLocation]]:
inf = gdb.selected_inferior()
thread_groups = [inf.num]
if breakpoint.location is None:
return []
try:
address = gdb.parse_and_eval(breakpoint.location).address
address = int(gdb.parse_and_eval(
breakpoint.location).address) & compute_max_addr()
loc = BreakpointLocation(
address, breakpoint.enabled, thread_groups)
return [loc]
@@ -317,12 +346,13 @@ class BreakpointLocationInfoReaderV9(object):
return []
class BreakpointLocationInfoReaderV13(object):
def get_locations(self, breakpoint):
class BreakpointLocationInfoReaderV13(BreakpointLocationInfoReader):
def get_locations(self, breakpoint: gdb.Breakpoint) -> List[Union[
BreakpointLocation, gdb.BreakpointLocation]]:
return breakpoint.locations
def _choose_breakpoint_location_info_reader():
def _choose_breakpoint_location_info_reader() -> BreakpointLocationInfoReader:
if GDB_VERSION.major >= 13:
return BreakpointLocationInfoReaderV13()
if GDB_VERSION.major >= 9:
@@ -337,16 +367,16 @@ def _choose_breakpoint_location_info_reader():
BREAKPOINT_LOCATION_INFO_READER = _choose_breakpoint_location_info_reader()
def set_bool_param_by_api(name, value):
def set_bool_param_by_api(name: str, value: bool) -> None:
gdb.set_parameter(name, value)
def set_bool_param_by_cmd(name, value):
def set_bool_param_by_cmd(name: str, value: bool) -> None:
val = 'on' if value else 'off'
gdb.execute(f'set {name} {val}')
def choose_set_parameter():
def choose_set_parameter() -> Callable[[str, bool], None]:
if GDB_VERSION.major >= 13:
return set_bool_param_by_api
else:
@@ -356,30 +386,32 @@ def choose_set_parameter():
set_bool_param = choose_set_parameter()
def get_level(frame):
def get_level(frame: gdb.Frame) -> int:
if hasattr(frame, "level"):
return frame.level()
else:
level = -1
f = frame
f: Optional[gdb.Frame] = frame
while f is not None:
level += 1
f = f.newer()
return level
class RegisterDesc(namedtuple('BaseRegisterDesc', ['name'])):
pass
@dataclass(frozen=True)
class RegisterDesc:
name: str
def get_register_descs(arch, group='all'):
def get_register_descs(arch: gdb.Architecture, group: str = 'all') -> List[
Union[RegisterDesc, gdb.RegisterDescriptor]]:
if hasattr(arch, "registers"):
try:
return arch.registers(group)
return list(arch.registers(group))
except ValueError: # No such group, or version too old
return arch.registers()
return list(arch.registers())
else:
descs = []
descs: List[Union[RegisterDesc, gdb.RegisterDescriptor]] = []
try:
regset = gdb.execute(
f"info registers {group}", to_string=True).strip().split('\n')
@@ -393,7 +425,7 @@ def get_register_descs(arch, group='all'):
return descs
def selected_frame():
def selected_frame() -> Optional[gdb.Frame]:
try:
return gdb.selected_frame()
except Exception as e:
@@ -401,5 +433,5 @@ def selected_frame():
return None
def compute_max_addr():
def compute_max_addr() -> int:
return (1 << (int(gdb.parse_and_eval("sizeof(void*)")) * 8)) - 1
@@ -1,18 +1,20 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from dataclasses import dataclass
from typing import Dict, List, Optional
import gdb
from . import util
@@ -23,22 +25,23 @@ from .commands import install, cmd
class GhidraWinePrefix(gdb.Command):
"""Commands for tracing Wine processes"""
def __init__(self):
def __init__(self) -> None:
super().__init__('ghidra wine', gdb.COMMAND_SUPPORT, prefix=True)
def is_mapped(pe_file):
def is_mapped(pe_file: str) -> bool:
return pe_file in gdb.execute("info proc mappings", to_string=True)
def set_break(command):
def set_break(command: str) -> gdb.Breakpoint:
breaks_before = set(gdb.breakpoints())
gdb.execute(command)
return (set(gdb.breakpoints()) - breaks_before).pop()
@cmd('ghidra wine run-to-image', '-ghidra-wine-run-to-image', gdb.COMMAND_SUPPORT, False)
def ghidra_wine_run_to_image(pe_file, *, is_mi, **kwargs):
@cmd('ghidra wine run-to-image', '-ghidra-wine-run-to-image',
gdb.COMMAND_SUPPORT, False)
def ghidra_wine_run_to_image(pe_file: str, *, is_mi: bool, **kwargs) -> None:
mprot_catchpoint = set_break("""
catch syscall mprotect
commands
@@ -53,15 +56,18 @@ end
ORIG_MODULE_INFO_READER = util.MODULE_INFO_READER
@dataclass(frozen=False)
class Range(object):
min: int
max: int
def expand(self, region):
if not hasattr(self, 'min'):
self.min = region.start
self.max = region.end
else:
self.min = min(self.min, region.start)
self.max = max(self.max, region.end)
@staticmethod
def from_region(region: util.Region) -> 'Range':
return Range(region.start, region.end)
def expand(self, region: util.Region):
self.min = min(self.min, region.start)
self.max = max(self.max, region.end)
return self
@@ -69,18 +75,18 @@ class Range(object):
MODULE_SUFFIXES = (".exe", ".dll")
class WineModuleInfoReader(object):
class WineModuleInfoReader(util.ModuleInfoReader):
def get_modules(self):
def get_modules(self) -> Dict[str, util.Module]:
modules = ORIG_MODULE_INFO_READER.get_modules()
ranges = dict()
for region in util.REGION_INFO_READER.get_regions():
if not region.objfile in ranges:
ranges[region.objfile] = Range().expand(region)
ranges[region.objfile] = Range.from_region(region)
else:
ranges[region.objfile].expand(region)
for k, v in ranges.items():
if k in modules:
if k in modules:
continue
if not k.lower().endswith(MODULE_SUFFIXES):
continue
@@ -15,4 +15,5 @@ src/main/py/LICENSE||GHIDRA||||END|
src/main/py/MANIFEST.in||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidralldb/py.typed||GHIDRA||||END|
src/main/py/src/ghidralldb/schema.xml||GHIDRA||||END|
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ghidralldb"
version = "11.3"
version = "11.4"
authors = [
{ name="Ghidra Development Team" },
]
@@ -17,9 +17,12 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"ghidratrace==11.3",
"ghidratrace==11.4",
]
[project.urls]
"Homepage" = "https://github.com/NationalSecurityAgency/ghidra"
"Bug Tracker" = "https://github.com/NationalSecurityAgency/ghidra/issues"
[tool.setuptools.package-data]
ghidralldb = ["py.typed"]
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, List, Optional, Tuple
from ghidratrace.client import Address, RegVal
import lldb
@@ -20,8 +21,9 @@ from . import util
# NOTE: This map is derived from the ldefs using a script
language_map = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
language_map: Dict[str, List[str]] = {
'aarch64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'armv4': ['ARM:BE:32:v4', 'ARM:LE:32:v4'],
'armv4t': ['ARM:BE:32:v4t', 'ARM:LE:32:v4t'],
@@ -50,8 +52,10 @@ language_map = {
'thumbv7em': ['ARM:BE:32:Cortex', 'ARM:LE:32:Cortex'],
'armv8': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'armv8l': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'arm64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'arm64e': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'arm64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm64e': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon',
'AARCH64:LE:64:v8A'],
'arm64_32': ['ARM:BE:32:v8', 'ARM:LE:32:v8'],
'mips': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
'mipsr2': ['MIPS:BE:32:default', 'MIPS:LE:32:default'],
@@ -102,8 +106,11 @@ language_map = {
'hexagon': [],
'hexagonv4': [],
'hexagonv5': [],
'riscv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC', 'RISCV:LE:32:RV32I', 'RISCV:LE:32:RV32IC', 'RISCV:LE:32:RV32IMC', 'RISCV:LE:32:default'],
'riscv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC', 'RISCV:LE:64:RV64I', 'RISCV:LE:64:RV64IC', 'RISCV:LE:64:default'],
'riscv32': ['RISCV:LE:32:RV32G', 'RISCV:LE:32:RV32GC', 'RISCV:LE:32:RV32I',
'RISCV:LE:32:RV32IC', 'RISCV:LE:32:RV32IMC',
'RISCV:LE:32:default'],
'riscv64': ['RISCV:LE:64:RV64G', 'RISCV:LE:64:RV64GC', 'RISCV:LE:64:RV64I',
'RISCV:LE:64:RV64IC', 'RISCV:LE:64:default'],
'unknown-mach-32': ['DATA:LE:32:default', 'DATA:LE:32:default'],
'unknown-mach-64': ['DATA:LE:64:default', 'DATA:LE:64:default'],
'arc': [],
@@ -111,19 +118,20 @@ language_map = {
'wasm32': ['x86:LE:32:default'],
}
data64_compiler_map = {
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
x86_compiler_map = {
x86_compiler_map: Dict[Optional[str], str] = {
'windows': 'windows',
'Cygwin': 'windows',
'linux' : 'gcc',
'linux': 'gcc',
'default': 'gcc',
'unknown': 'gcc',
None: 'gcc',
}
default_compiler_map = {
default_compiler_map: Dict[Optional[str], str] = {
'freebsd': 'gcc',
'linux': 'gcc',
'netbsd': 'gcc',
@@ -138,7 +146,7 @@ default_compiler_map = {
'unknown': 'default',
}
compiler_map = {
compiler_map: Dict[str, Dict[Optional[str], str]] = {
'DATA:BE:64:': data64_compiler_map,
'DATA:LE:64:': data64_compiler_map,
'x86:LE:32:': x86_compiler_map,
@@ -148,7 +156,7 @@ compiler_map = {
}
def find_host_triple():
def find_host_triple() -> str:
dbg = util.get_debugger()
for i in range(dbg.GetNumPlatforms()):
platform = dbg.GetPlatformAtIndex(i)
@@ -157,19 +165,19 @@ def find_host_triple():
return 'unrecognized'
def find_triple():
def find_triple() -> str:
triple = util.get_target().triple
if triple is not None:
return triple
return find_host_triple()
def get_arch():
def get_arch() -> str:
triple = find_triple()
return triple.split('-')[0]
def get_endian():
def get_endian() -> str:
parm = util.get_convenience_variable('endian')
if parm != 'auto':
return parm
@@ -183,7 +191,7 @@ def get_endian():
return 'unrecognized'
def get_osabi():
def get_osabi() -> str:
parm = util.get_convenience_variable('osabi')
if not parm in ['auto', 'default']:
return parm
@@ -195,7 +203,7 @@ def get_osabi():
return triple.split('-')[2]
def compute_ghidra_language():
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = util.get_convenience_variable('ghidra-language')
if lang != 'auto':
@@ -223,37 +231,33 @@ def compute_ghidra_language():
return 'DATA' + lebe + '64:default'
def compute_ghidra_compiler(lang):
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = util.get_convenience_variable('ghidra-compiler')
if comp != 'auto':
return comp
# Check if the selected lang has specific compiler recommendations
matched_lang = sorted(
(l for l in compiler_map if l in lang),
key=lambda l: compiler_map[l]
)
if len(matched_lang) == 0:
# NOTE: Unlike other agents, we put prefixes in map keys
matches = [l for l in compiler_map if lang.startswith(l)]
if len(matches) == 0:
print(f"{lang} not found in compiler map - using default compiler")
return 'default'
comp_map = compiler_map[matched_lang[0]]
comp_map = compiler_map[matches[0]]
if comp_map == data64_compiler_map:
print(f"Using the DATA64 compiler map")
osabi = get_osabi()
if osabi in comp_map:
return comp_map[osabi]
if lang.startswith("x86:"):
print(f"{osabi} not found in compiler map - using gcc")
return 'gcc'
if None in comp_map:
return comp_map[None]
def_comp = comp_map[None]
print(f"{osabi} not found in compiler map - using {def_comp} compiler")
return def_comp
print(f"{osabi} not found in compiler map - using default compiler")
return 'default'
def compute_ghidra_lcsp():
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
@@ -261,10 +265,10 @@ def compute_ghidra_lcsp():
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, proc: lldb.SBProcess, offset: int):
def map(self, proc: lldb.SBProcess, offset: int) -> Tuple[str, Address]:
space = self.defaultSpace
return self.defaultSpace, Address(space, offset)
@@ -277,10 +281,10 @@ class DefaultMemoryMapper(object):
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers = {}
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang):
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
@@ -288,31 +292,31 @@ def compute_memory_mapper(lang):
class DefaultRegisterMapper(object):
def __init__(self, byte_order):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
self.byte_order = byte_order
self.union_winners = {}
def map_name(self, proc, name):
def map_name(self, proc: lldb.SBProcess, name: str) -> str:
return name
def map_value(self, proc, name, value):
def map_value(self, proc: lldb.SBProcess, name: str, value: bytes) -> RegVal:
return RegVal(self.map_name(proc, name), value)
def map_name_back(self, proc, name):
def map_name_back(self, proc: lldb.SBProcess, name: str) -> str:
return name
def map_value_back(self, proc, name, value):
def map_value_back(self, proc: lldb.SBProcess, name: str,
value: bytes) -> RegVal:
return RegVal(self.map_name_back(proc, name), value)
class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
def __init__(self):
def __init__(self) -> None:
super().__init__('little')
def map_name(self, proc, name):
def map_name(self, proc: lldb.SBProcess, name: str) -> str:
if name is None:
return 'UNKNOWN'
if name == 'eflags':
@@ -322,26 +326,27 @@ class Intel_x86_64_RegisterMapper(DefaultRegisterMapper):
return 'ymm' + name[3:]
return super().map_name(proc, name)
def map_value(self, proc, name, value):
def map_value(self, proc: lldb.SBProcess, name: str, value: bytes) -> RegVal:
rv = super().map_value(proc, name, value)
if rv.name.startswith('ymm') and len(rv.value) > 32:
return RegVal(rv.name, rv.value[-32:])
return rv
def map_name_back(self, proc, name):
def map_name_back(self, proc: lldb.SBProcess, name: str) -> str:
if name == 'rflags':
return 'eflags'
return super().map_name_back(proc, name)
DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big')
DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little')
register_mappers = {
register_mappers: Dict[str, DefaultRegisterMapper] = {
'x86:LE:64:default': Intel_x86_64_RegisterMapper()
}
def compute_register_mapper(lang):
def compute_register_mapper(lang: str) -> DefaultRegisterMapper:
if not lang in register_mappers:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER
File diff suppressed because it is too large Load Diff
@@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from dataclasses import dataclass, field
import threading
import time
from typing import Any, Optional, Union
import lldb
@@ -24,34 +26,41 @@ from . import commands, util
ALL_EVENTS = 0xFFFF
@dataclass(frozen=False)
class HookState(object):
__slots__ = ('installed', 'mem_catchpoint')
installed = False
def __init__(self):
def __init__(self) -> None:
self.installed = False
self.mem_catchpoint = None
@dataclass(frozen=False)
class ProcessState(object):
__slots__ = ('first', 'regions', 'modules', 'threads',
'breaks', 'watches', 'visited')
first = True
# For things we can detect changes to between stops
regions = False
modules = False
threads = False
breaks = False
watches = False
# For frames and threads that have already been synced since last stop
visited: set[Any] = field(default_factory=set)
def __init__(self):
def __init__(self) -> None:
self.first = True
# For things we can detect changes to between stops
self.regions = False
self.modules = False
self.threads = False
self.breaks = False
self.watches = False
# For frames and threads that have already been synced since last stop
self.visited = set()
def record(self, description=None):
def record(self, description: Optional[str] = None) -> None:
first = self.first
self.first = False
trace = commands.STATE.require_trace()
if description is not None:
commands.STATE.trace.snapshot(description)
trace.snapshot(description)
if first:
commands.put_processes()
commands.put_environment()
@@ -121,7 +130,8 @@ class QuitSentinel(object):
QUIT = QuitSentinel()
def process_event(self, listener, event):
def process_event(self, listener: lldb.SBListener,
event: lldb.SBEvent) -> Union[QuitSentinel, bool]:
try:
desc = util.get_description(event)
# print(f"Event: {desc}")
@@ -130,7 +140,7 @@ def process_event(self, listener, event):
# LLDB may crash on event.GetBroadcasterClass, otherwise
# All the checks below, e.g. SBTarget.EventIsTargetEvent, call this
print(f"Ignoring {desc} because target is invalid")
return
return False
event_process = util.get_process()
if event_process.IsValid() and event_process.GetProcessID() not in PROC_STATE:
PROC_STATE[event_process.GetProcessID()] = ProcessState()
@@ -260,13 +270,14 @@ def process_event(self, listener, event):
return True
except BaseException as e:
print(e)
return False
class EventThread(threading.Thread):
func = process_event
event = lldb.SBEvent()
def run(self):
def run(self) -> None:
# Let's only try at most 4 times to retrieve any kind of event.
# After that, the thread exits.
listener = lldb.SBListener('eventlistener')
@@ -365,40 +376,40 @@ class EventThread(threading.Thread):
"""
def on_new_process(event):
def on_new_process(event: lldb.SBEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(event.process.num)):
with trace.client.batch():
with trace.open_tx(f"New Process {event.process.num}"):
commands.put_processes() # TODO: Could put just the one....
def on_process_selected():
def on_process_selected() -> None:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Process {} selected".format(proc.GetProcessID())):
with trace.client.batch():
with trace.open_tx(f"Process {proc.GetProcessID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.activate()
def on_process_deleted(event):
def on_process_deleted(event: lldb.SBEvent) -> None:
trace = commands.STATE.trace
if trace is None:
return
if event.process.num in PROC_STATE:
del PROC_STATE[event.process.num]
with commands.STATE.client.batch():
with trace.open_tx("Process {} deleted".format(event.process.num)):
with trace.client.batch():
with trace.open_tx(f"Process {event.process.num} deleted"):
commands.put_processes() # TODO: Could just delete the one....
def on_new_thread(event):
def on_new_thread(event: lldb.SBEvent) -> None:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
@@ -406,224 +417,237 @@ def on_new_thread(event):
# TODO: Syscall clone/exit to detect thread destruction?
def on_thread_selected():
def on_thread_selected() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
t = util.selected_thread()
with commands.STATE.client.batch():
with trace.open_tx("Thread {}.{} selected".format(proc.GetProcessID(), t.GetThreadID())):
with trace.client.batch():
with trace.open_tx(f"Thread {proc.GetProcessID()}.{t.GetThreadID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.put_threads()
commands.activate()
return True
def on_frame_selected():
def on_frame_selected() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
f = util.selected_frame()
t = f.GetThread()
with commands.STATE.client.batch():
with trace.open_tx("Frame {}.{}.{} selected".format(proc.GetProcessID(), t.GetThreadID(), f.GetFrameID())):
with trace.client.batch():
with trace.open_tx(f"Frame {proc.GetProcessID()}.{t.GetThreadID()}.{f.GetFrameID()} selected"):
PROC_STATE[proc.GetProcessID()].record()
commands.put_threads()
commands.put_frames()
commands.activate()
return True
def on_syscall_memory():
def on_syscall_memory() -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
PROC_STATE[proc.GetProcessID()].regions = True
return True
def on_memory_changed(event):
def on_memory_changed(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Memory *0x{:08x} changed".format(event.address)):
return False
with trace.client.batch():
with trace.open_tx(f"Memory *0x{event.address:08x} changed"):
commands.put_bytes(event.address, event.address + event.length,
pages=False, is_mi=False, result=None)
pages=False, result=None)
return True
def on_register_changed(event):
# print("Register changed: {}".format(dir(event)))
def on_register_changed(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
# I'd rather have a descriptor!
# TODO: How do I get the descriptor from the number?
# For now, just record the lot
with commands.STATE.client.batch():
with trace.open_tx("Register {} changed".format(event.regnum)):
return False
with trace.client.batch():
with trace.open_tx(f"Register {event.regnum} changed"):
banks = event.frame.GetRegisters()
commands.putreg(
event.frame, banks.GetFirstValueByName(commands.DEFAULT_REGISTER_BANK))
return True
def on_cont(event):
def on_cont(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
state = PROC_STATE[proc.GetProcessID()]
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Continued"):
state.record_continued()
return True
def on_stop(event):
def on_stop(event: lldb.SBEvent) -> bool:
proc = lldb.SBProcess.GetProcessFromEvent(
event) if event is not None else util.get_process()
if proc.GetProcessID() not in PROC_STATE:
print("not in state")
return
return False
trace = commands.STATE.trace
if trace is None:
print("no trace")
return
return False
state = PROC_STATE[proc.GetProcessID()]
state.visited.clear()
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Stopped"):
state.record("Stopped")
commands.put_event_thread()
commands.put_threads()
commands.put_frames()
commands.activate()
return True
def on_exited(event):
def on_exited(event: lldb.SBEvent) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
state = PROC_STATE[proc.GetProcessID()]
state.visited.clear()
exit_code = proc.GetExitStatus()
description = "Exited with code {}".format(exit_code)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx(description):
state.record(description)
state.record_exited(exit_code)
commands.put_event_thread()
commands.activate()
return False
def modules_changed():
def modules_changed() -> bool:
# Assumption: affects the current process
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
PROC_STATE[proc.GetProcessID()].modules = True
return True
def on_new_objfile(event):
def on_new_objfile(event: lldb.SBEvent) -> bool:
modules_changed()
return True
def on_free_objfile(event):
def on_free_objfile(event: lldb.SBEvent) -> bool:
modules_changed()
return True
def on_breakpoint_created(b):
def on_breakpoint_created(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} created".format(b.GetID())):
commands.put_single_breakpoint(b, proc)
return True
def on_breakpoint_modified(b):
def on_breakpoint_modified(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} modified".format(b.GetID())):
commands.put_single_breakpoint(b, proc)
return True
def on_breakpoint_deleted(b):
def on_breakpoint_deleted(b: lldb.SBBreakpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
bpt_path = commands.PROC_BREAK_PATTERN.format(
procnum=proc.GetProcessID(), breaknum=b.GetID())
bpt_obj = trace.proxy_object_path(bpt_path)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Breakpoint {} deleted".format(b.GetID())):
bpt_obj.remove(tree=True)
return True
def on_watchpoint_created(b):
def on_watchpoint_created(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Breakpoint {} created".format(b.GetID())):
commands.put_single_watchpoint(b, proc)
return True
def on_watchpoint_modified(b):
def on_watchpoint_modified(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
return False
with trace.client.batch():
with trace.open_tx("Watchpoint {} modified".format(b.GetID())):
commands.put_single_watchpoint(b, proc)
return True
def on_watchpoint_deleted(b):
def on_watchpoint_deleted(b: lldb.SBWatchpoint) -> bool:
proc = util.get_process()
if proc.GetProcessID() not in PROC_STATE:
return
return False
trace = commands.STATE.trace
if trace is None:
return
return False
wpt_path = commands.PROC_WATCH_PATTERN.format(
procnum=proc.GetProcessID(), watchnum=b.GetID())
wpt_obj = trace.proxy_object_path(wpt_path)
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Watchpoint {} deleted".format(b.GetID())):
wpt_obj.remove(tree=True)
return True
def install_hooks():
def install_hooks() -> None:
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
@@ -632,18 +656,18 @@ def install_hooks():
event_thread.start()
def remove_hooks():
def remove_hooks() -> None:
if not HOOK_STATE.installed:
return
HOOK_STATE.installed = False
def enable_current_process():
def enable_current_process() -> None:
proc = util.get_process()
PROC_STATE[proc.GetProcessID()] = ProcessState()
def disable_current_process():
def disable_current_process() -> None:
proc = util.get_process()
if proc.GetProcessID() in PROC_STATE:
# Silently ignore already disabled
File diff suppressed because it is too large Load Diff
@@ -14,17 +14,24 @@
# limitations under the License.
##
from collections import namedtuple
from dataclasses import dataclass
import os
import re
import sys
from typing import Any, Dict, List, Optional, Union
import lldb
LldbVersion = namedtuple('LldbVersion', ['display', 'full', 'major', 'minor'])
@dataclass(frozen=True)
class LldbVersion:
display: str
full: str
major: int
minor: int
def _compute_lldb_ver():
def _compute_lldb_ver() -> LldbVersion:
blurb = lldb.debugger.GetVersionString()
top = blurb.split('\n')[0]
if ' version ' in top:
@@ -40,12 +47,15 @@ LLDB_VERSION = _compute_lldb_ver()
GNU_DEBUGDATA_PREFIX = ".gnu_debugdata for "
class Module(namedtuple('BaseModule', ['name', 'base', 'max', 'sections'])):
pass
@dataclass
class Section:
name: str
start: int
end: int
offset: int
attrs: List[str]
class Section(namedtuple('BaseSection', ['name', 'start', 'end', 'offset', 'attrs'])):
def better(self, other):
def better(self, other: 'Section') -> 'Section':
start = self.start if self.start != 0 else other.start
end = self.end if self.end != 0 else other.end
offset = self.offset if self.offset != 0 else other.offset
@@ -54,18 +64,17 @@ class Section(namedtuple('BaseSection', ['name', 'start', 'end', 'offset', 'attr
return Section(self.name, start, end, offset, list(attrs))
@dataclass(frozen=True)
class Module:
name: str
base: int
max: int
sections: Dict[str, Section]
# AFAICT, Objfile does not give info about load addresses :(
class ModuleInfoReader(object):
def name_from_line(self, line):
mat = self.objfile_pattern.fullmatch(line)
if mat is None:
return None
n = mat['name']
if n.startswith(GNU_DEBUGDATA_PREFIX):
return None
return None if mat is None else mat['name']
def section_from_sbsection(self, s):
def section_from_sbsection(self, s: lldb.SBSection) -> Section:
start = s.GetLoadAddress(get_target())
if start >= sys.maxsize*2:
start = 0
@@ -75,7 +84,7 @@ class ModuleInfoReader(object):
attrs = s.GetPermissions()
return Section(name, start, end, offset, attrs)
def finish_module(self, name, sections):
def finish_module(self, name: str, sections: Dict[str, Section]) -> Module:
alloc = {k: s for k, s in sections.items()}
if len(alloc) == 0:
return Module(name, 0, 0, alloc)
@@ -91,10 +100,10 @@ class ModuleInfoReader(object):
max_addr = max(s.end for s in alloc.values())
return Module(name, base_addr, max_addr, alloc)
def get_modules(self):
def get_modules(self) -> Dict[str, Module]:
modules = {}
name = None
sections = {}
sections: Dict[str, Section] = {}
for i in range(0, get_target().GetNumModules()):
module = get_target().GetModuleAtIndex(i)
fspec = module.GetFileSpec()
@@ -108,19 +117,24 @@ class ModuleInfoReader(object):
return modules
def _choose_module_info_reader():
def _choose_module_info_reader() -> ModuleInfoReader:
return ModuleInfoReader()
MODULE_INFO_READER = _choose_module_info_reader()
class Region(namedtuple('BaseRegion', ['start', 'end', 'offset', 'perms', 'objfile'])):
pass
@dataclass
class Region:
start: int
end: int
offset: int
perms: Optional[str]
objfile: str
class RegionInfoReader(object):
def region_from_sbmemreg(self, info):
def region_from_sbmemreg(self, info: lldb.SBMemoryRegionInfo) -> Region:
start = info.GetRegionBase()
end = info.GetRegionEnd()
offset = info.GetRegionBase()
@@ -136,7 +150,7 @@ class RegionInfoReader(object):
objfile = info.GetName()
return Region(start, end, offset, perms, objfile)
def get_regions(self):
def get_regions(self) -> List[Region]:
regions = []
reglist = get_process().GetMemoryRegions()
for i in range(0, reglist.GetSize()):
@@ -148,7 +162,7 @@ class RegionInfoReader(object):
regions.append(r)
return regions
def full_mem(self):
def full_mem(self) -> Region:
# TODO: This may not work for Harvard architectures
try:
sizeptr = int(parse_and_eval('sizeof(void*)')) * 8
@@ -157,7 +171,7 @@ class RegionInfoReader(object):
return Region(0, 1 << 64, 0, None, 'full memory')
def _choose_region_info_reader():
def _choose_region_info_reader() -> RegionInfoReader:
return RegionInfoReader()
@@ -169,69 +183,70 @@ BREAK_PATTERN = re.compile('')
BREAK_LOC_PATTERN = re.compile('')
class BreakpointLocation(namedtuple('BaseBreakpointLocation', ['address', 'enabled', 'thread_groups'])):
pass
class BreakpointLocationInfoReader(object):
def get_locations(self, breakpoint):
def get_locations(self, breakpoint: lldb.SBBreakpoint) -> List[
lldb.SBBreakpointLocation]:
return breakpoint.locations
def _choose_breakpoint_location_info_reader():
def _choose_breakpoint_location_info_reader() -> BreakpointLocationInfoReader:
return BreakpointLocationInfoReader()
BREAKPOINT_LOCATION_INFO_READER = _choose_breakpoint_location_info_reader()
def get_debugger():
def get_debugger() -> lldb.SBDebugger:
return lldb.SBDebugger.FindDebuggerWithID(1)
def get_target():
def get_target() -> lldb.SBTarget:
return get_debugger().GetTargetAtIndex(0)
def get_process():
def get_process() -> lldb.SBProcess:
return get_target().GetProcess()
def selected_thread():
def selected_thread() -> lldb.SBThread:
return get_process().GetSelectedThread()
def selected_frame():
def selected_frame() -> lldb.SBFrame:
return selected_thread().GetSelectedFrame()
def parse_and_eval(expr, signed=False):
def parse_and_eval(expr: str, signed: bool = False) -> int:
if signed is True:
return get_eval(expr).GetValueAsSigned()
return get_eval(expr).GetValueAsUnsigned()
def get_eval(expr):
def get_eval(expr: str) -> lldb.SBValue:
eval = get_target().EvaluateExpression(expr)
if eval.GetError().Fail():
raise ValueError(eval.GetError().GetCString())
return eval
def get_description(object, level=None):
def get_description(object: Union[
lldb.SBThread, lldb.SBBreakpoint, lldb.SBWatchpoint, lldb.SBEvent],
level: Optional[int] = None) -> str:
stream = lldb.SBStream()
if level is None:
object.GetDescription(stream)
else:
elif isinstance(object, lldb.SBWatchpoint):
object.GetDescription(stream, level)
else:
raise ValueError(f"Object {object} does not support description level")
return escape_ansi(stream.GetData())
conv_map = {}
conv_map: Dict[str, str] = {}
def get_convenience_variable(id):
#val = get_target().GetEnvironment().Get(id)
def get_convenience_variable(id: str) -> str:
# val = get_target().GetEnvironment().Get(id)
if id not in conv_map:
return "auto"
val = conv_map[id]
@@ -240,21 +255,21 @@ def get_convenience_variable(id):
return val
def set_convenience_variable(id, value):
#env = get_target().GetEnvironment()
def set_convenience_variable(id: str, value: str) -> None:
# env = get_target().GetEnvironment()
# return env.Set(id, value, True)
conv_map[id] = value
def escape_ansi(line):
def escape_ansi(line: str) -> str:
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
return ansi_escape.sub('', line)
def debracket(init):
val = init
def debracket(init: Optional[str]) -> str:
if init is None:
return ""
val = init
val = val.replace("[", "(")
val = val.replace("]", ")")
return val
@@ -43,6 +43,7 @@ import ghidra.trace.model.program.TraceVariableSnapProgramView;
import ghidra.trace.model.thread.TraceThread;
import ghidra.trace.model.time.schedule.PatchStep;
import ghidra.trace.model.time.schedule.TraceSchedule;
import ghidra.trace.model.time.schedule.TraceSchedule.ScheduleForm;
import ghidra.trace.util.TraceRegisterUtils;
import ghidra.util.exception.CancelledException;
import ghidra.util.task.TaskMonitor;
@@ -389,22 +390,47 @@ public enum ControlMode {
*/
public DebuggerCoordinates validateCoordinates(PluginTool tool,
DebuggerCoordinates coordinates, ActivationCause cause) {
if (!followsPresent()) {
if (!followsPresent() || cause != ActivationCause.USER) {
return coordinates;
}
Target target = coordinates.getTarget();
if (target == null) {
return coordinates;
}
if (cause == ActivationCause.USER &&
(!coordinates.getTime().isSnapOnly() || coordinates.getSnap() != target.getSnap())) {
tool.setStatusInfo(
"Cannot navigate time in %s mode. Switch to Trace or Emulate mode first."
ScheduleForm form =
target.getSupportedTimeForm(coordinates.getObject(), coordinates.getSnap());
if (form == null) {
if (coordinates.getTime().isSnapOnly() &&
coordinates.getSnap() == target.getSnap()) {
return coordinates;
}
else {
tool.setStatusInfo("""
Cannot navigate time in %s mode. Switch to Trace or Emulate mode first."""
.formatted(name),
true);
return null;
true);
return null;
}
}
return coordinates;
TraceSchedule norm = form.validate(coordinates.getTrace(), coordinates.getTime());
if (norm != null) {
return coordinates.time(norm);
}
String errMsg = switch (form) {
case SNAP_ONLY -> """
Target can only navigate to snapshots. Switch to Emulate mode first.""";
case SNAP_EVT_STEPS -> """
Target can only replay steps on the event thread. Switch to Emulate mode \
first.""";
case SNAP_ANY_STEPS -> """
Target cannot perform p-code steps. Switch to Emulate mode first.""";
case SNAP_ANY_STEPS_OPS -> throw new AssertionError();
};
tool.setStatusInfo(errMsg, true);
return null;
}
protected TracePlatform platformFor(DebuggerCoordinates coordinates, Address address) {
@@ -34,8 +34,12 @@ import ghidra.trace.model.breakpoint.TraceBreakpointKind;
import ghidra.trace.model.guest.TracePlatform;
import ghidra.trace.model.memory.TraceMemoryState;
import ghidra.trace.model.stack.TraceStackFrame;
import ghidra.trace.model.target.TraceObject;
import ghidra.trace.model.target.path.KeyPath;
import ghidra.trace.model.thread.TraceThread;
import ghidra.trace.model.time.TraceSnapshot;
import ghidra.trace.model.time.schedule.TraceSchedule;
import ghidra.trace.model.time.schedule.TraceSchedule.ScheduleForm;
import ghidra.util.Swing;
import ghidra.util.exception.CancelledException;
import ghidra.util.task.TaskMonitor;
@@ -278,13 +282,51 @@ public interface Target {
* Get the current snapshot key for the target
*
* <p>
* For most targets, this is the most recently created snapshot.
* For most targets, this is the most recently created snapshot. For time-traveling targets, if
* may not be. If this returns a negative number, then it refers to a scratch snapshot and
* almost certainly indicates time travel with instruction steps. Use {@link #getTime()} in that
* case to get a more precise schedule.
*
* @return the snapshot
*/
// TODO: Should this be TraceSchedule getTime()?
long getSnap();
/**
* Get the current time
*
* @return the current time
*/
default TraceSchedule getTime() {
long snap = getSnap();
if (snap >= 0) {
return TraceSchedule.snap(snap);
}
TraceSnapshot snapshot = getTrace().getTimeManager().getSnapshot(snap, false);
if (snapshot == null) {
return null;
}
return snapshot.getSchedule();
}
/**
* Get the form of schedules supported by "activate" on the back end
*
* <p>
* A non-null return value indicates the back end supports time travel. If it does, the return
* value indicates the form of schedules that can be activated, (i.e., via some "go to time"
* command). NOTE: Switching threads is considered an event by every time-traveling back end
* that we know of. Events are usually mapped to a Ghidra trace's snapshots, and so most back
* ends are constrained to schedules of the form {@link ScheduleForm#SNAP_EVT_STEPS}. A back-end
* based on emulation may support thread switching. To support p-code op stepping, the back-end
* will certainly have to be based on p-code emulation, and it must be using the same Sleigh
* language as Ghidra.
*
* @param obj the object (or an ancestor) that may support time travel
* @param snap the <em>destination</em> snapshot
* @return the form
*/
public ScheduleForm getSupportedTimeForm(TraceObject obj, long snap);
/**
* Collect all actions that implement the given common debugger command
*
@@ -402,7 +402,16 @@ public class DebuggerCoordinates {
return new DebuggerCoordinates(trace, platform, target, thread, view, newTime, frame, path);
}
/**
* Get these same coordinates with time replaced by the given schedule
*
* @param newTime the new schedule
* @return the new coordinates
*/
public DebuggerCoordinates time(TraceSchedule newTime) {
if (Objects.equals(time, newTime)) {
return this;
}
if (trace == null) {
return NOWHERE;
}
+21 -12
View File
@@ -36,27 +36,36 @@ dependencies {
testImplementation project(path: ':Framework-TraceModeling', configuration: 'testArtifacts')
}
task generateProtoPy {
ext.srcdir = file("src/main/proto")
ext.src = fileTree(srcdir) {
include "**/*.proto"
}
ext.outdir = file("build/generated/source/proto/main/py")
outputs.dir(outdir)
inputs.files(src)
task configureGenerateProtoPy {
dependsOn(configurations.protocArtifact)
doLast {
def exe = configurations.protocArtifact.first()
if (!isCurrentWindows()) {
exe.setExecutable(true)
}
providers.exec {
commandLine exe, "--python_out=$outdir", "-I$srcdir"
args src
}.result.get()
generateProtoPy.commandLine exe
generateProtoPy.args "--python_out=${generateProtoPy.outdir}"
generateProtoPy.args "--pyi_out=${generateProtoPy.stubsOutdir}"
generateProtoPy.args "-I${generateProtoPy.srcdir}"
generateProtoPy.args generateProtoPy.src
}
}
// Can't use providers.exec, or else we see no output
task generateProtoPy(type:Exec) {
dependsOn(configureGenerateProtoPy)
ext.srcdir = file("src/main/proto")
ext.src = fileTree(srcdir) {
include "**/*.proto"
}
ext.outdir = file("build/generated/source/proto/main/py")
ext.stubsOutdir = file("build/generated/source/proto/main/pyi/ghidratrace")
outputs.dir(outdir)
outputs.dir(stubsOutdir)
inputs.files(src)
}
tasks.assemblePyPackage {
from(generateProtoPy) {
into "src/ghidratrace"
@@ -14,4 +14,5 @@ src/main/help/help/topics/TraceRmiLauncherServicePlugin/images/GdbTerminal.png||
src/main/py/LICENSE||GHIDRA||||END|
src/main/py/README.md||GHIDRA||||END|
src/main/py/pyproject.toml||GHIDRA||||END|
src/main/py/src/ghidratrace/py.typed||GHIDRA||||END|
src/main/py/tests/EMPTY||GHIDRA||||END|

Some files were not shown because too many files have changed in this diff Show More